Scala - continuously fetch database results with scalaz.stream
I'm new to Scala and extremely new to scalaz. With the help of several StackOverflow answers and some handholding, I was able to use scalaz.stream to implement a Process that continuously fetches Twitter API results. I'd like to do the same thing for the Cassandra DB where the Twitter handles are stored.
The code for fetching the Twitter results is here:
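```scala
import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global // assumed: Future.map needs an ExecutionContext
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.{channel, time, Process}

// Handle, URL, getAll, connection, urlBoilerplate, parameters and callTwitter
// are defined elsewhere in my code.

// time.awakeEvery needs an implicit scheduler in scope.
implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(4)

// Query Cassandra (blocking for at most 5 seconds) and build one Twitter URL per handle.
def urls: Seq[(Handle, URL)] = {
  Await.result(
    getAll(connection).map { list =>
      list.map(twitterToGet =>
        (twitterToGet.handle,
          urlBoilerplate + twitterToGet.handle + parameters + twitterToGet.sinceId))
    },
    5.seconds)
}

// The lifted function returns a String, so the channel's output type is String.
val fetchUrl = channel.lift[Task, (Handle, URL), String] { url =>
  Task.delay {
    val finalResult = callTwitter(url)
    if (finalResult.tweets.nonEmpty) {
      connection.updateTwitter(finalResult)
    } else {
      println("\n" + finalResult.handle + " does not have new tweets")
    }
    "\nTwitter fetch & database update completed"
  }
}

val p = Process
// Pair each 3-second tick with one URL; zipWith stops once all URLs have been emitted.
val process = (time.awakeEvery(3.seconds) zipWith p.emitAll(urls))((_, url) => url).
  through(fetchUrl)

// Finite stream, so collecting every result with runLog is fine here.
val fetched = process.runLog.run
fetched.foreach(println)
```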
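To see what `fetchUrl` and `through` are doing: in scalaz.stream 0.x a `Channel[F, I, O]` is a type alias for `Process[F, I => F[O]]`, that is, a stream of effectful functions. `channel.lift(f)` essentially repeats `f` forever (`Process.constant(f)`), and `through` zips each input element with the next function and evaluates the result. The dependency-free model below mimics this with plain Scala iterators, using a thunk `() => B` in place of `Task[B]`. It is a simplified illustration under those assumptions, not scalaz.stream's implementation; `Effect`, `lift`, `through` and the example URLs are hypothetical.

```scala
object ChannelModel {
  // Stand-in for Task[B]: a suspended computation that runs only when invoked.
  type Effect[B] = () => B

  // Stand-in for Channel[Task, I, O]: a (possibly endless) stream of effectful functions.
  type Channel[I, O] = Iterator[I => Effect[O]]

  // Stand-in for channel.lift: repeat the same function forever.
  def lift[I, O](f: I => Effect[O]): Channel[I, O] = Iterator.continually(f)

  // Stand-in for Process#through: pair each input with the next function and run the effect.
  // Iterators are lazy, so no effect runs until the result is consumed.
  def through[I, O](in: Iterator[I], ch: Channel[I, O]): Iterator[O] =
    in.zip(ch).map { case (i, f) => f(i)() }

  def main(args: Array[String]): Unit = {
    // Hypothetical fetch step shaped like fetchUrl: (handle, url) => effect producing a String.
    val fetch = lift[(String, String), String] { case (handle, url) =>
      () => s"fetched $handle from $url"
    }
    val urls = Iterator("alice" -> "https://example.com/alice", "bob" -> "https://example.com/bob")
    through(urls, fetch).foreach(println)
  }
}
```

The model also shows why a channel always needs an input stream: `through` only runs an effect when an input element arrives to pair it with.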
What I'm planning is to use the `urls` method (`def urls: Seq[(Handle, URL)]`) to continuously fetch the Cassandra results (with `awakeEvery`) and send them off to an actor that runs the Twitter fetching code above.
My question is: what is the best way to implement this with scalaz.stream? Note that I'd like to get the database results and then wait for a delay before getting the database results again. Should I use the same architecture as the Twitter fetching code above? If so, how do I create a `channel.lift` that doesn't require an input? Is there a better way to do this in scalaz.stream?
Thanks in advance.
I got it working today. The cleanest way would be to emit the database results as a stream and attach a sink at the end of the stream that does the Twitter processing. What I have is a bit more complex, because it retrieves the database results continuously and sends them off to an actor for the Twitter processing. The style of retrieving the results follows the original code from my question:
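```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledThreadPoolExecutor}
import akka.actor.{ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global // assumed: Future.map needs an ExecutionContext
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.{channel, io, time, Process}

// conf, SimpleClient, getAll, urlBoilerplate, parameters, format, TwitterFetch
// and Fetched are defined elsewhere in my code.
val connection = new SimpleClient(conf.getString("cassandra.node"))
// Scheduler used implicitly by time.awakeEvery.
implicit val threadPool: ScheduledExecutorService = new ScheduledThreadPoolExecutor(4)
val system = ActorSystem("mySystem")
val twitterFetch = system.actorOf(Props[TwitterFetch], "twitterFetch")

// Query Cassandra with the given connection and hand the results to the TwitterFetch actor.
def myEffect = channel.lift[Task, SimpleClient, String] { connection: SimpleClient =>
  Task.delay {
    val results = Await.result(
      getAll(connection).map { list =>
        list.map(twitterToGet =>
          (twitterToGet.handle,
            urlBoilerplate + twitterToGet.handle + parameters + twitterToGet.sinceId))
      },
      5.seconds)
    println("Query successful, results = " + results + " @ " + format.print(System.currentTimeMillis()))
    twitterFetch ! Fetched(connection, results)
    "Database fetch completed"
  }
}

val p = Process
// On every 3-second tick, emit the connection and run it through myEffect (repeats indefinitely).
val process = time.awakeEvery(3.seconds).flatMap(_ => p.emit(connection).through(myEffect))

// The stream never ends, so runLog would never return; print each result as it arrives instead.
process.to(io.stdOutLines).run.run
```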
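One requirement from the question that this design leaves open is a pause *after* each database fetch. `time.awakeEvery` ticks on a schedule, so the gap between two queries is not necessarily measured from when the previous query finished. For comparison only, here is a plain-JDK sketch (not scalaz.stream) using `ScheduledExecutorService.scheduleWithFixedDelay`, which does count the delay from the end of one run to the start of the next. `FixedDelayPolling` and `pollWithDelay` are hypothetical names, and the `println` stands in for the Cassandra query.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object FixedDelayPolling {
  // Runs `poll` repeatedly until it has completed `times` runs, waiting `delayMs`
  // milliseconds after each run finishes before starting the next one.
  // Returns true if all runs completed before the timeout.
  def pollWithDelay(times: Int, delayMs: Long)(poll: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(times)
    val task = new Runnable {
      def run(): Unit = { poll(); done.countDown() }
    }
    // scheduleWithFixedDelay: the delay is counted from the end of one run
    // to the start of the next (unlike scheduleAtFixedRate).
    scheduler.scheduleWithFixedDelay(task, 0L, delayMs, TimeUnit.MILLISECONDS)
    val timeoutMs = times * delayMs + 5000L
    try done.await(timeoutMs, TimeUnit.MILLISECONDS)
    finally scheduler.shutdownNow()
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the Cassandra query done in myEffect.
    val ok = pollWithDelay(times = 3, delayMs = 3000L) { () =>
      println("querying Cassandra at " + System.currentTimeMillis())
    }
    println(s"finished: $ok")
  }
}
```

The difference only matters when a query can take a noticeable fraction of the interval; for short queries the two styles behave almost the same.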
Some notes:
I had asked about using `channel.lift` without an input, but it became clear that the input should be the Cassandra connection.
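The point about the input can be illustrated with a small model: a channel step is always a function of one input, so an effect that needs no upstream data can simply be fed a constant, either `()` or, as in the answer, the connection it needs anyway. The sketch below is plain Scala (no scalaz.stream); `Connection`, `inputless`, `withConnection` and `runPerTick` are hypothetical names, and a thunk `() => String` stands in for `Task[String]`.

```scala
object ConstantInputModel {
  // Hypothetical stand-in for SimpleClient.
  final case class Connection(node: String)

  // An effect that needs nothing from upstream, expressed as a function of Unit.
  val inputless: Unit => (() => String) = _ => () => "query ran"

  // The same kind of effect taking the connection as its input, like myEffect in the answer.
  val withConnection: Connection => (() => String) = c => () => s"query ran on ${c.node}"

  // Feed the same constant input on each of `ticks` ticks and run the effect each time.
  def runPerTick[I](input: I, ticks: Int)(effect: I => (() => String)): List[String] =
    Iterator.fill(ticks)(input).map(i => effect(i)()).toList
}
```

Roughly speaking, `p.emit(connection).through(myEffect)` in the answer is this pattern with one tick per `awakeEvery` event.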
The line `val process = time.awakeEvery(3.seconds).flatMap(_ => p.emit(connection).through(myEffect))` was changed from `zipWith` to `flatMap` because I wanted to retrieve the results continuously instead of only once.
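The difference between the two versions can be reproduced without scalaz.stream. The sketch below is a plain-iterator analogy, not scalaz.stream code: `ticks` stands in for `time.awakeEvery` (without any real waiting) and `queryHandles` is a hypothetical stand-in for the Cassandra query. Zipping an endless tick stream with a list that is computed once stops when the list runs out, while flat-mapping each tick into a fresh query repeats indefinitely.

```scala
object ZipVsFlatMap {
  // Stand-in for time.awakeEvery: an endless sequence of ticks.
  def ticks: Iterator[Int] = Iterator.from(1)

  // Hypothetical stand-in for the Cassandra query; counts how often it runs.
  var queries = 0
  def queryHandles(): List[String] = { queries += 1; List("alice", "bob") }

  // zipWith style: the query runs once; zipping with the endless ticks
  // ends as soon as that finite list is exhausted.
  def zipStyle(): List[String] =
    ticks.zip(queryHandles().iterator).map { case (_, handle) => handle }.toList

  // flatMap style: every tick runs the query again, so the stream never ends on its own.
  def flatMapStyle(): Iterator[String] =
    ticks.flatMap(_ => queryHandles())
}
```

Here `zipStyle()` queries once and returns `List("alice", "bob")`, while taking six elements from `flatMapStyle()` runs the query three times.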