Scala - Continuously fetch database results with scalaz.stream


I'm new to Scala and extremely new to scalaz. Through a different Stack Overflow answer and some handholding, I was able to use scalaz.stream to implement a Process that continuously fetches Twitter API results. Now I'd like to do the same thing for the Cassandra DB where the Twitter handles are stored.

The code for fetching the Twitter results is here:

import scala.concurrent.Await
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream._

// Query the database once and build one (handle, url) pair per Twitter handle.
def urls: Seq[(Handle, Url)] = {
  Await.result(
    getAll(connection).map { list =>
      list.map(twitterToGet =>
        (twitterToGet.handle, urlBoilerplate + twitterToGet.handle + parameters + twitterToGet.sinceId)
      )
    },
    5 seconds)
}

// Channel that calls Twitter for each url and stores any new tweets.
val fetchUrl = channel.lift[Task, (Handle, Url), Fetched] {
  url => Task.delay {
    val finalResult = callTwitter(url)
    if (finalResult.tweets.nonEmpty) {
      connection.updateTwitter(finalResult)
    } else {
      println("\n" + finalResult.handle + " does not have new tweets")
    }
    s"\ntwitter fetch & database update completed"
  }
}

val P = Process
// Pair each 3-second tick with the next url, then run the fetch on it.
val process =
  (time.awakeEvery(3.second) zipWith P.emitAll(urls))((b, url) => url).
    through(fetchUrl)

val fetched = process.runLog.run
fetched.foreach(println)

What I'm planning to do is use

def urls: Seq[(Handle, Url)] = {

to continuously fetch the Cassandra results (with awakeEvery) and send them off to an actor that runs the Twitter fetching code above.

My question is, what is the best way to implement this with scalaz.stream? Note that I'd like to get the database results, then have a delay before getting the database results again. Should I use the same architecture as the Twitter fetching code above? If so, how would I create a channel.lift that doesn't require input? Is there a better way to do this in scalaz.stream?

Thanks in advance.

I got this working today. The cleanest way to do it is to emit the database results as a stream and attach a sink to the end of the stream to do the Twitter processing. What I actually have is a bit more complex, because it retrieves the database results continuously and sends them off to an actor for the Twitter processing. The style of retrieving the results follows the original code from my question:

import java.util.concurrent.ScheduledThreadPoolExecutor
import akka.actor.{ActorSystem, Props}

val connection = new SimpleClient(conf.getString("cassandra.node"))

// Scheduler required implicitly by time.awakeEvery.
implicit val threadPool = new ScheduledThreadPoolExecutor(4)
val system = ActorSystem("mysystem")
val twitterFetch = system.actorOf(Props[TwitterFetch], "twitterfetch")

// Channel that takes the Cassandra connection, runs the query,
// and hands the results to the actor that does the Twitter fetching.
def myEffect = channel.lift[Task, SimpleClient, String] {
  connection: SimpleClient => Task.delay {
    val results = Await.result(
      getAll(connection).map { list =>
        list.map(twitterToGet =>
          (twitterToGet.handle, urlBoilerplate + twitterToGet.handle + parameters + twitterToGet.sinceId)
        )
      },
      5 seconds)

    println("query successful, results= " + results + " @ " + format.print(System.currentTimeMillis()))

    twitterFetch ! Fetched(connection, results)
    s"database fetch completed"
  }
}

val P = Process
// On every 3-second tick, emit the connection and run the query again.
val process =
  (time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
    through(myEffect)))

val fetching = process.runLog.run
fetching.foreach(println)
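For comparison, the simpler shape mentioned above (emit the database results as a stream and attach a sink for the Twitter processing) might look roughly like the sketch below. It assumes the same scalaz-stream API generation as the code in this post (time.awakeEvery, sink.lift, Task.run). The names fetchHandles, twitterSink and seen are hypothetical stand-ins, not part of the original code, and the query is faked with a fixed list so the example runs without Cassandra. Using Process.eval also shows one way to run an effect that needs no input.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledThreadPoolExecutor}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream._

object EmitAndSink {
  // Scheduler required implicitly by time.awakeEvery.
  implicit val scheduler: ScheduledExecutorService = new ScheduledThreadPoolExecutor(1)

  // Hypothetical stand-in for the Cassandra query: no input, just an effect.
  def fetchHandles: Task[Seq[String]] = Task.delay(Seq("alice", "bob"))

  // Records what the sink received, in place of real Twitter processing.
  val seen = ArrayBuffer.empty[String]

  // Sink that performs one effect per handle (assumption: one Twitter call each).
  val twitterSink: Sink[Task, String] =
    sink.lift[Task, String](handle => Task.delay { seen += handle; () })

  val pipeline: Process[Task, Unit] =
    time.awakeEvery(100.millis)                       // delay between queries
      .take(2)                                        // bounded here so the sketch terminates
      .flatMap(_ => Process.eval(fetchHandles))       // re-run the query on every tick
      .flatMap(handles => Process.emitAll(handles))   // one stream element per handle
      .to(twitterSink)                                // Twitter processing at the end

  def main(args: Array[String]): Unit = {
    pipeline.run.run
    println(seen.mkString(","))
    scheduler.shutdown()
  }
}
```

Dropping the take(2) would make the pipeline run until it is stopped, which is closer to the continuous behaviour described here.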

some notes:

I had asked about using channel.lift without input, but it became clear that the input should be the Cassandra connection.

The line

val process = (time.awakeEvery(3.second).flatMap(_ => P.emit(connection).through(myEffect)))

was changed from zipWith to flatMap because I wanted to retrieve the results continuously instead of once.
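The difference between the two combinators can be seen in a small timer-free sketch. The names ticks and rows are hypothetical stand-ins for the awakeEvery ticks and for one batch of database results. It assumes the same scalaz-stream version as the rest of this post (toSource, runLog, Task.run).

```scala
import scalaz.concurrent.Task
import scalaz.stream._

object ZipVsFlatMap {
  // Stand-in for time.awakeEvery: three ticks instead of an endless timer.
  def ticks: Process[Task, Int] = Process.emitAll(Seq(1, 2, 3)).toSource

  // Stand-in for one batch of database results.
  val rows: Seq[String] = Seq("a", "b")

  // zipWith pairs one tick with one row and halts when the rows run out,
  // so the batch is consumed exactly once: a, b.
  val zipped: List[String] =
    ticks.zipWith(Process.emitAll(rows))((_, row) => row).runLog.run.toList

  // flatMap builds a fresh inner stream for every tick,
  // so the whole batch is emitted once per tick: a, b, a, b, a, b.
  val repeated: List[String] =
    ticks.flatMap(_ => Process.emitAll(rows)).runLog.run.toList
}
```

With a real query inside the flatMap, each tick would trigger a new database fetch, which is the continuous behaviour wanted here.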

