I’m using Spark to load data from an Aerospike cluster into a DataFrame, process it, and write it to another Aerospike cluster. The data has two bins: one holding a List of Strings and another holding a Map with String keys and Long values. When the Spark application runs, a task fails. The driver stack trace is as follows (a minimal sketch of the job is included after the trace):
24/06/21 09:59:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.10.16.151 executor 0): java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.TraversableOnce.reduceLeft(TraversableOnce.scala:185)
at scala.collection.TraversableOnce.reduceLeft$(TraversableOnce.scala:183)
at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$reduceLeft(ArrayBuffer.scala:49)
at scala.collection.IndexedSeqOptimized.reduceLeft(IndexedSeqOptimized.scala:77)
at scala.collection.IndexedSeqOptimized.reduceLeft$(IndexedSeqOptimized.scala:76)
at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.reduce(TraversableOnce.scala:213)
at scala.collection.TraversableOnce.reduce$(TraversableOnce.scala:213)
at scala.collection.AbstractTraversable.reduce(Traversable.scala:108)
at com.aerospike.spark.converters.TypeConverter$.matchesSchemaType(TypeConverter.scala:246)
at com.aerospike.spark.converters.TypeConverter$.convertToSparkType(TypeConverter.scala:353)
at com.aerospike.spark.converters.TypeConverter$.binToValue(TypeConverter.scala:428)
at com.aerospike.spark.sql.sources.v2.RowIterator.$anonfun$get$2(RowIterator.scala:60)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at com.aerospike.spark.sql.sources.v2.RowIterator.get(RowIterator.scala:48)
at com.aerospike.spark.sql.sources.v2.RowIterator.get(RowIterator.scala:21)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:89)
at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:124)
at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:121)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
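For reference, the job is set up roughly like this. This is only a sketch: the host, namespace, set, and bin names are placeholders, and the connector options are simplified from my actual config.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("aerospike-copy").getOrCreate()

// Read from the source cluster. Each record has two bins:
// one with a List of Strings and one with a Map of String -> Long.
val sourceDF = spark.read
  .format("aerospike")
  .option("aerospike.seedhost", "source-host:3000")
  .option("aerospike.namespace", "source_ns")
  .option("aerospike.set", "source_set")
  .load()

// ... the actual transformations happen here ...
val processedDF = sourceDF

// Write to the destination cluster.
processedDF.write
  .mode(SaveMode.Append)
  .format("aerospike")
  .option("aerospike.seedhost", "dest-host:3000")
  .option("aerospike.namespace", "dest_ns")
  .option("aerospike.set", "dest_set")
  .save()
```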
Upon looking into it, I found the cause of the exception: the connector calls reduce to perform a schema match on a bin value, and the bin value is an empty list, so the reduce has nothing to operate on. Is there any way to resolve this?
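For what it's worth, the exception message matches the standard Scala behaviour when reduce is called on an empty collection, which lines up with an empty List bin being handed to the schema-matching code:

```scala
// Calling reduce on an empty collection throws exactly this exception:
List.empty[Int].reduce(_ + _)
// java.lang.UnsupportedOperationException: empty.reduceLeft
```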
Versions:
- Spark version: 3.1.2
- Connector version: 3.3.0