Aerospike Spark Write with Partition Info

I see in our document: [Data Distribution | Aerospike Documentation]

To determine record assignment to a partition, the record’s key (of any size) is hashed into a 20-byte fixed-length digest using RIPEMD160. The partition ID of the record is determined using 12 bits of this digest.

def getPartitionId(key: Any): Integer = {
  // Assuming key is a byte array representing the hashed key
  val hash = key.asInstanceOf[Array[Byte]]
  // Extracting 12 bits from the hash
  val partitionId = (hash(0) & 0xFF) << 4 | (hash(1) & 0xFF) >>> 4
  partitionId
}

bodyRecordsDS.groupByKey(row => getPartitionId(row.key))(Encoders.INT)
  .mapGroups {
    case (pId, partitionRecords) =>
      partitionRecords.grouped(parameters.aerospikeWriteBatchSize)
        .map(batches => {
          executeWriteRequests(client, batches)
        }).toList
  }(aerospikeResultEncoder)

I have two questions:

  1. For Aerospike Spark usage, do I need to partition by partition ID?
  2. Since I need a way to partition the data using groupByKey, is the getPartitionId function I wrote above the right way to get the partition ID?

It appears you are trying to write some data from Spark to Aerospike. I recommend using the Spark connector for all Spark-related operations.

Assuming the above code uses the Aerospike Java client for executing batch writes into the database, there is no need to extract the corresponding Aerospike partition from the Spark rows. To write data into the database, you should first:

  • Decide which column of the Spark DataFrame row will be used for the construction of an Aerospike Key.
  • Construct Bins from all the Spark columns you wish to write to the DB.
  • Having assembled the Key and Bins, you can now write to the database using any of the client write APIs. Based on the key object, the client will determine the corresponding Aerospike partition ID and write the records appropriately (a minimal sketch follows this list).
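
For illustration, here is a minimal sketch of those three steps using the Java client directly (the namespace, set, bin names, and values below are placeholders, not taken from your schema):

import com.aerospike.client.{AerospikeClient, Bin, Key}
import com.aerospike.client.policy.WritePolicy

// Hypothetical connection and row values, for illustration only
val client = new AerospikeClient("127.0.0.1", 3000)
val key    = new Key("test", "my_set", "row-key-value")           // chosen key column -> Aerospike Key
val bins   = Seq(new Bin("col1", "value1"), new Bin("col2", 42))  // remaining columns -> Bins

// The client hashes the key into a digest, derives the partition ID, and routes the write to the owning node
client.put(new WritePolicy(), key, bins: _*)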

Instead of extracting the Aerospike partition ID from the raw byte array, I would rather extract it from the Key object:

def getPartitionIDFromKey(keyObj: Key): Int = (Buffer.littleBytesToInt(keyObj.digest, 0) & 0xffff) % 4096
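
For completeness, a usage sketch for this helper (it relies on the Java client's com.aerospike.client.Key and com.aerospike.client.command.Buffer classes; the namespace and set names here are made up):

import com.aerospike.client.Key
import com.aerospike.client.command.Buffer

// "test"/"demo" are hypothetical namespace/set names
val key = new Key("test", "demo", "some-user-key")   // the Key constructor computes the 20-byte digest
val partitionId = getPartitionIDFromKey(key)         // yields a value in [0, 4095]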

Thanks Rahul for replying to me. I have already written my batch write function. I have a df = Dataset[AerospikeRecord], where each record is created as:

        val ops = List(
          AerospikeBinPut("key1, AerospikeString(row.key1)),
          AerospikeBinPut("key2", AerospikeString(row.key2)),
          AerospikeBinPut("key3", AerospikeString(row.key3)),
        )
        val aerospikeKey = buildBodyKey(row)
        AerospikeRecord(aerospikeKey, ops, DenyOverrideWritePolicy)

My original plan is: with this df, I will partition by key first, then batch the records, and make a batch write operation:

aerospikeClient.operate(writePolicy, batchRecords.asJava)

But how can I implement this with the Aerospike Spark Connector? From the link you gave me, I only see that it uses the .write function. Can I still use my original batch write function?

#Write the Sample Data to Aerospike
inputDF \
.write \
.mode('overwrite') \
.format("aerospike")  \
.option("aerospike.writeset", "py_input_data")\
.option("aerospike.updateByKey", "id") \
.save()

My interpretation is that ops is the list of Bins we wish to write to the database, and AerospikeRecord is a structure that holds information regarding the Aerospike Key and the Bins, providing sufficient information to write or update records.

When partitioning the data based on the key, there are some considerations to keep in mind:

  • It will force the data to be shuffled between Spark nodes just to write them, which may not achieve anything profound. Is there any other application-specific reason for partitioning records before writing them?
  • Partitioning, grouping, etc., are prone to partition imbalance, where a few partitions may be too big and others pretty small. These operations are generally prone to Out-Of-Memory errors (OOM). I tend to think of partitioning and grouping as tools to bring similar data together, allowing us to extract insights from them.

A barebone implementation might look something like this:

import scala.collection.mutable

bodyRecordsDS.foreachPartition { it =>
  val buffer = mutable.ArrayBuffer[AerospikeRecord]()
  while (it.hasNext) {
    // Convert the Spark row to an Aerospike-amenable format
    val record = AerospikeRecord(it.next())
    buffer.append(record)
    if (buffer.size == MY_BATCH_SIZE) {
      // If this is an async API, something may fail, so keep track of failures, completion, retries, etc., in a callback
      client.operate(...)
      buffer.clear()
    }
  }
  // Write whatever is left over in the buffer
  if (buffer.nonEmpty) {
    client.operate(...)
  }
}

The Spark connector internally uses batching (default size = 10k; see the aerospike.write.batchsize option in the docs) with the Java client's async batch write API. So I think nothing needs to be done beyond setting the parameters in line with the workload.
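
For example, a connector write that sets the batch size explicitly might look like this (a Scala sketch mirroring the Python example above; the set name, key column, and batch size are placeholders to be tuned to your workload):

inputDF
  .write
  .mode("overwrite")
  .format("aerospike")
  .option("aerospike.writeset", "py_input_data")
  .option("aerospike.updateByKey", "id")
  .option("aerospike.write.batchsize", "5000")  // default is 10000
  .save()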

My batch operation is a sync call; can I still try/catch and do retries? I'm also curious about the error messages and status codes Aerospike would give back to me.

Thanks Rahul. I tried to fix my code; just wondering:

  1. I saw that foreachPartition is from spark_sql, which I already have, so we don't need to use the Aerospike Spark connector, right?
  2. Or do both spark_sql and the Aerospike Spark connector have a foreachPartition function? Currently I'm still using the spark_sql one.
  • Sync API: When using synchronous APIs, you have the flexibility to retry based on the outcome. It's important to note that the client may encounter various types of errors. Application developers need to anticipate these errors and plan their handling strategy according to the context (a minimal retry sketch follows after this list).

  • foreachPartition Usage: Yes, you can directly use foreachPartition to write data without the connector. The DataFrame/Dataset foreachPartition is a Spark construct and is available irrespective of the connector's presence.
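
On the retry question, here is a minimal sketch of a sync batch write with try/catch and bounded retries (assuming the Java client's AerospikeClient.operate(BatchPolicy, java.util.List[BatchRecord]) batch API; the retry count, logging, and the choice of which result codes to treat as retryable are application decisions):

import com.aerospike.client.{AerospikeClient, AerospikeException, BatchRecord, ResultCode}
import com.aerospike.client.policy.BatchPolicy
import scala.jdk.CollectionConverters._

def writeBatchWithRetry(client: AerospikeClient,
                        policy: BatchPolicy,
                        batch: Seq[BatchRecord],
                        maxAttempts: Int = 3): Unit = {
  var attempt = 0
  var succeeded = false
  while (!succeeded && attempt < maxAttempts) {
    attempt += 1
    try {
      // Synchronous batch call; returns true only if every record in the batch succeeded
      succeeded = client.operate(policy, batch.asJava)
      if (!succeeded) {
        // Each BatchRecord carries its own resultCode; inspect them to decide what to retry
        val failedCodes = batch.filter(_.resultCode != ResultCode.OK).map(_.resultCode).distinct
        println(s"Attempt $attempt: some records failed, result codes = ${failedCodes.mkString(", ")}")
      }
    } catch {
      case e: AerospikeException =>
        // The exception exposes the status code (e.getResultCode) and a human-readable message
        println(s"Attempt $attempt failed: code=${e.getResultCode}, msg=${e.getMessage}")
        if (attempt == maxAttempts) throw e
    }
  }
}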