Spark Streaming throws FileNotFoundException


Spark Streaming in cluster mode throws a FileNotFoundException when the input directory is on a Linux file system (GFS, a file system shared across the nodes). The same job works fine with HDFS input.

The data is available and accessible at the same path on all worker nodes.

JavaPairInputDStream<Text, Text> myDStream =
    jssc.fileStream(path, Text.class, Text.class, CustomInputFormat.class, new Function<Path, Boolean>() {
      @Override
      public Boolean call(Path v1) throws Exception {
        return Boolean.TRUE;
      }
    }, false);

Error message:

14/06/03 21:33:40 WARN FileInputDStream: Error finding new files
java.io.FileNotFoundException: File /data/spark/input does not exist.
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
        at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
        at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
        at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176)
        at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/06/03 21:33:40 INFO FileInputDStream: New files at time 1433347420000 ms:

Note: the Spark shell works with the shared file system.

How can I resolve this issue?

JavaPairInputDStream<Text, Text> myDStream =
    jssc.fileStream(path, Text.class, Text.class, CustomInputFormat.class, new Function<Path, Boolean>() {
      @Override
      public Boolean call(Path v1) throws Exception {
        return Boolean.TRUE;
      }
    }, false);

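Resolved by prefixing the directory path with file:///. Without a scheme, the path is resolved against the cluster's default file system, which is HDFS here (note the DistributedFileSystem frames in the stack trace), so the directory on the shared Linux file system is never checked.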
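
The fix can be sketched in plain Java as a small helper that adds the file:// scheme to an absolute directory path before it is handed to jssc.fileStream. This is only an illustration under stated assumptions: the class LocalPathQualifier and the method toLocalUri are hypothetical names, not part of Spark or Hadoop, and the sketch uses only java.net.URI, so it runs without a cluster.

```java
import java.net.URI;

// Hypothetical helper, not part of Spark or Hadoop.
class LocalPathQualifier {

    // Returns the path unchanged if it already names a scheme (hdfs://, file://, ...);
    // otherwise prefixes an absolute path with "file://" so it cannot be
    // resolved against the default file system (HDFS in the question).
    static String toLocalUri(String path) {
        URI uri = URI.create(path);
        if (uri.getScheme() != null) {
            return path; // already fully qualified
        }
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("Expected an absolute path: " + path);
        }
        return "file://" + path; // "/data/spark/input" -> "file:///data/spark/input"
    }

    public static void main(String[] args) {
        // The directory from the error message in the question.
        String dir = toLocalUri("/data/spark/input");
        System.out.println(dir);
        // In the streaming job, dir would then be passed as the first
        // argument of jssc.fileStream(...) in place of the bare path.
    }
}
```

With the scheme in place, the directory listing should go through the local file system implementation rather than DistributedFileSystem. The directory still has to be mounted at the same location on the driver and on every worker node.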

