SparkR collect method crashes with OutOfMemory on Java heap space


With SparkR, I'm trying a proof of concept: collecting an RDD created from text files that contain around 4 million lines.

My Spark cluster runs on Google Cloud, is deployed with bdutil, and is composed of 1 master and 2 workers with 15 GB of RAM and 4 cores each. The HDFS repository is backed by Google Storage via gcs-connector 1.4.0. SparkR is installed on each machine, and basic tests work fine on small files.

Here is the script I use:

sys.setenv("spark_mem" = "1g") sc <- sparkr.init("spark://xxxx:7077", sparkenvir=list(spark.executor.memory="1g")) lines <- textfile(sc, "gs://xxxx/dir/") test <- collect(lines) 

The first time I run this it seems to work fine: all tasks run successfully and Spark's UI says the job completed, but I never get the R prompt back:

15/06/04 13:36:59 WARN SparkConf: Setting 'spark.executor.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around.
15/06/04 13:36:59 WARN SparkConf: Setting 'spark.driver.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around.
15/06/04 13:36:59 INFO Slf4jLogger: Slf4jLogger started
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/04 13:37:00 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52439
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/04 13:37:00 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040

15/06/04 13:37:54 INFO GoogleHadoopFileSystemBase: GHFS version: 1.4.0-hadoop1
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library is available
15/06/04 13:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library not loaded
15/06/04 13:37:55 INFO FileInputFormat: Total input paths to process : 68
[Stage 0:=======================================================>          (27 + 10) / 68]

Then, after a CTRL-C to get the R prompt back, I try to run the collect method again, and here is the result:

[Stage 1:==========================================================>        (28 + 9) / 68]
15/06/04 13:42:08 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at org.spark_project.protobuf.ByteString.toByteArray(ByteString.java:515)
        at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:64)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I understand the exception message, but I don't understand why I get it the second time. Also, why does collect never return after completing in Spark?

I've Googled every piece of information I have, but had no luck finding a solution. Any help or hint would be appreciated!

Thanks!

This appears to be a simple combination of Java in-memory object representations being inefficient, combined with apparently long-lived object references which cause certain collections to fail to be garbage-collected in time for the new collect() call to overwrite the old one in-place.

I experimented with some options, and for my sample 256 MB file that contains ~4M lines, I can indeed reproduce the behavior where collect works fine the first time but OOMs the second time, when using SPARK_MEM=1g. I then set SPARK_MEM=4g instead, and now I'm able to CTRL+C and re-run test <- collect(lines) as many times as I want.
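For reference, a minimal sketch of that workaround, assuming the same placeholder master URL and GCS path as in the question: raise SPARK_MEM before re-initializing the SparkR context.

# Sketch only: give the SparkR driver/backend a larger heap so collect() fits.
# "spark://xxxx:7077" and "gs://xxxx/dir/" are placeholders from the question.
Sys.setenv("SPARK_MEM" = "4g")
sc <- sparkR.init("spark://xxxx:7077", sparkEnvir = list(spark.executor.memory = "1g"))
lines <- textFile(sc, "gs://xxxx/dir/")
test <- collect(lines)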

For one thing, even if references didn't leak, note that after the first time you run test <- collect(lines), the variable test is holding a gigantic array of lines, and the second time you call it, collect(lines) executes before finally being assigned to the test variable, so under straightforward instruction-ordering there's no way to garbage-collect the old contents of test. That means the second run makes the SparkRBackend process hold two copies of the entire collection at the same time, leading to the OOM you saw.
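To illustrate that ordering point, a sketch (not a guaranteed fix, since the JVM-side tracker discussed below can still pin the old copy) would be to drop the R-side reference explicitly before re-collecting:

# Sketch: release the previous result before re-running collect(), so the R
# process at least isn't forced to hold two copies of the data at once.
rm(test)
gc()                      # ask R to reclaim the old character vector
test <- collect(lines)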

To diagnose, on the master I started SparkR and first ran

dhuo@dhuo-sparkr-m:~$ jps | grep SparkRBackend
8709 SparkRBackend

I also checked top and it was using around 22 MB of memory. I fetched a heap profile with jmap:

jmap -heap:format=b 8709
mv heap.bin heap0.bin

Then I ran the first round of test <- collect(lines), at which point running top showed it using ~1.7 GB of RES memory. I grabbed another heap dump. Finally, I also tried test <- {} to get rid of the references and allow garbage-collection. After doing this, and printing out test and showing it was empty, I grabbed another heap dump and noticed RES still showed 1.7 GB. I used jhat heap0.bin to analyze the original heap dump, and got:

Heap Histogram

All Classes (excluding platform)

Class                                              Instance Count   Total Size
class [B                                           25126            14174163
class [C                                           19183            1576884
class [<other>                                     11841            1067424
class [Lscala.concurrent.forkjoin.ForkJoinTask;    16               1048832
class [I                                           1524             769384
...

After running collect, I had:

Heap Histogram

All Classes (excluding platform)

Class                                              Instance Count   Total Size
class [C                                           2784858          579458804
class [B                                           27768            70519801
class java.lang.String                             2782732          44523712
class [Ljava.lang.Object;                          2567             22380840
class [I                                           1538             8460152
class [Lscala.concurrent.forkjoin.ForkJoinTask;    27               1769904

Even after I nulled out test, it remained the same. This shows 2784858 instances of char[], with a total size of 579 MB, and 2782732 instances of String, presumably holding those char[]'s above them. I followed the reference graph all the way up, and got something like

char[] -> String -> String[] -> ... -> class scala.collection.mutable.DefaultEntry
 -> class [Lscala.collection.mutable.HashEntry; -> class scala.collection.mutable.HashMap
 -> class edu.berkeley.cs.amplab.sparkr.JVMObjectTracker$
 -> java.util.Vector@0x785b48cd8 (36 bytes)
 -> sun.misc.Launcher$AppClassLoader@0x7855c31a8 (138 bytes)

And then the AppClassLoader had thousands of inbound references. So somewhere along that chain something should have been removing its reference but was failing to do so, causing the entire collected array to sit in memory while we try to fetch a second copy of it.

Finally, to answer your question about hanging after the collect: it appears to have to do with the data not fitting in the R process's memory; here's a thread related to that issue: https://www.mail-archive.com/user@spark.apache.org/msg29155.html

I confirmed that with a smaller file containing only a handful of lines, running collect indeed does not hang.
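If the proof of concept only needs a look at a few records rather than all ~4M lines in R, a hedged alternative is to pull back just a sample with take(), which the SparkR RDD API of that era provides:

# Sketch: inspect a handful of lines instead of collecting the whole RDD.
sample_lines <- take(lines, 100)   # fetch only the first 100 lines
length(sample_lines)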

