java - How to merge two sorted Observables into a single sorted Observable?
Given:

    Integer[] arr1 = {1, 5, 9, 17};
    Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
    Observable<Integer> o1 = Observable.from(arr1);
    Observable<Integer> o2 = Observable.from(arr2);

How can I get an Observable that contains 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17?
Edit: Please see the_joric's comment if you're going to use this. There is an edge case that isn't handled, I don't see a quick way to fix it, and I don't have time to fix it right now.

Here's a solution in C#, since you have the system.reactive tag.
    static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
    {
        // Tag each value with its source so both streams can share one subscription.
        var source = Observable.Merge(
            a.Select(x => Tuple.Create('a', x)),
            b.Select(y => Tuple.Create('b', y)));

        return source.Publish(o =>
        {
            // Split the shared stream back into its two sources.
            var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
            var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);

            // Hold each value until the other stream emits something at least as large.
            return Observable.Merge(
                published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
                published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
        });
    }

The idea is summarized as follows.
When a emits a value x, we delay it until b emits a value y such that x <= y.

When b emits a value y, we delay it until a emits a value x such that y <= x.
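To make the ordering rule concrete, here is a minimal pull-based sketch in plain Java. It is not the Rx operator above and uses no Rx library; the class name SortedMerge and the method merge are made up for illustration. It assumes both inputs are finite and already sorted in ascending order. The smaller of the two current heads is released, and the larger one is held back until the other side catches up, which is what the two Delay calls do for pushed values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: a synchronous analogue of the merge rule, not an Rx operator.
class SortedMerge {
    // Merges two ascending-sorted lists into one ascending-sorted list.
    static List<Integer> merge(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<>(a.size() + b.size());
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size()) {
            // Release the smaller head; the larger head stays held back.
            if (a.get(i) <= b.get(j)) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        // One side is exhausted, so nothing can hold the other side back any more.
        while (i < a.size()) {
            out.add(a.get(i++));
        }
        while (j < b.size()) {
            out.add(b.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> arr1 = Arrays.asList(1, 5, 9, 17);
        List<Integer> arr2 = Arrays.asList(1, 2, 3, 6, 7, 12, 15);
        System.out.println(merge(arr1, arr2));
        // prints [1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17]
    }
}
```

The final two loops correspond to the case where one source completes: its remaining counterpart values have nothing left to wait for and are flushed in order.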
If you only had hot observables, you could do the following. However, the following would not work if there were any cold observables in the mix. I would advise always using the version that works for both hot and cold observables.
    static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
    {
        // Only valid for hot observables: each source is subscribed to more than once.
        return Observable.Merge(
            a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
            b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
    }