SparkR collect method crashes with OutOfMemory on Java heap space
With SparkR, I'm trying, as a PoC, to collect an RDD that I created from text files containing around 4M lines.
My Spark cluster is running in Google Cloud, is deployed with bdutil, and is composed of 1 master and 2 workers with 15GB of RAM and 4 cores each. My HDFS repository is based on Google Storage with gcs-connector 1.4.0. SparkR is installed on each machine, and basic tests work on small files.
Here is the script I use:
Sys.setenv("SPARK_MEM" = "1g")
sc <- sparkR.init("spark://xxxx:7077", sparkEnvir = list(spark.executor.memory = "1g"))
lines <- textFile(sc, "gs://xxxx/dir/")
test <- collect(lines)
The first time I run this, it seems to work fine: all the tasks run successfully and Spark's UI says the job completed, but I never get the R prompt back:
15/06/04 13:36:59 WARN SparkConf: Setting 'spark.executor.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around.
15/06/04 13:36:59 WARN SparkConf: Setting 'spark.driver.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around.
15/06/04 13:36:59 INFO Slf4jLogger: Slf4jLogger started
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/04 13:37:00 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52439
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/04 13:37:00 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/06/04 13:37:54 INFO GoogleHadoopFileSystemBase: GHFS version: 1.4.0-hadoop1
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library is available
15/06/04 13:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library not loaded
15/06/04 13:37:55 INFO FileInputFormat: Total input paths to process : 68
[Stage 0:=======================================================>                 (27 + 10) / 68]
Then, after pressing CTRL-C to get the R prompt back, I try to run the collect method again, and here is the result:
[Stage 1:==========================================================>                 (28 + 9) / 68]
15/06/04 13:42:08 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
    at org.spark_project.protobuf.ByteString.toByteArray(ByteString.java:515)
    at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:64)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I understand the exception message, but I don't understand why I'm getting it the second time. Also, why does collect never return after the job completes in Spark?
I googled every piece of information I have, but had no luck finding a solution. Any help or hint would be appreciated!
Thanks
This appears to be a simple combination of Java's inefficient in-memory object representations and some apparently long-lived object references, which prevent some collections from being garbage-collected in time for the new collect() call to overwrite the old one in place.
I experimented with some options, and with my sample 256MB file containing ~4M lines, I can indeed reproduce your behavior when using SPARK_MEM=1g: collect is fine the first time but OOMs the second time. When I set SPARK_MEM=4g instead, I'm able to ctrl+c and re-run test <- collect(lines) as many times as I want.
For one thing, even if references didn't leak, note that after the first time you ran test <- collect(lines), the variable test is holding that gigantic array of lines. The second time you call it, collect(lines) executes before its result is assigned to the test variable, so with straightforward instruction ordering there's no way to garbage-collect the old contents of test. This means the second run makes the SparkRBackend process hold two copies of the entire collection at the same time, leading to the OOM you saw.
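The ordering argument holds in any language where the right-hand side of an assignment is evaluated before the name is rebound, which is true of both R and Python. The following is a Python analogy, not SparkR code: a hypothetical Collected class stands in for the collected array, and a weak reference checks whether the previous result is still alive while the new one is being built. It relies on CPython's reference counting freeing the old object as soon as its last binding is dropped.

```python
import gc
import weakref


class Collected:
    """Hypothetical stand-in for the large array that collect() returns."""

    def __init__(self, n_lines):
        self.lines = ["line %d" % i for i in range(n_lines)]


def collect(n_lines, old_ref):
    """Build a new result and report whether the previous one is still alive meanwhile."""
    new_result = Collected(n_lines)
    old_still_alive = old_ref() is not None
    return new_result, old_still_alive


def run(drop_old_first):
    test = Collected(1000)           # result of the first collect
    probe = weakref.ref(test)        # observes the first result without keeping it alive
    if drop_old_first:
        test = None                  # release the old binding before collecting again
        gc.collect()
    # Like `test <- collect(lines)`: the right-hand side runs while `test` may still be bound.
    test, overlap = collect(1000, probe)
    return overlap


print("reassign directly, two copies alive:", run(drop_old_first=False))  # True
print("drop old result first, two copies alive:", run(drop_old_first=True))  # False
```

In the SparkR case, clearing the R-side variable first was not sufficient by itself, because the JVM side kept its own reference to the collected array, as the heap analysis below shows.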
To diagnose, on the master I started SparkR and first ran:
dhuo@dhuo-sparkr-m:~$ jps | grep SparkRBackend
8709 SparkRBackend
I checked top, and it was using around 22MB of memory. I fetched a heap profile with jmap:
jmap -heap:format=b 8709
mv heap.bin heap0.bin
Then I ran the first round of test <- collect(lines), at which point top showed it using ~1.7G of RES memory. I grabbed another heap dump. Finally, I also tried test <- {} to get rid of references and allow garbage collection. After doing this, and printing out test to confirm it was empty, I grabbed another heap dump and noticed that RES still showed 1.7G. I used jhat heap0.bin to analyze the original heap dump, and got:
Heap Histogram
All Classes (excluding platform)
Class                                            Instance Count   Total Size
class [B                                                  25126     14174163
class [C                                                  19183      1576884
class [<other>                                            11841      1067424
class [Lscala.concurrent.forkjoin.ForkJoinTask;              16      1048832
class [I                                                   1524       769384
...
After running collect, I had:
Heap Histogram
All Classes (excluding platform)
Class                                            Instance Count   Total Size
class [C                                                2784858    579458804
class [B                                                  27768     70519801
class java.lang.String                                  2782732     44523712
class [Ljava.lang.Object;                                  2567     22380840
class [I                                                   1538      8460152
class [Lscala.concurrent.forkjoin.ForkJoinTask;              27      1769904
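The post-collect histogram is enough for a rough check of the two-copies explanation. This is a back-of-the-envelope sketch in Python that uses only the [C (char[]) and java.lang.String rows; it assumes that SPARK_MEM=1g gives the SparkRBackend JVM a heap of roughly 1 GiB, and because it counts only these two rows it understates the real footprint.

```python
# Sizes in bytes reported by jhat after the first collect() (the [C and String rows).
CHAR_ARRAY_COUNT, CHAR_ARRAY_BYTES = 2_784_858, 579_458_804   # class [C
STRING_COUNT, STRING_BYTES = 2_782_732, 44_523_712            # class java.lang.String

MIB = 1024 ** 2
GIB = 1024 ** 3


def one_copy_bytes():
    """Bytes held by the collected strings alone (ignores enclosing arrays and other objects)."""
    return CHAR_ARRAY_BYTES + STRING_BYTES


def fits(heap_bytes, copies):
    """Rough check: can `copies` collections coexist in a heap of `heap_bytes`?"""
    return copies * one_copy_bytes() <= heap_bytes


print("avg char[] bytes:", round(CHAR_ARRAY_BYTES / CHAR_ARRAY_COUNT))  # 208
print("avg String bytes:", STRING_BYTES / STRING_COUNT)                 # 16.0
print("one copy (MiB):", round(one_copy_bytes() / MIB))                # 595
for heap_gib in (1, 4):
    # Assumption: SPARK_MEM=<n>g roughly corresponds to an <n> GiB heap.
    print(heap_gib, "GiB heap, two copies fit:", fits(heap_gib * GIB, 2))  # False, then True
```

One copy (about 595 MiB for these two rows alone) fits in a 1 GiB heap, but two do not, while both fit comfortably in 4 GiB. That is consistent with the SPARK_MEM=1g versus SPARK_MEM=4g behavior described above.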
Even after I nulled out test, the histogram remained about the same. It shows 2784858 instances of char[], with a total size of 579MB, and also 2782732 instances of String, presumably holding those char[]s. I followed the reference graph all the way up, and got something like:
char[] -> String -> String[] -> ... -> class scala.collection.mutable.DefaultEntry -> class [Lscala.collection.mutable.HashEntry; -> class scala.collection.mutable.HashMap -> class edu.berkeley.cs.amplab.sparkr.JVMObjectTracker$ -> java.util.Vector@0x785b48cd8 (36 bytes) -> sun.misc.Launcher$AppClassLoader@0x7855c31a8 (138 bytes)
And the AppClassLoader had thousands of inbound references. So somewhere along that chain, something should have been removing its reference but failed to do so, causing the entire collected array to sit in memory while we try to fetch a second copy of it.
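The reference chain points at a registry pattern: a map on the backend side that holds strong references to objects handed out to R by ID. This is a hypothetical Python sketch of that pattern, not SparkR's actual JVMObjectTracker code; the ObjectTracker class and its put/get/remove methods are illustrative names. It shows why clearing the client-side variable cannot free the data until something explicitly removes the registry entry.

```python
import gc
import itertools
import weakref


class ObjectTracker:
    """Hypothetical registry that keeps backend objects alive for a remote client,
    loosely modelled on the JVMObjectTracker in the reference chain above."""

    def __init__(self):
        self._objects = {}                  # id -> object (strong references)
        self._next_id = itertools.count()

    def put(self, obj):
        obj_id = str(next(self._next_id))
        self._objects[obj_id] = obj
        return obj_id                       # the client only ever holds this id

    def get(self, obj_id):
        return self._objects[obj_id]

    def remove(self, obj_id):
        self._objects.pop(obj_id, None)


class CollectedLines(list):
    """Stand-in for a large collected array (a list subclass, so it can be weak-referenced)."""


tracker = ObjectTracker()
obj_id = tracker.put(CollectedLines("line %d" % i for i in range(1000)))
probe = weakref.ref(tracker.get(obj_id))

# Nothing on the client side references the array any more (like `test <- {}`),
# but nobody has told the tracker.
gc.collect()
leaked = probe() is not None    # True: the registry still pins the array

# Only an explicit remove() lets the array be reclaimed.
tracker.remove(obj_id)
gc.collect()
freed = probe() is None         # True

print("leaked:", leaked, "freed:", freed)
```

In this model, the missing remove() call plays the role of the reference that "should've been removed" somewhere along the chain.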
Finally, to answer your question about collect hanging: it appears to be caused by the data not fitting in the R process's memory. Here's a thread related to that issue: https://www.mail-archive.com/user@spark.apache.org/msg29155.html
I confirmed this by using a smaller file with only a handful of lines; running collect on it indeed does not hang.