Hey all, until now I was using Kafka Connect to persist data from Kafka to Aerospike. We now want to create a recovery tool that captures the current LUT (last update time) for the records in a specific set and, on trigger, re-persists all the messages from the relevant Kafka topic to AS and then deletes every record whose LUT is <= the captured "now". That guarantees that all data which is no longer relevant gets deleted. I was using the following UDF to read the LUT:
aerospikeService.registerUDFs(
  """
    |function getLUT(r)
    |  return record.last_update_time(r)
    |end
    |""".stripMargin
)
and
def registerUDFs(code: String): Unit = {
  logger.info(s"[AerospikeService] - registerUDFs($code) Triggered")
  for (client <- clients) {
    client
      .registerUdfString(null, code, Configuration.packageName + ".lua", Language.LUA)
      .waitTillComplete(100, config.timeout.toMillis.toInt)
  }
}
and calculation:
def calculateCurrentLUTs(): Seq[Long] = {
  logger.info("[AerospikeService] - calculateCurrentLUTs() Triggered")
  val policy = new WritePolicy()
  policy.setTimeout(config.timeout.toMillis.toInt)
  val key = new Key(config.sets.head.namespace, Configuration.dummySetName, 1)
  for (client <- clients) yield {
    client.put(policy, key, new Bin("a", 1))
    client.execute(policy, key, Configuration.packageName, "getLUT").asInstanceOf[Long]
  }
}
The problem now is that Kafka Connect no longer persists into a single bin but can persist into 2 bins. The LUT is recorded per record (row), so it changes whenever either bin is written, even though the other bin may still be corrupted.
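To make the issue concrete, here is a minimal in-memory sketch. It is a toy model and not Aerospike client code: `Rec`, `LutDemo`, `survivors` and the timestamps are made-up names and values, and the only behaviour assumed is what is described above, namely one LUT per record that is refreshed by any bin write.

```scala
// Toy model (hypothetical, not the Aerospike API): one LUT per record.
final case class Rec(bins: Map[String, String], lut: Long) {
  // Writing any single bin refreshes the record-level LUT.
  def write(bin: String, value: String, now: Long): Rec =
    Rec(bins.updated(bin, value), now)
}

object LutDemo {
  // Same intent as a "delete where LUT <= startTime" filter:
  // only records with LUT > startTime are kept.
  def survivors(store: Map[Int, Rec], startTime: Long): Map[Int, Rec] =
    store.filter { case (_, r) => r.lut > startTime }

  val startTime = 100L

  // Both bins were written before the recovery started (t = 50).
  val before: Rec = Rec(Map("a" -> "old", "b" -> "old"), lut = 50L)

  // The replay rewrites only bin "a" (t = 200); bin "b" keeps its old value.
  val after: Rec = before.write("a", "new", now = 200L)

  // The record survives the delete although bin "b" was never refreshed.
  val kept: Map[Int, Rec] = survivors(Map(1 -> after), startTime)
}
```

In this model the record is kept because its LUT (200) is greater than the start time (100), while bin "b" still holds the value from before the recovery.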
This is the truncate method:
def truncate(startTimes: Seq[Long], durableDelete: Boolean): Unit = {
  logger.info(s"[AerospikeService] - truncate($startTimes) Triggered")
  for ((client, startTime) <- clients.zip(startTimes)) {
    val policy = new WritePolicy()
    policy.durableDelete = durableDelete
    val calendar = Calendar.getInstance()
    // uses CITRUSLEAF_EPOCH - see https://discuss.aerospike.com/t/how-to-use-view-and-calulate-last-update-time-lut-for-the-truncate-command/4330
    calendar.setTimeInMillis(startTime + 1262304000000L)
    for ((namespace, sets) <- config.sets.groupBy(_.namespace)) {
      policy.filterExp = Exp.build(
        Exp.and(
          Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)),
          if (sets.size == 1) {
            Exp.eq(Exp.setName(), Exp.`val`(sets.head.name))
          } else {
            Exp.or(sets.map(x => Exp.eq(Exp.setName(), Exp.`val`(x.name))): _*)
          }))
      val statement = new Statement
      statement.setNamespace(namespace)
      client.execute(policy, statement, Operation.delete())
    }
  }
}
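For reference, the epoch shift used in the truncate method can be sketched on its own. This assumes, as the code above does, that the value returned by the UDF is in milliseconds since the Citrusleaf epoch (2010-01-01T00:00:00Z); `LutEpoch`, `toUnixMillis` and `toInstant` are names made up for this illustration.

```scala
import java.time.Instant

object LutEpoch {
  // 2010-01-01T00:00:00Z expressed in Unix milliseconds
  // (the same constant that is added to startTime in truncate).
  val CitrusleafEpochMillis: Long = 1262304000000L

  // Assumption: lutMillis is milliseconds since the Citrusleaf epoch.
  def toUnixMillis(lutMillis: Long): Long =
    lutMillis + CitrusleafEpochMillis

  // Convenience view of the same value as a java.time.Instant.
  def toInstant(lutMillis: Long): Instant =
    Instant.ofEpochMilli(toUnixMillis(lutMillis))
}
```

For example, `LutEpoch.toInstant(0L)` corresponds to the Citrusleaf epoch itself, and the resulting Unix-millisecond value is what gets passed to `calendar.setTimeInMillis`.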
Is it possible to record the LUT per bin instead of per record (row)?