Java - Can RxJava reduce() be unsafe when parallelized?
I want to use the reduce() operation on an Observable to map it to a Guava ImmutableList, since I prefer it so much more than the standard ArrayList.
```java
Observable<String> strings = ...

Observable<ImmutableList<String>> captured = strings
    .reduce(ImmutableList.<String>builder(), (b, s) -> b.add(s))
    .map(ImmutableList.Builder::build);

captured.forEach(i -> System.out.println(i));
```
Simple enough. But suppose I scheduled the strings Observable somewhere in parallel on multiple threads, or something similar. Would that not derail the reduce() operation and possibly cause a race condition, especially since ImmutableList.Builder would be vulnerable to that?
The problem lies in the shared state between realizations of the chain. This is pitfall #8 in my blog:
Shared state in an Observable chain
Let's assume you are dissatisfied with the performance or the type of the List the toList() operator returns, and you want to roll your own aggregator instead of it. For a change, you want to do this by using existing operators, and you find the operator reduce():
```java
Observable<Vector<Integer>> list = Observable
    .range(1, 3)
    .reduce(new Vector<Integer>(), (vector, value) -> {
        vector.add(value);
        return vector;
    });

list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);
```
When you run the 'test' calls, the first prints what you'd expect, but the second prints a vector in which the range 1-3 appears twice, and the third subscribe prints 9 elements!
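To see why the counts grow, here is a minimal plain-JDK model. It is a sketch, not RxJava's implementation: the hypothetical ColdReduce class stands in for Observable.range(1, 3).reduce(seed, accumulator). Like a cold Observable, it replays the items 1 to 3 on every subscribe(), but it holds on to the one seed object that was created when the chain was assembled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Plain-JDK model, NOT RxJava: ColdReduce is a hypothetical stand-in for
// Observable.range(1, 3).reduce(seed, accumulator).
class SharedSeedModel {

    static final class ColdReduce<R> {
        private final R seed; // created once, when the "chain" is assembled
        private final BiFunction<R, Integer, R> accumulator;

        ColdReduce(R seed, BiFunction<R, Integer, R> accumulator) {
            this.seed = seed;
            this.accumulator = accumulator;
        }

        // Every subscription replays 1..3 through the fold,
        // but every run starts from the SAME seed object.
        void subscribe(Consumer<? super R> onNext) {
            R acc = seed;
            for (int value = 1; value <= 3; value++) {
                acc = accumulator.apply(acc, value);
            }
            onNext.accept(acc);
        }
    }

    // Subscribes three times and records the size of the vector each subscriber sees.
    static List<Integer> sizesSeen() {
        ColdReduce<Vector<Integer>> list = new ColdReduce<>(new Vector<Integer>(), (vector, value) -> {
            vector.add(value);
            return vector;
        });
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.subscribe(v -> sizes.add(v.size()));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(sizesSeen()); // prints [3, 6, 9]
    }
}
```

Recording sizes rather than printing the vectors makes the growth easy to check: each subscription appends three more elements to the single shared Vector.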
The problem is not with the reduce() operator itself but with the expectation surrounding it. When the chain is established, the new Vector passed in is a 'global' instance and is shared between every evaluation of the chain.
Naturally, there is a way of fixing this without implementing an operator for the whole purpose (which should be quite simple if you see the potential in the previous CounterOp):
```java
Observable<Vector<Integer>> list2 = Observable
    .range(1, 3)
    .reduce((Vector<Integer>) null, (vector, value) -> {
        // create the vector lazily, so each subscription gets its own
        if (vector == null) {
            vector = new Vector<>();
        }
        vector.add(value);
        return vector;
    });

list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
```
You need to start with null and create the vector inside the accumulator function, so that it isn't shared between subscribers.
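The null-seed fix can be checked against the same kind of model. Again this is plain JDK Java, not RxJava, and ColdReduce is a hypothetical stand-in for reduce(seed, accumulator) on a cold source. Because every run restarts the fold from null, the first accumulator call of each subscription allocates its own Vector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Plain-JDK model, NOT RxJava: ColdReduce is a hypothetical stand-in for
// Observable.range(1, 3).reduce(seed, accumulator), replayed on every subscribe().
class NullSeedModel {

    static final class ColdReduce<R> {
        private final R seed;
        private final BiFunction<R, Integer, R> accumulator;

        ColdReduce(R seed, BiFunction<R, Integer, R> accumulator) {
            this.seed = seed;
            this.accumulator = accumulator;
        }

        void subscribe(Consumer<? super R> onNext) {
            R acc = seed; // null here, so no mutable object is shared between runs
            for (int value = 1; value <= 3; value++) {
                acc = accumulator.apply(acc, value);
            }
            onNext.accept(acc);
        }
    }

    // Subscribes three times and keeps the vector each subscriber received.
    static List<Vector<Integer>> runThreeSubscribers() {
        ColdReduce<Vector<Integer>> list2 = new ColdReduce<>((Vector<Integer>) null, (vector, value) -> {
            if (vector == null) {
                vector = new Vector<>(); // first call of each run allocates a fresh vector
            }
            vector.add(value);
            return vector;
        });
        List<Vector<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list2.subscribe(results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runThreeSubscribers()); // prints [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
    }
}
```

One consequence is visible in the model: if the source emitted nothing, the fold would hand back the null seed unchanged, so a consumer would have to handle that case. Note also that RxJava 2 and later reject null values, including a null seed, so this particular trick belongs to RxJava 1.x.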
Alternatively, you can use the collect() operator, which takes a factory callback for the initial value.
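A minimal model of the collect() alternative, once more in plain JDK Java rather than RxJava: the hypothetical ColdCollect class takes a Supplier factory and a BiConsumer collector and calls the factory once per subscription. That per-subscription call is the property that keeps subscribers from sharing a container.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-JDK model, NOT RxJava: ColdCollect is a hypothetical stand-in for
// Observable.range(1, 3).collect(factory, collector).
class FactorySeedModel {

    static final class ColdCollect<C> {
        private final Supplier<C> factory;
        private final BiConsumer<C, Integer> collector;

        ColdCollect(Supplier<C> factory, BiConsumer<C, Integer> collector) {
            this.factory = factory;
            this.collector = collector;
        }

        void subscribe(Consumer<? super C> onNext) {
            C container = factory.get(); // the factory runs once per subscription
            for (int value = 1; value <= 3; value++) {
                collector.accept(container, value);
            }
            onNext.accept(container);
        }
    }

    // Subscribes three times and keeps the container each subscriber received.
    static List<Vector<Integer>> runThreeSubscribers() {
        ColdCollect<Vector<Integer>> list3 = new ColdCollect<Vector<Integer>>(Vector::new, Vector::add);
        List<Vector<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list3.subscribe(results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runThreeSubscribers()); // prints [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
    }
}
```

In RxJava 1.x the real operator is collect(Func0<R> stateFactory, Action2<R, ? super T> collector), and RxJava 2 takes a Callable and a BiConsumer instead. For the Guava case in the question, the shape should be roughly strings.collect(ImmutableList::<String>builder, ImmutableList.Builder::add).map(ImmutableList.Builder::build); treat that line as an uncompiled sketch.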
The rule of thumb here is that whenever you see an aggregator-like operator taking a plain value, be cautious: this 'initial value' is shared across all subscribers, and if you plan to consume the resulting stream with multiple subscribers, they will clash and may give you unexpected results or even crash.