Record LUT query for each bin

Hey all. Until now I was using Kafka Connect to persist data from Kafka to Aerospike. We now want to create a recovery tool that gets the current LUT (last update time) for the records in a specific set and, on trigger, persists all the messages on the relevant Kafka topic to Aerospike and then deletes all the records whose LUT <= that captured "now". That guarantees that all the irrelevant data is deleted. I was using the following UDF to read the LUT:

          |function getLUT(r)
          |    return record.last_update_time(r)
          |end


    def registerUDFs(code: String): Unit = {
      // logger is assumed to be defined in the enclosing class
      logger.info(s"[AerospikeService] - registerUDFs($code) Triggered")
      for (client <- clients) {
        client
          .registerUdfString(null, code, Configuration.packageName + ".lua", Language.LUA)
          .waitTillComplete(100, config.timeout.toMillis.toInt)
      }
    }

And the calculation:

    def calculateCurrentLUTs(): Seq[Long] = {
      // logger is assumed to be defined in the enclosing class
      logger.info("[AerospikeService] - calculateCurrentLUTs() Triggered")
      val policy = new WritePolicy()
      val key = new Key(config.sets.head.namespace, Configuration.dummySetName, 1)
      for (client <- clients) yield {
        // Write a dummy record, then read its LUT to get the cluster's current time
        client.put(policy, key, new Bin("a", 1))
        client.execute(policy, key, Configuration.packageName, "getLUT").asInstanceOf[Long]
      }
    }

The problem now is that Kafka Connect no longer persists to a single bin but can persist to 2 bins, so the LUT recorded for each row changes on any write, even though 1 of the bins may still be corrupted. This is the truncate method:

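    def truncate(startTimes: Seq[Long], durableDelete: Boolean): Unit = {
      // logger is assumed to be defined in the enclosing class
      logger.info(s"[AerospikeService] - truncate($startTimes) Triggered")
      for ((client, startTime) <- clients.zip(startTimes)) {
        val policy = new WritePolicy()
        policy.durableDelete = durableDelete
        val calendar = Calendar.getInstance()
        calendar.setTimeInMillis(startTime + 1262304000000L) // uses CITRUSLEAF_EPOCH - see

        for ((namespace, sets) <- config.sets.groupBy(_.namespace)) {
          // Match records in the configured sets that were last updated at or before startTime.
          // The set-name field (`name`) is an assumption; the original accessor was lost.
          policy.filterExp = Exp.build(Exp.and(
            Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)),
            if (sets.size == 1) {
              Exp.eq(Exp.setName(), Exp.`val`(sets.head.name))
            } else {
              Exp.or(sets.map(set => Exp.eq(Exp.setName(), Exp.`val`(set.name))): _*)
            }
          ))

          val statement = new Statement
          statement.setNamespace(namespace)
          client.execute(policy, statement, Operation.delete())
        }
      }
    }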
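
The `1262304000000L` offset in the truncate method is the distance between the Unix epoch and the Citrusleaf epoch (2010-01-01 00:00:00 UTC) in milliseconds. A minimal sketch of that conversion, assuming the LUT returned by `record.last_update_time(r)` is in milliseconds since the Citrusleaf epoch (as the constant above implies); the object and method names here are hypothetical and not part of the original code:

```scala
import java.time.Instant

// Hypothetical helper, for illustration only.
object LutEpoch {
  // Milliseconds between 1970-01-01T00:00:00Z (Unix epoch)
  // and 2010-01-01T00:00:00Z (Citrusleaf epoch).
  val CitrusleafEpochMillis: Long = 1262304000000L

  // Assumes `lut` is in ms since the Citrusleaf epoch; returns Unix epoch ms.
  def lutToUnixMillis(lut: Long): Long = lut + CitrusleafEpochMillis

  // The same point in time as an Instant, convenient for logging.
  def lutToInstant(lut: Long): Instant =
    Instant.ofEpochMilli(lutToUnixMillis(lut))
}
```

With this, `LutEpoch.lutToInstant(0L)` corresponds to 2010-01-01T00:00:00Z, which is a quick sanity check that the offset is applied in the right direction before using the value in a delete filter.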

Is it possible to record a LUT for each bin instead of for each row?

Unfortunately, as far as I know, bin-level LUTs are not available at this point, so there is nothing to check against per bin.

