Java: Custom SortComparator not working in MapReduce WordCount program
I am trying to understand how MapReduce sorts the map output keys and which sort algorithm it uses. I have a text file like this:

    a b e f c b

How does it sort these keys? I implemented a custom sort comparator class that extends the WritableComparator class, because I wanted to see how the sorting happens by writing each comparison to a file.
    public static class MySortComparator2 extends WritableComparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            Text x = new Text("");
            Text y = new Text("");
            try {
                x.readFields(new DataInputStream(new ByteArrayInputStream(b1)));
                y.readFields(new DataInputStream(new ByteArrayInputStream(b2)));
                // Append every comparison to a log file so the sort order can be inspected.
                FileWriter writerOfComparisions = new FileWriter("/home/srimanth/comparisons", true);
                writerOfComparisions.write("comparing " + x.toString() + " " + y.toString());
                writerOfComparisions.write(" ----> returning " + x.compareTo(y));
                writerOfComparisions.write("\n");
                writerOfComparisions.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return x.compareTo(y);
        }
    }
The output of the comparisons file is this:

    comparing a a ----> returning 0
    comparing a a ----> returning 0
    comparing a a ----> returning 0
    comparing a a ----> returning 0
    comparing a a ----> returning 0
    comparing b a ----> returning 1
    comparing c a ----> returning 2
    comparing f a ----> returning 5
    comparing e a ----> returning 4
    comparing b a ----> returning 1
    comparing b c ----> returning -1
    comparing c f ----> returning -3
    comparing f e ----> returning 1
    comparing e b ----> returning 3
    comparing b a ----> returning 1
What sorting algorithm is being used here? The final output of the word count is:

    b 1
    c 1
    f 1
    e 1
    b 1
    a 1
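The duplicated `b` in this output is the symptom you would expect if equal keys did not end up next to each other after sorting: on the reduce side, consecutive keys that compare equal are grouped into one `reduce` call, so two equal keys separated by other keys become two separate groups. The plain-Java sketch below simulates only that grouping step, under simplifying assumptions: it uses `String.equals` in place of Hadoop's grouping comparator, every value is the `1` emitted by the mapper, and `GroupAdjacentDemo`/`groupAdjacent` are hypothetical names. It uses the keys from the input line above in arrival order, so it reproduces the duplicate-key symptom rather than the exact output shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class GroupAdjacentDemo {

    // Simplified reduce-side grouping (hypothetical helper): walk the keys in
    // the order the sort produced and start a new group whenever the next key
    // differs from the current one. Equal keys that are not adjacent end up
    // in separate groups, each with its own count.
    static List<String> groupAdjacent(List<String> keysInOrder) {
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < keysInOrder.size()) {
            String key = keysInOrder.get(i);
            int count = 0;
            while (i < keysInOrder.size() && keysInOrder.get(i).equals(key)) {
                count++; // each map output value is 1, as in TokenizerMapper
                i++;
            }
            out.add(key + " " + count);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> mapOutput = Arrays.asList("a", "b", "e", "f", "c", "b");

        // Keys left in arrival order, i.e. effectively unsorted.
        System.out.println(groupAdjacent(mapOutput)); // [a 1, b 1, e 1, f 1, c 1, b 1]

        // Keys sorted first, so equal keys are adjacent.
        List<String> sorted = new ArrayList<>(mapOutput);
        Collections.sort(sorted);
        System.out.println(groupAdjacent(sorted)); // [a 1, b 2, c 1, e 1, f 1]
    }
}
```

The point of the sketch is that correct counts depend entirely on the sort placing equal keys together; the grouping step itself never looks back at earlier keys.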
It looks like the sorting and grouping did not happen properly, since `b` shows up twice. Here are my mapper and reducer classes:
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // Emits (word, 1) for every whitespace-separated token in the line.
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        // Sums all the 1s grouped under the same key.
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
What am I doing wrong? I'd appreciate any help. Thanks in advance.
Did you set the following in your driver code?

    job.setSortComparatorClass(MySortComparator2.class);
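Separately from the driver setting, it may be worth checking that the comparator never uses `s1`/`l1` or `s2`/`l2`: it wraps the whole `b1`/`b2` arrays, so decoding always starts at position 0. As far as I know, the map-side sort calls the raw `compare` with one large shared buffer and passes each key's start offset and length, so reading from position 0 can decode a different key than the one being compared. A likely fix is `new ByteArrayInputStream(b1, s1, l1)` (and likewise for `b2`). The plain-Java sketch below illustrates the difference without any Hadoop dependency. Assumptions: each key is stored as a one-byte length followed by its UTF-8 bytes (this matches how `Text` writes itself only for keys shorter than 128 bytes), and `RawKeySortDemo`, `Slot`, `pack`, `readAt` and `readIgnoringOffset` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RawKeySortDemo {

    // Position of one serialized key inside the shared buffer.
    static final class Slot {
        final int start, length;
        Slot(int start, int length) { this.start = start; this.length = length; }
    }

    // Writes all keys back to back into ONE buffer as [1-byte length][UTF-8 bytes]
    // (assumption: mirrors Text's encoding for keys under 128 bytes).
    static byte[] pack(List<String> keys, List<Slot> slots) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (String k : keys) {
            byte[] utf8 = k.getBytes(StandardCharsets.UTF_8);
            int start = out.size();
            out.writeByte(utf8.length);
            out.write(utf8);
            slots.add(new Slot(start, out.size() - start));
        }
        out.flush();
        return bytes.toByteArray();
    }

    static String readKey(DataInputStream in) throws IOException {
        byte[] utf8 = new byte[in.readUnsignedByte()];
        in.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Like the comparator in the question: s and l are ignored, so this
    // always decodes whichever key sits at position 0 of the buffer.
    static String readIgnoringOffset(byte[] buf, int s, int l) throws IOException {
        return readKey(new DataInputStream(new ByteArrayInputStream(buf)));
    }

    // Offset-aware version: decodes only the l bytes that begin at s.
    static String readAt(byte[] buf, int s, int l) throws IOException {
        return readKey(new DataInputStream(new ByteArrayInputStream(buf, s, l)));
    }

    // Sorts slot indices (not the bytes) with an offset-aware raw comparison,
    // roughly how the map side orders record positions in its output buffer.
    static List<String> sortedKeys(byte[] buf, List<Slot> slots) throws IOException {
        Integer[] order = new Integer[slots.size()];
        for (int i = 0; i < order.length; i++) order[i] = i;
        Arrays.sort(order, (i, j) -> {
            Slot a = slots.get(i);
            Slot b = slots.get(j);
            try {
                return readAt(buf, a.start, a.length).compareTo(readAt(buf, b.start, b.length));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        List<String> result = new ArrayList<>();
        for (int idx : order) {
            Slot s = slots.get(idx);
            result.add(readAt(buf, s.start, s.length));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<String> keys = Arrays.asList("a", "b", "e", "f", "c", "b");
        List<Slot> slots = new ArrayList<>();
        byte[] buf = pack(keys, slots);

        Slot second = slots.get(1); // holds "b"
        System.out.println(readIgnoringOffset(buf, second.start, second.length)); // a
        System.out.println(readAt(buf, second.start, second.length));             // b
        System.out.println(sortedKeys(buf, slots)); // [a, b, b, c, e, f]
    }
}
```

Sorting an index array and comparing keys in place avoids copying the serialized bytes around, which is why the offsets passed to a raw comparator matter at all.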