java - Using WholeFileInputFormat with Hadoop MapReduce still results in Mapper processing 1 line at a time -


to expand on header in using hadoop 2.6.. , need send whole files mapper instead of single line @ time. have followed tom whites code in definitive guide create wholefileinputformat , wholefilerecordreader mapper still processing files 1 line @ time. can see i'm missing in code? used book example can see. guidance appreciated.

wholefileinputformat.java

public class wholefileinputformat extends fileinputformat <nullwritable, byteswritable>{  @override protected boolean issplitable(jobcontext context, path file){     return false; }  @override public recordreader<nullwritable, byteswritable> createrecordreader(         inputsplit split, taskattemptcontext context) throws ioexception, interruptedexception {     wholefilerecordreader reader = new wholefilerecordreader();     reader.initialize(split, context);     return reader; } 

}

wholefilerecordreader.java

public class wholefilerecordreader extends recordreader<nullwritable, byteswritable> { private filesplit filesplit; private configuration conf; private byteswritable value = new byteswritable(); private boolean processed = false;  @override public void initialize(inputsplit split, taskattemptcontext context) throws ioexception, interruptedexception{     this.filesplit = (filesplit) split;     this.conf = context.getconfiguration(); }  @override public boolean nextkeyvalue() throws ioexception, interruptedexception{     if (!processed){         byte[] contents = new byte[(int) filesplit.getlength()];         path file = filesplit.getpath();         filesystem fs = file.getfilesystem(conf);         fsdatainputstream in = null;         try{             in = fs.open(file);             ioutils.readfully(in, contents, 0, contents.length);             value.set(contents, 0, contents.length);         }finally{             ioutils.closestream(in);         }         processed = true;         return  true;     }     return false; }  @override public nullwritable getcurrentkey() throws ioexception, interruptedexception{     return nullwritable.get(); }  @override public byteswritable getcurrentvalue() throws ioexception, interruptedexception{     return value; }  @override public float getprogress() throws ioexception {     return processed ? 1.0f : 0.0f; }  @override public void close() throws ioexception{     //do nothing :) } 

}

and main method mapreduce

public class ecccount { public static void main(string[] args) throws exception {      if (args.length != 2) {       system.out.printf("usage: processlogs <input dir> <output dir>\n");       system.exit(-1);     }      //@suppresswarnings("deprecation")     job job = new job();     job.setjarbyclass(ecccount.class);     job.setjobname("ecccount");      //fileinputformat.setinputpaths(job, new path(args[0]));     wholefileinputformat.setinputpaths(job, new path(args[0]));     fileoutputformat.setoutputpath(job, new path(args[1]));      job.setmapperclass(ecccountmapper.class);     job.setreducerclass(sumreducer.class);      job.setoutputkeyclass(text.class);     job.setoutputvalueclass(intwritable.class);      boolean success = job.waitforcompletion(true);     system.exit(success ? 0 : 1);   } 

}

and mapper measure. right returns value given test case see if returning line or whole file

public class ecccountmapper extends mapper<longwritable, text, text, intwritable>{ @override   public void map(longwritable key, text value, context context)       throws ioexception, interruptedexception {        context.write(new text(value), new intwritable(1));   } 

}

issue can input format of mapper. have longwritable , text. in example mentioned, have used nullwritable, byteswritable because wholefileinputformat having. also, need give job.setinputformatclass(wholefileinputformat.class); in job class(main method). hope helps, , happy coding


Comments

Popular posts from this blog

Magento/PHP - Get phones on all members in a customer group -

php - Bypass Geo Redirect for specific directories -

php - .htaccess mod_rewrite for dynamic url which has domain names -