Spark-Cassandra Connector: Failed to open native connection to Cassandra
I am new to Spark and Cassandra. When I try to submit a Spark job, I get an error while connecting to Cassandra.

Details:
Versions:

- Spark: 1.3.1 (built for Hadoop 2.6 or later: spark-1.3.1-bin-hadoop2.6)
- Cassandra: 2.0
- spark-cassandra-connector: 1.3.0-M1
- Scala: 2.10.5
Spark and Cassandra run on a virtual cluster. Cluster details:

- Spark master: 192.168.101.13
- Spark slaves: 192.168.101.11, 192.168.101.12
- Cassandra nodes: 192.168.101.11 (seed node), 192.168.101.12
I am submitting the job from a client machine (my laptop) at 172.16.0.6. After googling this error, I made sure I can ping every machine in the cluster from the client (the Spark master, the slaves, and the Cassandra nodes) and disabled the firewall on all machines. I am still struggling with this error.
cassandra.yaml
    listen_address: 192.168.101.11        # 192.168.101.12 on the other Cassandra node
    start_native_transport: true
    native_transport_port: 9042
    start_rpc: true
    rpc_address: 192.168.101.11           # 192.168.101.12 on the other Cassandra node
    rpc_port: 9160
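Since ping only proves ICMP reachability, it is also worth confirming that the native transport port itself accepts TCP connections from the client. A minimal sketch that can be pasted into any Scala REPL (the portOpen helper is hypothetical, just a connectivity probe):

    import java.net.{InetSocketAddress, Socket}

    // Hypothetical helper: returns true if a TCP connection to host:port succeeds.
    def portOpen(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
      val socket = new Socket()
      try {
        socket.connect(new InetSocketAddress(host, port), timeoutMs)
        true
      } catch {
        case _: java.io.IOException => false
      } finally {
        socket.close()
      }
    }

    // Check the native transport port on both Cassandra nodes.
    Seq("192.168.101.11", "192.168.101.12").foreach { h =>
      println(s"$h:9042 reachable = ${portOpen(h, 9042)}")
    }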
I am trying to run this minimal sample job:
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import com.datastax.spark.connector._

    val rdd = sc.cassandraTable("test", "words")
    rdd.toArray.foreach(println)
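(The deprecation warning in the output below most likely comes from rdd.toArray, which has been deprecated since Spark 1.0 in favour of collect; the equivalent non-deprecated call would be:)

    // collect is the non-deprecated replacement for toArray
    rdd.collect().foreach(println)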
To submit the job, I use spark-shell (and :paste the code into the shell):
    spark-shell --jars "/home/ameya/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.3.0-M1/spark-cassandra-connector_2.10-1.3.0-M1.jar",\
    "/home/ameya/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.5/cassandra-driver-core-2.1.5.jar",\
    "/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar",\
    "/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar",\
    "/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar",\
    "/home/ameya/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar",\
    "/home/ameya/.m2/repository/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar",\
    "/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar",\
    "/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar",\
    "/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar",\
    "/home/ameya/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.5/cassandra-clientutil-2.1.5.jar",\
    "/home/ameya/.m2/repository/joda-time/joda-time/2.3/joda-time-2.3.jar",\
    "/home/ameya/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.3/cassandra-thrift-2.1.3.jar",\
    "/home/ameya/.m2/repository/org/joda/joda-convert/1.2/joda-convert-1.2.jar",\
    "/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar",\
    "/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar" \
      --master spark://192.168.101.13:7077 \
      --conf spark.cassandra.connection.host=192.168.101.11 \
      --conf spark.cassandra.auth.username=cassandra \
      --conf spark.cassandra.auth.password=cassandra
The error I am getting:
    warning: there were 1 deprecation warning(s); re-run with -deprecation for details
    java.io.IOException: Failed to open native connection to Cassandra at {192.168.101.11}:9042
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:181)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167)
        at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
        at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
        at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:76)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:115)
        at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:243)
        at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:49)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
        at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:148)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:118)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
        at org.apache.spark.rdd.RDD.toArray(RDD.scala:833)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $iwC$$iwC$$iwC.<init>(<console>:54)
        at $iwC$$iwC.<init>(<console>:56)
        at $iwC.<init>(<console>:58)
        at <init>(<console>:60)
        at .<init>(<console>:64)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.101.11:9042 (com.datastax.driver.core.TransportException: [/192.168.101.11:9042] Connection has been closed))
        at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)
        at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
        at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1236)
        at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:333)
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
        ... 71 more
Can anyone point out what I am doing wrong here?
You did not specify spark.cassandra.connection.host.
By default, Spark assumes the Cassandra host is the same as the Spark master node.
    var sc: SparkContext = _
    val conf = new SparkConf().setAppName("Cassandra Demo").setMaster(master)
      .set("spark.cassandra.connection.host", "192.168.101.11")
    sc = new SparkContext(conf)
    val rdd = sc.cassandraTable("test", "words")
    rdd.toArray.foreach(println)
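When running through spark-shell instead, sc already exists, so the setting has to be passed with --conf at launch (as in the question); you can check what the shell actually picked up:

    // Prints the configured contact point, or throws NoSuchElementException if unset
    sc.getConf.get("spark.cassandra.connection.host")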
It should work if you have set the seed node in cassandra.yaml.
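If it still fails, it can help to take Spark out of the picture and connect with the bare Java driver. A minimal sketch (assuming cassandra-driver-core 2.1.5 on the classpath; DriverSmokeTest is just an illustrative name):

    import com.datastax.driver.core.Cluster

    // Connects directly with the DataStax Java driver: if this also fails,
    // the problem is between the client and Cassandra, not in Spark.
    object DriverSmokeTest extends App {
      val cluster = Cluster.builder()
        .addContactPoint("192.168.101.11")
        .withPort(9042)
        .build()
      try {
        val session = cluster.connect()
        println(s"Connected to cluster: ${cluster.getMetadata.getClusterName}")
        session.close()
      } finally {
        cluster.close()
      }
    }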