java - Using WholeFileInputFormat with Hadoop MapReduce still results in Mapper processing 1 line at a time
To expand on the header: using Hadoop 2.6.x, I need to send whole files to the mapper instead of a single line at a time. I have followed Tom White's code in the Definitive Guide to create WholeFileInputFormat and WholeFileRecordReader, but my mapper is still processing files one line at a time. Can anyone see what I'm missing in my code? I used the book example exactly, as far as I can see. Any guidance is appreciated.
WholeFileInputFormat.java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Never split: each file should go to a single mapper in one piece
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
WholeFileRecordReader.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Read the entire file into a single BytesWritable value
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing :)
    }
}
And the main method of the MapReduce job:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EccCount {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: ProcessLogs <input dir> <output dir>\n");
            System.exit(-1);
        }

        //@SuppressWarnings("deprecation")
        Job job = new Job();
        job.setJarByClass(EccCount.class);
        job.setJobName("EccCount");

        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(EccCountMapper.class);
        job.setReducerClass(SumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
And the mapper. As a test measure, right now it just returns the value it is given, so I can see whether it receives a single line or the whole file:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EccCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(value), new IntWritable(1));
    }
}
The issue is the input format of your mapper: you have LongWritable and Text. In the example you mentioned, they use NullWritable and BytesWritable, because that is what WholeFileInputFormat emits. Also, you need to call job.setInputFormatClass(WholeFileInputFormat.class); in your job class (the main method); otherwise the job falls back to the default TextInputFormat, which is exactly why the mapper still sees one line at a time. Hope this helps, and happy coding.
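As a minimal sketch, the corrected mapper could look like the code below. The key/value types now match what WholeFileInputFormat emits; the assumption that the file contents are UTF-8 text to be written out as a single record is mine, since the question only says the mapper echoes its input for testing.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EccCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable> {

    @Override
    public void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // The whole file arrives as a single BytesWritable record.
        // copyBytes() returns exactly getLength() bytes, unlike getBytes(),
        // whose backing array may be padded beyond the valid length.
        // Assumption: the file is UTF-8 text and is emitted as one record.
        String wholeFile = new String(value.copyBytes(), StandardCharsets.UTF_8);
        context.write(new Text(wholeFile), new IntWritable(1));
    }
}

And in the driver, before waitForCompletion(true):

    job.setInputFormatClass(WholeFileInputFormat.class);

Without this line the record reader you wrote is never used: FileInputFormat.setInputPaths() (which WholeFileInputFormat merely inherits) only sets the input paths, not the input format class.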