Java: How to measure the time Spark needs to run an action on a partitioned RDD?
I wrote a small Spark application that should measure the time Spark needs to run an action on a partitioned RDD (the combineByKey function, which sums a value).
My problem is that the first iteration seems to work correctly (the calculated duration is ~25 ms), but the following ones take much less time (~5 ms). It seems to me that Spark persists the data without being asked to!? Can I avoid that programmatically?
I need to know the duration Spark needs to calculate a new RDD (without any caching/persisting from earlier iterations), and I think that duration should be 20-25 ms!
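A side note on interpreting such numbers: in JVM benchmarks the first runs are often slower because of warm-up (class loading, JIT compilation). Whether that explains the ~25 ms vs ~5 ms here is an assumption, not something the question establishes. A common way to report repeated measurements is to discard a few warm-up runs and use the median of the rest. The sketch below shows that in plain Java; `WarmupStats` and `medianAfterWarmup` are hypothetical names, and the sample data are the ten durations from the console output in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WarmupStats {

    /**
     * Hypothetical helper (not part of Spark): drops the first warmupRuns
     * measurements and returns the median of the remaining ones.
     */
    public static long medianAfterWarmup(List<Long> millis, int warmupRuns) {
        if (warmupRuns < 0 || warmupRuns >= millis.size()) {
            throw new IllegalArgumentException("need at least one run after warm-up");
        }
        // copy so the caller's list is not reordered
        List<Long> kept = new ArrayList<>(millis.subList(warmupRuns, millis.size()));
        Collections.sort(kept);
        int n = kept.size();
        if (n % 2 == 1) {
            return kept.get(n / 2);
        }
        // even count: mean of the two middle values (integer division)
        return (kept.get(n / 2 - 1) + kept.get(n / 2)) / 2;
    }

    public static void main(String[] args) {
        // the ten durations (ms) printed in the question's console output
        List<Long> measured = Arrays.asList(20L, 23L, 7L, 21L, 13L, 6L, 5L, 6L, 4L, 7L);
        // prints "median after 2 warm-up runs: 6 ms"
        System.out.println("median after 2 warm-up runs: "
                + medianAfterWarmup(measured, 2) + " ms");
    }
}
```

How many warm-up runs to discard is a judgment call; the value 2 above is only for illustration.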
To ensure recalculation, I moved the SparkContext creation into the for loop, but that didn't change anything...
Thanks for any advice!
Here is the code that seems to persist data:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public static void main(String[] args) {
    switchOffLogging();
    // now
    try {
        // setup: read out parameters & initialize SparkContext
        String path = args[0];
        SparkConf conf = new SparkConf(true);
        JavaSparkContext sc;

        // print the header of the results table
        System.out.println("\npar.\tcount\tinput.p\tcons.p\ttime");

        // RDDs used for the benchmark
        JavaRDD<String> input = null;
        JavaPairRDD<Integer, String> pairRdd = null;
        JavaPairRDD<Integer, String> partitionedRdd = null;
        JavaPairRDD<Integer, Float> consumptionRdd = null;

        // tasks are iterative (the same benchmark is run 10 times for testing)
        for (int i = 0; i < 10; i++) {
            boolean partitioning = true;
            int partitionsCount = 8;

            sc = new JavaSparkContext(conf);
            setS3Credentials(sc, path);

            input = sc.textFile(path);
            pairRdd = mapToPair(input);
            partitionedRdd = partition(pairRdd, partitioning, partitionsCount);

            // measure duration
            long duration = System.currentTimeMillis();
            // relevant function
            consumptionRdd = partitionedRdd.combineByKey(createCombiner, mergeValue, mergeCombiners);
            duration = System.currentTimeMillis() - duration;

            // action to invoke the calculation
            System.out.println(consumptionRdd.collect().size());

            // print results
            System.out.println("\n" + partitioning + "\t" + partitionsCount + "\t"
                    + input.partitions().size() + "\t" + consumptionRdd.partitions().size()
                    + "\t" + duration + " ms");

            input = null;
            pairRdd = null;
            partitionedRdd = null;
            consumptionRdd = null;

            sc.close();
            sc.stop();
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(e.getMessage());
    }
}
```

Some helper functions (these should not be the problem):
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

private static void switchOffLogging() {
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);
}

private static void setS3Credentials(JavaSparkContext sc, String path) {
    if (path.startsWith("s3n://")) {
        Configuration hadoopConf = sc.hadoopConfiguration();
        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", "mycredentials");
        hadoopConf.set("fs.s3n.awsSecretAccessKey", "mycredentials");
    }
}

// initial element: the value in column 2 of the record
private static Function<String, Float> createCombiner = new Function<String, Float>() {
    public Float call(String dataset) throws Exception {
        String[] data = dataset.split(",");
        float value = Float.valueOf(data[2]);
        return value;
    }
};

// merging function: add the value of a new record to the running sum
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() {
    public Float call(Float sumYet, String dataset) throws Exception {
        String[] data = dataset.split(",");
        float value = Float.valueOf(data[2]);
        sumYet += value;
        return sumYet;
    }
};

// function to sum the consumption of two partial results
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() {
    public Float call(Float a, Float b) throws Exception {
        a += b;
        return a;
    }
};

private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRdd,
        boolean partitioning, int partitionsCount) {
    if (partitioning) {
        return pairRdd.partitionBy(new HashPartitioner(partitionsCount));
    } else {
        return pairRdd;
    }
}

// key every record by the house ID in column 6
private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) {
    return input.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String debsDataset) throws Exception {
            String[] data = debsDataset.split(",");
            int houseId = Integer.valueOf(data[6]);
            return new Tuple2<Integer, String>(houseId, debsDataset);
        }
    });
}
```

And the output of the Spark console:
```
part.  count  input.p  cons.p  time
true   8      6        8       20 ms
true   8      6        8       23 ms
true   8      6        8        7 ms   // less!!!
true   8      6        8       21 ms
true   8      6        8       13 ms
true   8      6        8        6 ms   // less!!!
true   8      6        8        5 ms   // less!!!
true   8      6        8        6 ms   // less!!!
true   8      6        8        4 ms   // less!!!
true   8      6        8        7 ms   // less!!!
```
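One thing worth checking in the loop above: `combineByKey` is a lazy transformation. It only returns a new RDD that describes the computation, so the interval measured around it does not include reading, partitioning or combining the data; that work runs when the action `collect()` is called. Below is a minimal sketch, assuming Java 8, of a helper that times whatever action is passed to it. `ActionTimer`, `Timed` and `time` are hypothetical names, not part of Spark's API, and the Spark usage appears only as a comment.

```java
import java.util.function.Supplier;

public class ActionTimer {

    /** Value produced by a timed call plus the elapsed wall-clock time in ms. */
    public static final class Timed<T> {
        public final T value;
        public final long millis;

        Timed(T value, long millis) {
            this.value = value;
            this.millis = millis;
        }
    }

    /**
     * Runs the supplier once and measures how long it takes. With Spark, the
     * supplier should trigger an action (e.g. collect() or count()); timing a
     * transformation alone only measures building the lineage.
     */
    public static <T> Timed<T> time(Supplier<T> work) {
        long start = System.nanoTime(); // monotonic clock, suited to intervals
        T value = work.get();
        long elapsedNanos = System.nanoTime() - start;
        return new Timed<>(value, elapsedNanos / 1_000_000);
    }

    public static void main(String[] args) {
        // Stand-in workload. With Spark this could be (hypothetical usage):
        //   time(() -> partitionedRdd
        //           .combineByKey(createCombiner, mergeValue, mergeCombiners)
        //           .collect().size())
        Timed<Long> t = time(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;
            }
            return sum;
        });
        System.out.println("result=" + t.value + " time=" + t.millis + " ms");
    }
}
```

Putting both the transformation and the action inside the supplier keeps everything Spark actually executes for that iteration inside the measured interval.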
I have found a solution for myself now: I wrote a separate class that calls the spark-submit command in a new process. This can be done in a loop, so every benchmark is started separately and each process gets its own SparkContext. Garbage collection is done between runs, and it works fine!
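```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

String submitCommand = "/root/spark/bin/spark-submit " + submitParams
        + " --class partitioning.PartitionExample /root/partitioning.jar " + javaFlags;
Process p = Runtime.getRuntime().exec(submitCommand);

// read the child's stdout first: calling waitFor() before draining the pipe
// can block forever once the OS pipe buffer is full
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
    System.out.println(line);
}
reader.close();

// print the exit code of spark-submit (stderr is not consumed here)
System.out.println(p.waitFor());
```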
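A `ProcessBuilder`-based variant of this launcher can drain the child's output before calling `waitFor()` and merge stderr into stdout (Spark's default console logging goes to stderr), so neither pipe can fill up and stall the child. This is a sketch, not the author's code: `SubmitRunner` is a hypothetical class, the command list mirrors the spark-submit call above, and the question's `submitParams` and `javaFlags` would have to be added as separate list elements.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SubmitRunner {

    /**
     * Starts a child process, echoes and collects its combined stdout/stderr,
     * and returns its exit code. Output is read to the end before waitFor().
     */
    public static int run(List<String> command, List<String> outputLines)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout
        Process p = pb.start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                outputLines.add(line);
                System.out.println(line);
            }
        }
        return p.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical command; each element is one argument, so no shell quoting is needed.
        List<String> command = new ArrayList<>();
        command.add("/root/spark/bin/spark-submit");
        command.add("--class");
        command.add("partitioning.PartitionExample");
        command.add("/root/partitioning.jar");

        // one fresh JVM (and therefore one fresh SparkContext) per benchmark run
        for (int run = 0; run < 10; run++) {
            int exitCode = run(command, new ArrayList<>());
            System.out.println("run " + run + " exited with " + exitCode);
        }
    }
}
```

Passing the command as a list instead of one string also avoids `Runtime.exec(String)` splitting arguments on whitespace.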