Loading Data from Aerospike Failing with Spark Connector

I’m using Spark to load data from an Aerospike cluster into a DataFrame, process it, and write it to another Aerospike cluster. The data has two bins: one holding a List of Strings and another holding a Map with String keys and Long values. When the Spark application runs, a task fails. The driver stack trace is as follows:

24/06/21 09:59:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.10.16.151 executor 0): java.lang.UnsupportedOperationException: empty.reduceLeft
    at scala.collection.TraversableOnce.reduceLeft(TraversableOnce.scala:185)
    at scala.collection.TraversableOnce.reduceLeft$(TraversableOnce.scala:183)
    at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$reduceLeft(ArrayBuffer.scala:49)
    at scala.collection.IndexedSeqOptimized.reduceLeft(IndexedSeqOptimized.scala:77)
    at scala.collection.IndexedSeqOptimized.reduceLeft$(IndexedSeqOptimized.scala:76)
    at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.reduce(TraversableOnce.scala:213)
    at scala.collection.TraversableOnce.reduce$(TraversableOnce.scala:213)
    at scala.collection.AbstractTraversable.reduce(Traversable.scala:108)
    at com.aerospike.spark.converters.TypeConverter$.matchesSchemaType(TypeConverter.scala:246)
    at com.aerospike.spark.converters.TypeConverter$.convertToSparkType(TypeConverter.scala:353)
    at com.aerospike.spark.converters.TypeConverter$.binToValue(TypeConverter.scala:428)
    at com.aerospike.spark.sql.sources.v2.RowIterator.$anonfun$get$2(RowIterator.scala:60)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.aerospike.spark.sql.sources.v2.RowIterator.get(RowIterator.scala:48)
    at com.aerospike.spark.sql.sources.v2.RowIterator.get(RowIterator.scala:21)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:89)
    at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:124)
    at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:121)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Upon looking into it, I found the cause of this exception: the connector calls reduce to perform a schema match on the bin value, and the bin value is an empty list. Is there any way to resolve this?
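The failure can be reproduced in plain Scala without Spark: reduce on an empty collection throws UnsupportedOperationException, which matches the matchesSchemaType frame in the stack trace above. A minimal sketch (the reduceOption alternative is just an illustration of a defensive pattern, not the connector's actual code):

```scala
// Plain-Scala reproduction of the failure mode (no Spark needed):
// reducing an empty collection throws UnsupportedOperationException.
val empty = Seq.empty[Int]

val thrown =
  try { empty.reduce(_ max _); false }
  catch { case _: UnsupportedOperationException => true }
println(s"reduce on an empty list throws: $thrown") // prints true

// A defensive alternative: reduceOption returns None instead of throwing.
val safe = empty.reduceOption(_ max _)
println(s"reduceOption on an empty list: $safe") // prints None
```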

Versions:

  • spark version: 3.1.2
  • connector version: 3.3.0

Schema inference is performed when the schema is not provided. Could you please specify the schema explicitly and see if that unblocks you?
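A minimal sketch of reading with an explicit schema so inference is skipped entirely. The bin names ("listBin", "mapBin") and the connection options are placeholders for your setup, not actual names from your cluster:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("aerospike-read").getOrCreate()

// Declare the bin types up front instead of letting the connector
// sample records to infer them.
val binSchema = StructType(Seq(
  StructField("listBin", ArrayType(StringType), nullable = true),
  StructField("mapBin", MapType(StringType, LongType), nullable = true)
))

val df = spark.read
  .format("aerospike")
  .schema(binSchema) // explicit schema: no inference scan
  .option("aerospike.seedhost", "localhost:3000")
  .option("aerospike.namespace", "test")
  .option("aerospike.set", "spark-test")
  .load()
```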

The application encounters the same error even when a schema is provided. I also observed that when the schema is not explicitly provided, the connector scans a few records to infer it, and the exception is thrown during that process as well if a bin value is an empty list.

Thanks for trying. Connector 3.3.0 is outside the support window; would it be possible to use the latest 4.5.0 connector? I ran the following test:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
val TEST_COUNT = 100
val simpleSchema: StructType = new StructType(
  Array(
    StructField("one", IntegerType, nullable = false),
    StructField("two", MapType(StringType, IntegerType), nullable = false),
    StructField("three", ArrayType(IntegerType), nullable = false)
  ))

val simpleDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to TEST_COUNT) {
    val one = i
    val two = Map("two:" + i -> i)
    val three = if (i % 2 == 0) {
      Seq.empty[Int] // empty list
    } else {
      Seq(i, 2 * i, i * 3)
    }
    val r = Row(one, two, three)
    inputBuf.append(r)
  }
  val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq)
  spark.createDataFrame(inputRDD, simpleSchema)
}

simpleDF.write
  .mode("append")
  .format("aerospike")
  .option("aerospike.writeset", "spark-test")
  .option("aerospike.updateByKey", "one")
  .option("aerospike.write.mode", "update")
  .save()


spark.sqlContext.read
  .format("aerospike")
  .option("aerospike.seedhost", "localhost:3000")
  .option("aerospike.namespace", "test")
  .option("aerospike.set", "spark-test")
  .load()
  .show()

Please note the empty lists. The connector did print an error message, but recovered. Here is the trimmed output:

scala> spark.sqlContext.read.format("aerospike").option("aerospike.seedhost", "localhost:3000").option("aerospike.namespace", "test").option("aerospike.set", "spark-test").load().show()
24/06/25 21:29:07 ERROR AerospikeTable: caught exception java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 while doing inference.
+-----+--------------------+---------+------------+-----+---+--------------+----------------+
|__key|            __digest| __expiry|__generation|__ttl|one|         three|             two|
+-----+--------------------+---------+------------+-----+---+--------------+----------------+
| null|[1A F0 00 A1 94 E...|457027344|           1|  197| 92|            []|  {two:92 -> 92}|
| null|[19 A0 70 3D A3 B...|457027344|           1|  197| 17|  [17, 34, 51]|  {two:17 -> 17}|
| null|[35 60 D5 D1 AF 8...|457027344|           1|  197| 96|            []|  {two:96 -> 96}|
| null|[4C F0 1C BB 06 8...|457027344|           1|  197| 15|  [15, 30, 45]|  {two:15 -> 15}|
| null|[51 80 7E 09 29 B...|457027344|           1|  196| 69|[69, 138, 207]|  {two:69 -> 69}|
| null|[F0 70 5B 3E 9C 6...|457027344|           1|  196| 63|[63, 126, 189]|  {two:63 -> 63}|
| null|[05 51 F0 DF 55 0...|457027344|           1|  196| 44|            []|  {two:44 -> 44}|
| null|[1

Note the presence of empty lists in the output.


Thanks for this. I tried running the application on my setup with the latest 4.5.0 connector, but it failed. Here’s the stack trace:

24/06/26 17:25:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.InvalidClassException: com.aerospike.spark.sql.sources.v2.AerospikePartitionScan; local class incompatible: stream classdesc serialVersionUID = 2421370070804437145, local class serialVersionUID = 3740084749266903987
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2004)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1851)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:457)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

The application didn’t encounter the above error with connector version 3.3.0. Could version incompatibility be the reason behind this error?

Could you please share the name of the jar? It must be compatible with your Scala and Spark versions. I believe you should try either https://download.aerospike.com/artifacts/aerospike-spark/4.5.0/aerospike-spark-4.5.0-spark3.1-scala2.12-clientunshaded.jar or https://download.aerospike.com/artifacts/aerospike-spark/4.5.0/aerospike-spark-4.5.0-spark3.1-scala2.12-allshaded.jar. These jars are compatible with Scala 2.12 and Spark 3.1.

Thanks for helping out. I figured out that my application jar declared connector 3.3.0_spark3.1-allshaded as a pom dependency, which was conflicting with the 4.5.0 connector jar present in the runtime environment. After removing the dependency from the application pom, the Spark job completed successfully.
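For anyone hitting the same InvalidClassException: the fix is to make sure only one connector version reaches the executors. A hypothetical pom fragment (the groupId/artifactId shown are illustrative; check the coordinates your pom actually uses) would be removed, or at minimum scoped so it is not bundled into the application jar:

```xml
<!-- Illustrative only: the old connector dependency to remove. If it must
     stay for compile-time reasons, mark it provided so the runtime's
     4.5.0 jar is the only one on the executor classpath. -->
<dependency>
  <groupId>com.aerospike</groupId>
  <artifactId>aerospike-spark</artifactId>
  <version>3.3.0_spark3.1-allshaded</version>
  <scope>provided</scope>
</dependency>
```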

The connector writes map data to the Aerospike bin as the KEY_ORDERED_MAP type. Can we configure the map subtype before writing to the cluster?

Currently, the map bin write type is not configurable.