java - How to measure the time Spark needs to run an action on partitioned RDD? -


I wrote a small Spark application that should measure the time Spark needs to run an action on a partitioned RDD (a combineByKey function that sums up a value).

My problem is that the first iteration seems to work correctly (calculated duration ~25 ms), but the next ones take much less time (~5 ms). It seems to me that Spark persists the data without being asked to!? Can I avoid that programmatically?

I need to know the duration Spark needs to calculate a new RDD (without any caching / persisting from earlier iterations) --> I think the duration should always be about 20-25 ms!

To ensure recalculation, I moved the SparkContext creation into the for loop, but this didn't change anything...

Thanks for your advice!

Here is my code that seems to persist the data:

    public static void main(String[] args) {
        switchOffLogging();

        try {
            // Setup: read out parameters & initialize SparkContext
            String path = args[0];
            SparkConf conf = new SparkConf(true);
            JavaSparkContext sc;

            // Create output file & writer
            System.out.println("\npar.\tcount\tinput.p\tcons.p\ttime");

            // RDDs used for the benchmark
            JavaRDD<String> input = null;
            JavaPairRDD<Integer, String> pairRDD = null;
            JavaPairRDD<Integer, String> partitionedRDD = null;
            JavaPairRDD<Integer, Float> consumptionRDD = null;

            // Tasks are iterative (10 times the same benchmark for testing)
            for (int i = 0; i < 10; i++) {
                boolean partitioning = true;
                int partitionsCount = 8;

                sc = new JavaSparkContext(conf);
                setS3Credentials(sc, path);

                input = sc.textFile(path);
                pairRDD = mapToPair(input);

                partitionedRDD = partition(pairRDD, partitioning, partitionsCount);

                // Measure the duration
                long duration = System.currentTimeMillis();
                // Relevant function
                consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
                duration = System.currentTimeMillis() - duration;

                // Action to invoke the calculation
                System.out.println(consumptionRDD.collect().size());

                // Print the results
                System.out.println("\n" + partitioning + "\t" + partitionsCount + "\t"
                        + input.partitions().size() + "\t"
                        + consumptionRDD.partitions().size() + "\t" + duration + " ms");

                input = null;
                pairRDD = null;
                partitionedRDD = null;
                consumptionRDD = null;

                sc.close();
                sc.stop();
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
        }
    }

Some helper functions (these should not be the problem):

    private static void switchOffLogging() {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
    }

    private static void setS3Credentials(JavaSparkContext sc, String path) {
        if (path.startsWith("s3n://")) {
            Configuration hadoopConf = sc.hadoopConfiguration();
            hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
            hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
            hadoopConf.set("fs.s3n.awsAccessKeyId", "mycredentials");
            hadoopConf.set("fs.s3n.awsSecretAccessKey", "mycredentials");
        }
    }

    // Initial element
    private static Function<String, Float> createCombiner = new Function<String, Float>() {
        public Float call(String dataSet) throws Exception {
            String[] data = dataSet.split(",");
            float value = Float.valueOf(data[2]);
            return value;
        }
    };

    // Merging function for a new dataset
    private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() {
        public Float call(Float sumYet, String dataSet) throws Exception {
            String[] data = dataSet.split(",");
            float value = Float.valueOf(data[2]);
            sumYet += value;
            return sumYet;
        }
    };

    // Function to sum the consumption
    private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() {
        public Float call(Float a, Float b) throws Exception {
            a += b;
            return a;
        }
    };

    private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD,
            boolean partitioning, int partitionsCount) {
        if (partitioning) {
            return pairRDD.partitionBy(new HashPartitioner(partitionsCount));
        } else {
            return pairRDD;
        }
    }

    private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) {
        return input.mapToPair(new PairFunction<String, Integer, String>() {
            public Tuple2<Integer, String> call(String debsDataSet) throws Exception {
                String[] data = debsDataSet.split(",");
                int houseId = Integer.valueOf(data[6]);
                return new Tuple2<Integer, String>(houseId, debsDataSet);
            }
        });
    }
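To make explicit what the three combiner functions compute, here is a plain-Java sketch without Spark that mimics the per-key sum over two in-memory "partitions". It is only an illustration of the semantics, not how Spark executes combineByKey internally; the class name `CombineByKeySketch`, its methods, and the sample lines are hypothetical. Field 2 is assumed to hold the value and field 6 the house id, as in the helpers above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CombineByKeySketch {

    // Field 2 holds the value to sum (same index as in createCombiner / mergeValue).
    static float parseValue(String line) {
        return Float.parseFloat(line.split(",")[2]);
    }

    // Field 6 holds the house id used as the key (same index as in mapToPair).
    static int parseHouseId(String line) {
        return Integer.parseInt(line.split(",")[6]);
    }

    // Per-partition step: the first value of a key starts the sum (createCombiner),
    // every further value of that key is added to it (mergeValue).
    static Map<Integer, Float> combinePartition(List<String> partition) {
        Map<Integer, Float> sums = new HashMap<>();
        for (String line : partition) {
            int key = parseHouseId(line);
            float value = parseValue(line);
            Float soFar = sums.get(key);
            sums.put(key, soFar == null ? value : soFar + value);
        }
        return sums;
    }

    // Cross-partition step: partial sums of the same key are added (mergeCombiners).
    static Map<Integer, Float> mergePartitions(List<Map<Integer, Float>> partials) {
        Map<Integer, Float> total = new HashMap<>();
        for (Map<Integer, Float> partial : partials) {
            for (Map.Entry<Integer, Float> entry : partial.entrySet()) {
                total.merge(entry.getKey(), entry.getValue(), Float::sum);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines; only fields 2 and 6 matter here.
        List<String> partition1 = Arrays.asList("x,x,1.5,x,x,x,7", "x,x,4.0,x,x,x,3");
        List<String> partition2 = Arrays.asList("x,x,2.5,x,x,x,7");

        Map<Integer, Float> total = mergePartitions(Arrays.asList(
                combinePartition(partition1), combinePartition(partition2)));
        System.out.println("house 7: " + total.get(7));
        System.out.println("house 3: " + total.get(3));
    }
}
```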

And the output on the Spark console:

    part.   count   input.p cons.p  time
    true    8       6       8       20 ms
    true    8       6       8       23 ms
    true    8       6       8       7 ms        // less!!!
    true    8       6       8       21 ms
    true    8       6       8       13 ms
    true    8       6       8       6 ms        // less!!!
    true    8       6       8       5 ms        // less!!!
    true    8       6       8       6 ms        // less!!!
    true    8       6       8       4 ms        // less!!!
    true    8       6       8       7 ms        // less!!!
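One detail worth keeping in mind when reading these numbers: transformations such as combineByKey are lazy in Spark, so the two timestamps in the loop enclose only the construction of the new RDD, while the job itself runs later, when collect() is called. A minimal sketch of a timing helper that wraps whatever call actually triggers the work is shown here; `ActionTimer`, `Timed` and `time` are hypothetical names, and the loop in `main` is only a stand-in for a Spark action.

```java
import java.util.function.Supplier;

final class ActionTimer {

    /** Result of a timed call: the returned value and the elapsed wall-clock time. */
    static final class Timed<T> {
        final T result;
        final long nanos;

        Timed(T result, long nanos) {
            this.result = result;
            this.nanos = nanos;
        }

        double millis() {
            return nanos / 1_000_000.0;
        }
    }

    /** Runs the given action once and measures how long the call takes. */
    static <T> Timed<T> time(Supplier<T> action) {
        long start = System.nanoTime();
        T result = action.get();
        long elapsed = System.nanoTime() - start;
        return new Timed<>(result, elapsed);
    }

    public static void main(String[] args) {
        // Stand-in for an action. With Spark, the supplier would be something like
        // () -> consumptionRDD.collect(), so that the job itself is inside the timed region.
        Timed<Integer> timed = time(() -> {
            int sum = 0;
            for (int i = 1; i <= 1000; i++) {
                sum += i;
            }
            return sum;
        });
        System.out.println(timed.result + " computed in " + timed.millis() + " ms");
    }
}
```

System.nanoTime() is used instead of System.currentTimeMillis() because it is meant for measuring elapsed time and is not affected by wall-clock adjustments.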

I have found a solution that works for me for now: I wrote a separate class that calls the spark-submit command in a new process. This can be done in a loop, so every benchmark is started in a new thread and the SparkContext is separated per process. That way garbage collection is done and everything works fine!

    String submitCommand = "/root/spark/bin/spark-submit " + submitParams
            + " --class partitioning.PartitionExample /root/partitioning.jar " + javaFlags;
    Process p = Runtime.getRuntime().exec(submitCommand);

    // Read the child's output before waiting for it, so that a full pipe buffer cannot block the child
    BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
    String line;
    while ((line = reader.readLine()) != null) {
        System.out.println(line);
    }

    // Print the exit code once the process has finished
    System.out.println(p.waitFor());
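The same idea can also be sketched with ProcessBuilder, which takes the arguments as a list (no manual string concatenation) and can merge stderr into stdout. This is only a sketch under assumptions: `SubmitRunner`, `buildCommand` and `run` are hypothetical names, the paths and the main class are the ones from the snippet above, and anything after the jar is treated as application arguments.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SubmitRunner {

    /** Builds the argument list: spark-submit [params] --class <main> <jar> [app args]. */
    static List<String> buildCommand(String sparkSubmit, List<String> submitParams,
                                     String mainClass, String jar, List<String> appArgs) {
        List<String> command = new ArrayList<>();
        command.add(sparkSubmit);
        command.addAll(submitParams);
        command.add("--class");
        command.add(mainClass);
        command.add(jar);
        command.addAll(appArgs);
        return command;
    }

    /** Starts the command, echoes its output line by line, and returns the exit code. */
    static int run(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.redirectErrorStream(true); // merge stderr into stdout
        Process process = builder.start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // Paths and class name as in the snippet above; program arguments are passed through.
        List<String> command = buildCommand("/root/spark/bin/spark-submit",
                new ArrayList<String>(), "partitioning.PartitionExample",
                "/root/partitioning.jar", Arrays.asList(args));

        if (!new File(command.get(0)).canExecute()) {
            System.out.println("spark-submit not found, would run: " + command);
            return;
        }
        // Each benchmark run gets its own JVM and therefore its own SparkContext.
        for (int i = 0; i < 10; i++) {
            System.out.println("exit code: " + run(command));
        }
    }
}
```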
