java - Can RxJava reduce() be unsafe when parallelized? -


i want use reduce() operation on observable map guava immutablelist, since prefer more standard arraylist.

observable<string> strings = ...  observable<immutablelist<string>> captured = strings.reduce(immutablelist.<string>builder(), (b,s) -> b.add(s))                 .map(immutablelist.builder::build);  captured.foreach(i -> system.out.println(i)); 

simple enough. suppose somewhere scheduled observable strings in parallel multiple threads or something. not derail reduce() operation , possibly cause race condition? since immutablelist.builder vulnerable that?

the problem lies in shared state between realizations of chain. pitfall # 8 in my blog:

shared state in observable chain

let's assume dissatisfied performance or type of list tolist() operator returns , want roll own aggregator instead of it. change, want using existing operators , find operator reduce():

observable<vector<integer>> list = observable     .range(1, 3)     .reduce(new vector<integer>(), (vector, value) -> {         vector.add(value);         return vector;     });  list.subscribe(system.out::println); list.subscribe(system.out::println); list.subscribe(system.out::println); 

when run 'test' calls, first prints you'd expect, second prints vector range 1-3 appears twice , third subscribe prints 9 elements!

the problem not reduce() operator expectation surrounding it. when chain established, new vector passed in 'global' instance , shared between evaluation of chain.

naturally, there way of fixing without implementing operator whole purpose (which should quite simple if see potential in previous counterop):

observable<vector<integer>> list2 = observable     .range(1, 3)     .reduce((vector<integer>)null, (vector, value) -> {         if (vector == null) {             vector = new vector<>();         }         vector.add(value);         return vector;     });  list2.subscribe(system.out::println); list2.subscribe(system.out::println); list2.subscribe(system.out::println); 

you need start null , create vector inside accumulator function, isn't shared between subscribers.

alternatively, can collect() operator has factory callback initial value.

the rule of thumb here whenever see aggregator-like operator taking plain value, cautious 'initial value' shared across subscribers , if plan consume resulting stream multiple subscribers, clash , may give unexpected results or crash.


Comments

Popular posts from this blog

Magento/PHP - Get phones on all members in a customer group -

php - .htaccess mod_rewrite for dynamic url which has domain names -

Website Login Issue developed in magento -