hadoop - Multiple file storage in HDFS using Apache Spark
I am doing a project that involves using HDFS for storage and Apache Spark for computation. I have a directory in HDFS containing several text files, all at the same depth. I want to process these files using Spark and store their corresponding results back in HDFS, with one output file for each input file.
For example, suppose I have a directory with 1000 text files, all at the same depth. I am reading all of these files using wildcards:
sc.wholeTextFiles("hdfs://localhost:9000/home/akshat/files/*.txt")
Then I process them using Spark, get the corresponding RDD, and save it using:
result.saveAsTextFile("hdfs://localhost:9000/home/akshat/final")
But this gives me the results of all the input files merged into one single output, whereas I want to process each file individually and store the output of each of them individually.
What approach should I take next to achieve this?
Thanks in advance!
You can do this using wholeTextFiles(). Note: the approach below processes the files one by one.
val data = sc.wholeTextFiles("hdfs://master:port/vijay/mywordcount/")

val files = data.map { case (filename, content) => filename }

def doSomething(file: String) = {
  println(file)

  // logic of processing a single file comes here

  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))

  // saving the processed RDD of this single file to HDFS comes here
}

files.collect.foreach(filename => { doSomething(filename) })
where:
- hdfs://master:port/vijay/mywordcount/ --- the HDFS input directory
- data - org.apache.spark.rdd.RDD[(String, String)]
- files - org.apache.spark.rdd.RDD[String] - the filenames
- doSomething(filename) - the processing logic (the save step is sketched just below)
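The comment in doSomething above only marks where the per-file save would go. A minimal sketch of that step, assuming a hypothetical output base directory such as hdfs://master:port/vijay/output/ and writing one output directory per input file, could look like this:

// hypothetical output base directory; adjust to your cluster
val outputBase = "hdfs://master:port/vijay/output/"

def doSomething(file: String) = {
  val simpleName = file.split('/').last                      // e.g. file1.txt

  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()

  // save this file's result under its own directory, e.g. .../output/file1.txt/part-00000
  sc.parallelize(Seq(simpleName + "\tlines with a: " + numAs))
    .saveAsTextFile(outputBase + simpleName)
}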
Update: multiple output files
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

/* hadoop */
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

/* java */
import java.io.Serializable
import org.apache.log4j.Logger
import org.apache.log4j.Level

/* custom TextOutputFormat */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "-" + name      // output: hdfs://output_dir/inputfilename-part-****
    // key.asInstanceOf[String] + "/" + name   // output: hdfs://output_dir/inputfilename/part-**** [inputfilename - directory of part files]
}

/* spark context */
object Spark {
  val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

/* wordcount processing */
object Process extends Serializable {
  def apply(filename: String): org.apache.spark.rdd.RDD[(String, String)] = {
    println("i called.....")
    val simple_path = filename.split('/').last
    val lines = Spark.sc.textFile(filename)
    val counts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)  // (word, count)
    val fname_word_counts = counts.map(x => (simple_path, x._1 + "\t" + x._2))                     // (filename, word\tcount)
    fname_word_counts
  }
}

object SimpleApp {
  def main(args: Array[String]) {
    // Logger.getLogger("org").setLevel(Level.OFF)
    // Logger.getLogger("akka").setLevel(Level.OFF)

    // input and output paths
    val input_path = "hdfs://master:8020/vijay/mywordcount/"
    val output_path = "hdfs://master:8020/vijay/mywordcount/output/"

    // context
    val context = Spark.sc
    val data = context.wholeTextFiles(input_path)

    // final output RDD
    var output: org.apache.spark.rdd.RDD[(String, String)] = context.emptyRDD

    // files to process
    val files = data.map { case (filename, content) => filename }

    // apply wordcount processing on each file received from wholeTextFiles
    files.collect.foreach(filename => {
      output = output.union(Process(filename))
    })

    // output.saveAsTextFile(output_path)   // save output (filename, word\tcount)
    output.saveAsHadoopFile(output_path, classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])  // custom output format

    // close context
    context.stop()
  }
}
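As a side note (not part of the original answer), the same custom output format can also be fed without collecting the filenames to the driver, by keying every word count with its source file name straight from wholeTextFiles. A rough sketch, reusing the input_path, output_path and RDDMultipleTextOutputFormat defined above:

// hypothetical variant: build (filename, word\tcount) pairs in one distributed pipeline
val perFileCounts = context.wholeTextFiles(input_path)
  .flatMap { case (path, content) =>
    val simpleName = path.split('/').last
    content.split("\\s+").filter(_.nonEmpty).map(word => ((simpleName, word), 1))
  }
  .reduceByKey(_ + _)
  .map { case ((file, word), count) => (file, word + "\t" + count) }

perFileCounts.saveAsHadoopFile(output_path,
  classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])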
Environment:
- Scala compiler version 2.10.2
- spark-1.2.0-bin-hadoop2.3
- Hadoop 2.3.0-cdh5.0.3
Sample output:
[ramisetty@node-1 stack]$ hadoop fs -ls /vijay/mywordcount/output
Found 5 items
-rw-r--r--   3 ramisetty supergroup          0 2015-06-09 03:49 /vijay/mywordcount/output/_SUCCESS
-rw-r--r--   3 ramisetty supergroup         40 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00000
-rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00001
-rw-r--r--   3 ramisetty supergroup         44 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00002
-rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00003
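To spot-check one of the per-file results (the path is taken from the listing above), something like this can be run from a spark-shell:

// read back the output that came from file1.txt and print its (word, count) lines
sc.textFile("hdfs://master:8020/vijay/mywordcount/output/file1.txt-part-*")
  .collect()
  .foreach(println)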