Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException
I'm trying to connect to Phoenix via Spark and I keep getting the following exception when opening a connection via the JDBC driver (cut for brevity, full stacktrace below):
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
The class in question is provided by the jar called phoenix-core-4.3.1.jar (despite it being in the HBase package namespace; I guess they need it to integrate with HBase).
There are numerous questions on ClassNotFoundExceptions on Spark and I've tried the fat-jar approach (both with Maven's assembly and shade plugins; I've inspected the jars, they do contain ClientRpcControllerFactory), and I've tried a lean jar while specifying the jars on the command line. For the latter, the command I used is as follows:
/opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars lib/spark-streaming-kafka_2.10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector kafkastreamconsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true
I've done a classpath dump from within the code, and the first classloader in the hierarchy already knows the Phoenix jar:
2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO nl.work.kafkastreamconsumer.phoenix.LinePersister - [file:/home/work/projects/customer/kafkastreamconsumer.jar, file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar, file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar, file:/home/work/projects/customer/lib/zkclient-0.3.jar, file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar, file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar, file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
So the question is: what am I missing here? Why can't Spark load the correct class? There should be only one version of the class flying around (namely the one from phoenix-core), so I doubt it's a versioning conflict.
[Executor task launch worker-3] ERROR nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
    at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
    at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
    at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
    at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
    at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
    at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
    at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
    at java.sql.DriverManager.getConnection(DriverManager.java:571)
    at java.sql.DriverManager.getConnection(DriverManager.java:233)
    at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
    ... 25 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
    at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
    ... 36 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
    ... 39 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
    at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
    ... 43 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
    ... 46 more
/edit
Unfortunately the issue remains with 4.4.0-HBase-0.98. Below are the classes in question. Since the saveToPhoenix() method is not yet available for the Java API and since this is a PoC, my idea was to use the JDBC driver for each mini-batch.
public class PhoenixConnection implements AutoCloseable, Serializable {
    private static final long serialVersionUID = -4491057264383873689L;
    private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    static {
        try {
            Class.forName(PHOENIX_DRIVER);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    private Connection connection;

    public PhoenixConnection(final String jdbcUri) {
        try {
            connection = DriverManager.getConnection(jdbcUri);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {
        ArrayList<Map<String, Object>> resultList = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement(sql);
             ResultSet resultSet = statement.executeQuery()) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
                // JDBC column indices are 1-based
                for (int column = 1; column <= metaData.getColumnCount(); ++column) {
                    final String columnLabel = metaData.getColumnLabel(column);
                    row.put(columnLabel, resultSet.getObject(columnLabel));
                }
                resultList.add(row);
            }
        }
        resultList.trimToSize();
        return resultList;
    }

    @Override
    public void close() {
        try {
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

public class LinePersister implements Function<JavaRDD<String>, Void> {
    private static final long serialVersionUID = -2529724617108874989L;
    private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
    private static final String TABLE_NAME = "mail_events";

    private final String jdbcUrl;

    public LinePersister(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public Void call(JavaRDD<String> dataSet) throws Exception {
        LOGGER.info(String.format("Starting conversion on RDD with %d elements", dataSet.count()));
        List<Void> collectResult = dataSet.map(new Function<String, Void>() {
            private static final long serialVersionUID = -6651313541439109868L;

            @Override
            public Void call(String line) throws Exception {
                LOGGER.info("Writing line " + line);
                Event event = EventParser.parseLine(line);
                // open a fresh Phoenix connection per element; this is where the CNFE surfaces
                try (PhoenixConnection connection = new PhoenixConnection(jdbcUrl)) {
                    connection.executeQuery(event.createUpsertStatement(TABLE_NAME));
                } catch (Exception e) {
                    LOGGER.error("Error while processing line", e);
                    dumpClasspath(this.getClass().getClassLoader());
                }
                return null;
            }
        }).collect();
        LOGGER.info(String.format("Got %d results: ", collectResult.size()));
        return null;
    }

    public static void dumpClasspath(ClassLoader loader) {
        LOGGER.info("Classloader " + loader + ":");
        if (loader instanceof URLClassLoader) {
            URLClassLoader ucl = (URLClassLoader) loader;
            LOGGER.info(Arrays.toString(ucl.getURLs()));
        } else {
            LOGGER.error("cannot display components (not a URLClassLoader)");
        }
        if (loader.getParent() != null) {
            dumpClasspath(loader.getParent());
        }
    }
}

The pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>nl.work</groupId>
    <artifactId>kafkastreamconsumer</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <spark.version>1.3.1</spark.version>
        <hibernate.version>4.3.10.Final</hibernate.version>
        <phoenix.version>4.4.0-HBase-0.98</phoenix.version>
        <hbase.version>0.98.9-hadoop2</hbase.version>
        <spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>${phoenix.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>${phoenix.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.cloudera</groupId>
            <artifactId>spark-hbase</artifactId>
            <version>${spark-hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <!--
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            -->
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>unknown-jars-temp-repo</id>
            <name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
            <url>file:${project.basedir}/lib</url>
        </repository>
    </repositories>
</project>
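For completeness, this is roughly how the classes above get wired into the streaming job. The driver class itself is not part of the post, so the following is only a minimal sketch assuming the argument order from the spark-submit command above; the group id and batch interval are made up:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

// Hypothetical driver, NOT from the original post: a sketch of hooking
// LinePersister into a Kafka DStream with the CLI arguments used above.
public class KafkaPhoenixConnector {
    public static void main(String[] args) throws Exception {
        final String zkQuorum = args[0]; // e.g. node1:5181
        final String topic = args[2];
        final String jdbcUrl = args[3];  // e.g. jdbc:phoenix:node1:5181

        SparkConf conf = new SparkConf().setAppName("KafkaPhoenixConnector");
        // the batch interval is an assumption; the post does not mention it
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, Integer> topics = new HashMap<>();
        topics.put(topic, 1); // one receiver thread for the topic

        JavaPairReceiverInputDStream<String, String> stream =
                KafkaUtils.createStream(jssc, zkQuorum, "kafka-phoenix-poc", topics);

        // strip the Kafka keys, then persist each mini-batch via the JDBC path above
        stream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple) {
                return tuple._2();
            }
        }).foreachRDD(new LinePersister(jdbcUrl));

        jssc.start();
        jssc.awaitTermination();
    }
}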
/edit2 I've tried the saveAsHadoopApiFile approach (https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java), but it yields the same error, albeit with a different stacktrace:
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
    at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
    at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
    at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
    at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
    at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
    at java.sql.DriverManager.getConnection(DriverManager.java:571)
    at java.sql.DriverManager.getConnection(DriverManager.java:187)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
    at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
    at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
    ... 8 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
    at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
    ... 23 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
    ... 26 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
    at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
    ... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
    ... 34 more
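For reference, the write path attempted in this edit hands a pair RDD of DBWritable values to PhoenixOutputFormat via saveAsNewAPIHadoopFile, as in the linked gist. A rough sketch; EventWritable, the table name and the column list are placeholders, not from the original post:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.spark.api.java.JavaPairRDD;

class PhoenixBulkWriter {
    // EventWritable is hypothetical: it must implement org.apache.hadoop.io.Writable
    // and org.apache.hadoop.mapreduce.lib.db.DBWritable so PhoenixRecordWriter can
    // bind its fields to the generated UPSERT statement.
    static void saveToPhoenix(JavaPairRDD<NullWritable, EventWritable> records,
                              String zkQuorum) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); // e.g. node1:5181

        Job job = Job.getInstance(conf);
        // register the target table and column list with Phoenix's MapReduce integration
        PhoenixMapReduceUtil.setOutput(job, "MAIL_EVENTS", "COL1,COL2");

        // the output path is ignored by PhoenixOutputFormat; rows go to the table instead
        records.saveAsNewAPIHadoopFile("/tmp/ignored", NullWritable.class,
                EventWritable.class, PhoenixOutputFormat.class, job.getConfiguration());
    }
}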
The nice people at the Phoenix mailing list gave me the answer:
"rather bundle phoenix client jar app, able include in static location either in spark_classpath, or set conf values below (i use spark_classpath myself, though it's deprecated): spark.driver.extraclasspath spark.executor.extraclasspath "
https://www.mail-archive.com/user@spark.apache.org/msg29978.html
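In practice this means either adding the Phoenix client jar to SPARK_CLASSPATH in spark-env.sh, or passing both conf values at submit time. For example (the jar path is an assumption; it must point to a location that exists on the driver and on every worker node):

/opt/mapr/spark/spark-1.3.1/bin/spark-submit \
  --conf spark.driver.extraClassPath=/opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar \
  --conf spark.executor.extraClassPath=/opt/phoenix/phoenix-4.4.0-HBase-0.98-client.jar \
  --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
  kafkastreamconsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true

Unlike --jars, these settings prepend the jar to the JVM classpath of the driver and executors, so HBase's reflective lookup of ClientRpcControllerFactory goes through the same classloader that loaded the HBase classes.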