Java - Can RxJava reduce() be unsafe when parallelized?
I want to use the reduce() operation on an Observable to map it to a Guava ImmutableList, since I prefer it to the more standard ArrayList.
```java
Observable<String> strings = ...
Observable<ImmutableList<String>> captured = strings
    .reduce(ImmutableList.<String>builder(), (b, s) -> b.add(s))
    .map(ImmutableList.Builder::build);
captured.forEach(i -> System.out.println(i));
```
Simple enough. But suppose I scheduled the `strings` Observable somewhere to emit in parallel on multiple threads or something. Would that not derail the reduce() operation and possibly cause a race condition, since ImmutableList.Builder would be vulnerable to that?
The problem lies in the state shared between realizations of the chain. This is pitfall #8 in my blog:
Shared state in the Observable chain
Let's assume you are dissatisfied with the performance or with the type of list the toList() operator returns, and you want to roll your own aggregator instead of it. For a change, you want to do this using existing operators, and you find the operator reduce():
```java
Observable<Vector<Integer>> list = Observable
    .range(1, 3)
    .reduce(new Vector<Integer>(), (vector, value) -> {
        vector.add(value);
        return vector;
    });

list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);
```
When you run the 'test' calls, the first prints what you'd expect, the second prints a vector in which the range 1-3 appears twice, and the third subscribe prints 9 elements!
The problem is not with the reduce() operator itself but with the expectation surrounding it. When the chain is established, the new Vector passed in is a 'global' instance and is shared between every evaluation of the chain.
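To make the sharing visible without pulling in RxJava, here is a minimal JDK-only sketch. The `Cold` interface and the `reduce` helper are hypothetical stand-ins, not RxJava's implementation; they are assumed to model only the property that matters here: each `subscribe` replays the source from the start, while the seed object is captured once when the chain is built.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/**
 * Hypothetical, JDK-only model of a cold Observable (not RxJava itself):
 * every subscribe() replays the source from the beginning.
 */
public class SharedSeedDemo {
    interface Cold<T> {
        void subscribe(Consumer<? super T> subscriber);
    }

    // Mimics reduce(seed, accumulator): the seed instance is captured once,
    // when the chain is assembled, and reused by every subscription.
    static <T, R> Cold<R> reduce(List<T> source, R seed, BiFunction<R, ? super T, R> accumulator) {
        return subscriber -> {
            R acc = seed; // the same object for every subscriber
            for (T value : source) {
                acc = accumulator.apply(acc, value);
            }
            subscriber.accept(acc);
        };
    }

    static Cold<Vector<Integer>> buildChain() {
        return reduce(Arrays.asList(1, 2, 3), new Vector<Integer>(), (vector, value) -> {
            vector.add(value); // mutates the shared seed
            return vector;
        });
    }

    public static void main(String[] args) {
        Cold<Vector<Integer>> list = buildChain();
        list.subscribe(System.out::println); // [1, 2, 3]
        list.subscribe(System.out::println); // [1, 2, 3, 1, 2, 3]
        list.subscribe(System.out::println); // [1, 2, 3, 1, 2, 3, 1, 2, 3]
    }
}
```

Because the accumulator mutates and returns the seed itself, every later subscription starts from whatever the previous one left behind.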
Naturally, there is a way of fixing this without implementing an operator for the whole purpose (which should be quite simple if you see the potential in the previous CounterOp):
```java
Observable<Vector<Integer>> list2 = Observable
    .range(1, 3)
    .reduce((Vector<Integer>) null, (vector, value) -> {
        if (vector == null) {
            // Created lazily, once per subscription, so it is never shared
            vector = new Vector<>();
        }
        vector.add(value);
        return vector;
    });

list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
```
You need to start with null and create the vector inside the accumulator function, so that it isn't shared between subscribers.
Alternatively, you can use the collect() operator, which has a factory callback for the initial value.
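The factory-callback approach can be sketched with the same kind of JDK-only model. The `Cold` interface and `collect` helper below are hypothetical stand-ins, assumed to mimic only the shape of RxJava 1.x `collect(Func0 stateFactory, Action2 collector)`: the factory is invoked once per subscription instead of the container being created once at assembly time.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical, JDK-only model of a cold Observable (not RxJava itself),
 * showing why a factory for the initial value avoids shared state.
 */
public class FactorySeedDemo {
    interface Cold<T> {
        void subscribe(Consumer<? super T> subscriber);
    }

    // Mimics collect(stateFactory, collector): the factory runs once per
    // subscription, so every subscriber gets its own fresh container.
    static <T, R> Cold<R> collect(List<T> source, Supplier<R> stateFactory,
                                  BiConsumer<R, ? super T> collector) {
        return subscriber -> {
            R state = stateFactory.get(); // new instance for this subscriber only
            for (T value : source) {
                collector.accept(state, value);
            }
            subscriber.accept(state);
        };
    }

    static Cold<Vector<Integer>> buildChain() {
        return collect(Arrays.asList(1, 2, 3),
                () -> new Vector<Integer>(),
                (vector, value) -> vector.add(value));
    }

    public static void main(String[] args) {
        Cold<Vector<Integer>> list = buildChain();
        list.subscribe(System.out::println); // [1, 2, 3]
        list.subscribe(System.out::println); // [1, 2, 3]
        list.subscribe(System.out::println); // [1, 2, 3]
    }
}
```

Against the real RxJava 1.x API, the question's snippet would presumably become `strings.collect(ImmutableList::<String>builder, ImmutableList.Builder::add).map(ImmutableList.Builder::build)`, which gives each subscriber its own builder; treat that line as an untested sketch.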
The rule of thumb here is that whenever you see an aggregator-like operator taking a plain value, be cautious: this 'initial value' will most likely be shared across all subscribers, and if you plan to consume the resulting stream with multiple subscribers, they will clash and may give you unexpected results or even crash.