I’m very new to Aerospike (and fairly new to Spark) and am trying to get the two working together, but I’m having some issues…
Since I was originally building the aerospark jar on a machine away from the DB server, I built with tests disabled. When I ran my test code (just the code below, from the project README) I was able to import data successfully, but I got a NullPointerException when trying to read it back (with aql I can verify that the data has indeed been saved). The code as it stands is:
Save Data:
import com.aerospike.client.{Bin, Key, Value}
import com.aerospike.spark.sql.AerospikeConnection

val TEST_COUNT = 100
val namespace = "test"
val client = AerospikeConnection.getClient("localhost", 3000)
Value.UseDoubleType = true
for (i <- 1 to TEST_COUNT) {
  val key = new Key(namespace, "rdd-test", "rdd-test-" + i)
  client.put(null, key,
    new Bin("one", i),
    new Bin("two", "two:" + i),
    new Bin("three", i.toDouble)
  )
}
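(For reference, the aql check I used to confirm the writes is just a select against the set, along these lines:)

```
aql> select * from test.rdd-test
```

which lists the records with their "one", "two" and "three" bins as expected.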
and try to read something back:
val thingsDF = sqlContext.read.
  format("com.aerospike.spark.sql").
  option("aerospike.seedhost", "127.0.0.1").
  option("aerospike.port", "3000").
  option("aerospike.namespace", namespace).
  option("aerospike.set", "rdd-test").
  load()
thingsDF.registerTempTable("things")
val filteredThings = sqlContext.sql("select * from things where one = 55")
val thing = filteredThings.first()
Figuring that I had better validate the basic setup, I tried building the jar file on the database machine itself with tests enabled. It turns out I get the same exception there. Here’s the first one from the log:
[info] AerospikeRelationTest:
[info] Aerospike Relation
[info] - should create test data
[info] - should create an AerospikeRelation *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
[info]   at com.aerospike.helper.model.Set.setInfo(Set.java:75)
[info]   at com.aerospike.helper.model.Set.<init>(Set.java:36)
[info]   at com.aerospike.helper.model.Namespace.mergeSet(Namespace.java:82)
[info]   at com.aerospike.helper.query.QueryEngine.refreshNamespaceData(QueryEngine.java:555)
[info]   at com.aerospike.helper.query.QueryEngine.refreshNamespaces(QueryEngine.java:533)
[info]   at com.aerospike.helper.query.QueryEngine.refreshCluster(QueryEngine.java:508)
[info]   at com.aerospike.helper.query.QueryEngine.setClient(QueryEngine.java:121)
[info]   at com.aerospike.helper.query.QueryEngine.<init>(QueryEngine.java:105)
[info]   at com.aerospike.spark.sql.AerospikeConnection$$anonfun$getQueryEngine$1.apply(AerospikeConnection.scala:22)
[info]   at com.aerospike.spark.sql.AerospikeConnection$$anonfun$getQueryEngine$1.apply(AerospikeConnection.scala:21)
[info]   at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
[info]   at scala.collection.AbstractMap.getOrElse(Map.scala:58)
[info]   at com.aerospike.spark.sql.AerospikeConnection$.getQueryEngine(AerospikeConnection.scala:21)
[info]   at com.aerospike.spark.sql.KeyRecordRDD.compute(KeyRecordRDD.scala:77)
[info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
[info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
[info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
[info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
[info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
[info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
[info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
[info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
[info]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
[info]   at org.apache.spark.scheduler.Task.run(Task.scala:89)
[info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]   at java.lang.Thread.run(Thread.java:745)
[info]
[info]   Driver stacktrace:
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
[info]   at scala.Option.foreach(Option.scala:236)
[info]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
[info]   …
[info]   Cause: java.lang.NullPointerException:
[info]   at com.aerospike.helper.model.Set.setInfo(Set.java:75)
[info]   at com.aerospike.helper.model.Set.<init>(Set.java:36)
[info]   at com.aerospike.helper.model.Namespace.mergeSet(Namespace.java:82)
[info]   at com.aerospike.helper.query.QueryEngine.refreshNamespaceData(QueryEngine.java:555)
[info]   at com.aerospike.helper.query.QueryEngine.refreshNamespaces(QueryEngine.java:533)
[info]   at com.aerospike.helper.query.QueryEngine.refreshCluster(QueryEngine.java:508)
[info]   at com.aerospike.helper.query.QueryEngine.setClient(QueryEngine.java:121)
[info]   at com.aerospike.helper.query.QueryEngine.<init>(QueryEngine.java:105)
[info]   at com.aerospike.spark.sql.AerospikeConnection$$anonfun$getQueryEngine$1.apply(AerospikeConnection.scala:22)
[info]   at com.aerospike.spark.sql.AerospikeConnection$$anonfun$getQueryEngine$1.apply(AerospikeConnection.scala:21)
[info]   …
I’m assuming that there must be something wrong in my environment. I’m using sbt 0.13, Scala 2.10 and Java 1.8.0_91, and I’m seeing the same issue on both Mac OS X and Linux (Ubuntu 14.04). Might anyone have any suggestions as to what my issue might be?