Record LUT query for each bin

Hi all, until now I have been using Kafka Connect to persist data from Kafka to Aerospike. We now want to build a recovery tool that captures the current LUT (last update time) for a specific set and, when triggered, re-persists all the messages on the relevant Kafka topic to Aerospike and then deletes every record whose LUT is <= the captured time. That guarantees that all data that is no longer relevant gets deleted. I was using the following UDF:

aerospikeService.registerUDFs(
  """
    |function getLUT(r)
    |    return record.last_update_time(r)
    |end
    |""".stripMargin
)

and

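def registerUDFs(code: String): Unit = {
  logger.info(s"[AerospikeService] - registerUDFs($code) Triggered")
  // Register the Lua module on every cluster and block until the server confirms it
  // (polling every 100 ms, up to the configured timeout).
  for (client <- clients) {
    client
      .registerUdfString(null, code, Configuration.packageName + ".lua", Language.LUA)
      .waitTillComplete(100, config.timeout.toMillis.toInt)
  }
}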

and the calculation:

def calculateCurrentLUTs(): Seq[Long] = {
  logger.info("[AerospikeService] - calculateCurrentLUTs() Triggered")
  val policy = new WritePolicy()
  policy.setTimeout(config.timeout.toMillis.toInt)
  val key = new Key(config.sets.head.namespace, Configuration.dummySetName, 1)
  for (client <- clients) yield {
    // Touch a dummy record so that its LUT is set by the server at write time,
    // then read that LUT back through the getLUT UDF.
    client.put(policy, key, new Bin("a", 1))
    client.execute(policy, key, Configuration.packageName, "getLUT").asInstanceOf[Long]
  }
}
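The values returned by `calculateCurrentLUTs` are milliseconds since the Citrusleaf epoch (2010-01-01T00:00:00Z), which is why the CITRUSLEAF_EPOCH offset `1262304000000L` has to be added before they can be compared with `Exp.lastUpdate()`. As far as I know, `Exp.lastUpdate()` is expressed in nanoseconds since the Unix epoch, and `Exp.val(Calendar)` produces the same unit. Below is a minimal pure-Scala sketch of that conversion; `LutConversion` and its methods are hypothetical helpers, not part of the Aerospike client, and the unit assumptions are worth double-checking against your client version.

```scala
import java.util.{Calendar, TimeZone}

// Hypothetical helpers (not part of the Aerospike client) for converting the
// values returned by getLUT into the units Exp.lastUpdate() is compared in.
object LutConversion {
  // 2010-01-01T00:00:00Z (the Citrusleaf epoch) as Unix-epoch milliseconds.
  val CitrusleafEpochMillis: Long = 1262304000000L

  // Citrusleaf-epoch milliseconds -> Unix-epoch milliseconds.
  def toUnixMillis(citrusleafMillis: Long): Long =
    Math.addExact(citrusleafMillis, CitrusleafEpochMillis)

  // Citrusleaf-epoch milliseconds -> Unix-epoch nanoseconds, throwing on overflow.
  def toUnixNanos(citrusleafMillis: Long): Long =
    Math.multiplyExact(toUnixMillis(citrusleafMillis), 1000000L)

  // Represents the same instant as the Calendar that truncate passes to Exp.`val`.
  def toCalendar(citrusleafMillis: Long): Calendar = {
    val calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
    calendar.setTimeInMillis(toUnixMillis(citrusleafMillis))
    calendar
  }
}
```

For example, `LutConversion.toCalendar(startTime)` yields the same instant as `calendar.setTimeInMillis(startTime + 1262304000000L)`; the time zone only affects field rendering, not `getTimeInMillis`.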

The problem now is that Kafka Connect no longer persists a single bin but can persist two bins, so the record-level LUT changes whenever either bin is written, even though the other bin may still be corrupted. This is the truncate method:

def truncate(startTimes: Seq[Long], durableDelete: Boolean): Unit = {
  logger.info(s"[AerospikeService] - truncate($startTimes) Triggered")
  for ((client, startTime) <- clients.zip(startTimes)) {
    val policy = new WritePolicy()
    policy.durableDelete = durableDelete
    val calendar = Calendar.getInstance()
    calendar.setTimeInMillis(startTime + 1262304000000L) // uses CITRUSLEAF_EPOCH - see https://discuss.aerospike.com/t/how-to-use-view-and-calulate-last-update-time-lut-for-the-truncate-command/4330

    for ((namespace, sets) <- config.sets.groupBy(_.namespace)) {
      // Match only records last updated at or before the captured LUT
      // that belong to one of the configured sets in this namespace.
      policy.filterExp = Exp.build(
        Exp.and(
          Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)),
          if (sets.size == 1) {
            Exp.eq(Exp.setName(), Exp.`val`(sets.head.name))
          } else {
            Exp.or(sets.map(x => Exp.eq(Exp.setName(), Exp.`val`(x.name))): _*)
          }))

      val statement = new Statement
      statement.setNamespace(namespace)
      // Runs as a background query; the returned ExecuteTask is not awaited here.
      client.execute(policy, statement, Operation.delete())
    }
  }
}
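To make the two-bin problem concrete, here is a pure-Scala model of the `truncate` filter (LUT <= cutoff and set name in the configured sets), assuming the server keeps one LUT per record that moves whenever any bin is written. `TruncateFilterModel`, `StoredRecord`, the set name `events`, and the timestamps are all hypothetical; no Aerospike client is involved, and `staleBins` only illustrates what a per-bin LUT would make possible.

```scala
// Hypothetical in-memory model (no Aerospike client involved) of the truncate
// filter, assuming the server keeps one LUT per record that moves on any bin write.
object TruncateFilterModel {
  // Each bin remembers when it was last written; timestamps are arbitrary units.
  final case class StoredRecord(setName: String, binWrittenAt: Map[String, Long]) {
    require(binWrittenAt.nonEmpty, "a record needs at least one bin")

    // Record-level LUT: the time of the most recent write to any bin.
    def lastUpdate: Long = binWrittenAt.values.max
  }

  // Mirrors Exp.and(Exp.le(Exp.lastUpdate(), cutoff), <set name in sets>).
  def wouldDelete(rec: StoredRecord, cutoff: Long, sets: Set[String]): Boolean =
    rec.lastUpdate <= cutoff && sets.contains(rec.setName)

  // Writing one bin refreshes that bin and therefore the record-level LUT.
  def writeBin(rec: StoredRecord, bin: String, at: Long): StoredRecord =
    rec.copy(binWrittenAt = rec.binWrittenAt.updated(bin, at))

  // Hypothetical per-bin check (what a bin-level LUT would enable):
  // the bins last written at or before the cutoff.
  def staleBins(rec: StoredRecord, cutoff: Long): Set[String] =
    rec.binWrittenAt.collect { case (bin, at) if at <= cutoff => bin }.toSet
}
```

With a cutoff of 100, a record in `events` whose bins `a` and `b` were written at 50 and 60 matches the filter. If the replay then rewrites only `a` at 150, the record LUT becomes 150 and the record survives `truncate`, even though `b` (last written at 60) was never refreshed.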

Is it possible to record a LUT for each bin instead of for each row?

Unfortunately, as far as I know, bin-level LUTs are not currently available to check against.
