hadoop - Multiple file storage in HDFS using Apache Spark -


i doing project involves using hdfs storage , apache spark computation. have directory in hdfs have several text files in @ same depth.i want process these files using spark , store corresponding results hdfs 1 output file each input file.

for example - suppose have directory 1000 text files in @ same depth. reading these files using wildcards

sc.wholetextfiles(hdfs://localhost:9000/home/akshat/files/*.txt) 

then process them using spark , corresponding rdd , save using

result.saveastextfile("hdfs://localhost:9000/home/akshat/final") 

but gives me result of input files in 1 single file , want each file, process them individually , store output of each of them individually.

what should next approach achieve ?
in advance!

you can using wholetextfiles() , note: below approach process files 1 one.

val data = sc.wholetextfiles("hdfs://master:port/vijay/mywordcount/")  val files = data.map { case (filename, content) => filename}   def dosomething(file: string) = {    println (file);   // logic of processing single file comes here   val logdata = sc.textfile(file);  val numas = logdata.filter(line => line.contains("a")).count();  println("lines a: %s".format(numas));   // save rdd of single file processed data hdfs  comes here  }  files.collect.foreach( filename => {     dosomething(filename)  })  

where:

  • hdfs://master:port/vijay/mywordcount/ --- hdfs dir
  • data - org.apache.spark.rdd.rdd[(string, string)]
  • files - org.apache.spark.rdd.rdd[string]- filenames
  • dosomething(filename) - logic

update: multiple output files

/* simpleapp.scala */ import org.apache.spark.sparkcontext import org.apache.spark.sparkcontext._ import org.apache.spark.sparkconf  /* hadoop */  import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.nullwritable import org.apache.hadoop.mapred.lib.multipletextoutputformat  /* java */ import java.io.serializable;  import org.apache.log4j.logger import org.apache.log4j.level  /* custom textoutput format */ class rddmultipletextoutputformat extends multipletextoutputformat[any, any] {   override def generateactualkey(key: any, value: any): =     nullwritable.get()    override def generatefilenameforkeyvalue(key: any, value: any, name: string): string =     return key.asinstanceof[string] +"-"+ name;   // output hdfs://ouptut_dir/inputfilename-part-****   //return key.asinstanceof[string] +"/"+ name;   // output hdfs://ouptut_dir/inputfilename/part-**** [inputfilename - directory of partfiles ] }  /* spark context */ object spark {   val sc = new sparkcontext(new sparkconf().setappname("test").setmaster("local[*]")) }  /* wordcount processing */  object process extends serializable{   def apply(filename: string): org.apache.spark.rdd.rdd[(string, string)]= {     println("i called.....")     val simple_path = filename.split('/').last;     val lines = spark.sc.textfile(filename);     val counts     = lines.flatmap(line => line.split(" ")).map(word => (word, 1)).reducebykey(_ + _); //(word,count)     val fname_word_counts = counts.map( x => (simple_path,x._1+"\t"+ x._2));   // (filename,word\tcount)     fname_word_counts   } }  object simpleapp  {          def main(args: array[string]) {              //logger.getlogger("org").setlevel(level.off)             //logger.getlogger("akka").setlevel(level.off)              // input ans output paths             val input_path = "hdfs://master:8020/vijay/mywordcount/"             val output_path = "hdfs://master:8020/vijay/mywordcount/output/"              // context             val context = spark.sc             val data = context.wholetextfiles(input_path)              // final output rdd             var output : org.apache.spark.rdd.rdd[(string, string)] = context.emptyrdd              // files process             val files = data.map { case (filename, content) => filename}               // apply wordcount processing on each file received in wholetextfiles.             files.collect.foreach( filename => {                             output = output.union(process(filename));             })             //output.saveastextfile(output_path);   // save output (filename,word\tcount)            output.saveashadoopfile(output_path, classof[string], classof[string],classof[rddmultipletextoutputformat])  // custom output format.             //close context            context.stop();           } } 

environment:

  • scala compiler version 2.10.2
  • spark-1.2.0-bin-hadoop2.3
  • hadoop 2.3.0-cdh5.0.3

sample output:

[ramisetty@node-1 stack]$ hadoop fs -ls /vijay/mywordcount/output found 5 items -rw-r--r--   3 ramisetty supergroup          0 2015-06-09 03:49 /vijay/mywordcount/output/_success -rw-r--r--   3 ramisetty supergroup         40 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00000 -rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00001 -rw-r--r--   3 ramisetty supergroup         44 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00002 -rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00003 

Comments

Popular posts from this blog

Magento/PHP - Get phones on all members in a customer group -

php - Bypass Geo Redirect for specific directories -

php - .htaccess mod_rewrite for dynamic url which has domain names -