java - Create hot observable -


i taking first steps rx. have read bits thought getting hands dirty better way go. started transforming 1 existing codes rx type of code.

the goal: trying mock source sends out data specific frequency (say 60/s, video camera or whatever). have footage recorded simulate source while source not available. , need source start sending if no 1 listening because thats real source do.

before rx, went , made runnable iterates on 15.000 data items, sends item rabbitmq server , sleeps 1/60s , sends next one.

now want turn logic hot observable, playing around. far have this:

observable.from(mdataitems)                 .takewhile(item -> mrunning)                 .map(mgson::tojson)                 .doonnext(json -> {                     try {                         mchannel.basicpublish(exchange_name, "", null, json.getbytes());                     } catch (ioexception e) {                         logger.error(e, string.format("could not publish %s exchange", exchange_name));                     }                      try {                         thread.sleep(1 / sending_frequency_in_hz);                     } catch (interruptedexception e) {                         logger.error(e, string.format("could not sleep %d ms", (int) (1000 / sending_frequency_in_hz)));                     }                 })                 .dooncompleted(() -> {                     if (mrunning)                         logger.info("all data sent");                     else                         logger.info("interrupted while sending");                      disconnect();                     mrunning = false;                 })                 .subscribeon(schedulers.io())                 .publish()                 .connect();  

and though works far, dont know if "good" way create hot observable (or observable in general matter) emits items. (i dont know if should use subject instead of observable, thats question).

yes, there alternative:

int delay = 1000 / frequency; observable o = observable.from(dataitems) .zipwith(     observable.timer(delay, delay, timeunit.milliseconds)         .onbackpressuredrop(),     (s, t) -> s) .map(mgson::tojson) // other ops necessary .subscribeon(schedulers.io()) .publish();  o.connect();  o.subscribe(...); 

Comments

Popular posts from this blog

Magento/PHP - Get phones on all members in a customer group -

php - Bypass Geo Redirect for specific directories -

php - .htaccess mod_rewrite for dynamic url which has domain names -