I have a job that writes all records to Aerospike with RecordExistsAction set to UPDATE. I expected new records to be inserted with the current last-update time (LUT), and existing records to be updated with their LUT reset to the current time. So when I run the job again on the same set of records, every record should end up with a current LUT. A follow-up step deletes records whose LUT is older than the current time minus 1 minute. In my test case, where I run the job on the same set of records, there should be nothing to delete. However, all the records get re-inserted and all the old records get deleted, which suggests that updating a record does not update its LUT. Do I need to do something specific for this to work?
//code
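import com.aerospike.client.{AerospikeClient, Bin, Key}
import com.aerospike.client.exp.Exp
import com.aerospike.client.policy.{CommitLevel, QueryPolicy, RecordExistsAction, WritePolicy}
import com.aerospike.client.query.Statement
import org.joda.time.DateTime

final val WRITE_POLICY = {
  val wp = new WritePolicy()
  wp.recordExistsAction = RecordExistsAction.UPDATE
  wp.commitLevel = CommitLevel.COMMIT_MASTER
  wp.sendKey = true
  wp
}

val storeClient = new AerospikeClient(host, port)

// upsert every record
rows.foreach { row =>
  storeClient.put(
    WRITE_POLICY,
    new Key(NAMESPACE, SET_NAME, row.id),
    new Bin(SET_NAME, row.toByteArray)
  )
}

// delete older records: the cutoff is (now - 1 minute) in nanoseconds since the Unix epoch
val latestTimestamp = DateTime.now().minusMinutes(1).getMillis * 1000000L
val policy = new QueryPolicy()
// `val` is a reserved word in Scala, so Exp.val needs backticks
policy.filterExp = Exp.build(Exp.lt(Exp.lastUpdate(), Exp.`val`(latestTimestamp)))

// statement targets the same namespace and set (assumed; its definition is omitted in this snippet)
val statement = new Statement()
statement.setNamespace(NAMESPACE)
statement.setSetName(SET_NAME)

val rs = storeClient.query(policy, statement)
while (rs.next()) {
  val key = rs.getKey
  storeClient.delete(WRITE_POLICY, new Key(key.namespace, key.setName, key.userKey))
}
rs.close()

// close the client only after the query and the deletes are done
storeClient.close()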
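
To rule out a unit mistake in the cutoff, the comparison the delete step relies on can be checked locally without a cluster. The following is only a sketch of the intended logic, assuming the LUT is compared as nanoseconds since the Unix epoch. The names LutCutoff, toEpochNanos, cutoffNanos and isStale are hypothetical helpers, not part of the Aerospike client, and the sketch uses java.time instead of Joda-Time so it has no dependencies.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper: models the "LUT < now - maxAge" check client-side.
// It does not talk to Aerospike; it only verifies the timestamp arithmetic.
object LutCutoff {
  // Nanoseconds since the Unix epoch for the given instant (overflow-checked).
  def toEpochNanos(t: Instant): Long =
    Math.addExact(Math.multiplyExact(t.getEpochSecond, 1000000000L), t.getNano.toLong)

  // Cutoff below which a record counts as stale: (now - maxAge) in nanoseconds.
  def cutoffNanos(now: Instant, maxAge: Duration): Long =
    toEpochNanos(now.minus(maxAge))

  // True when a record's LUT (in nanoseconds) is strictly older than the cutoff.
  def isStale(lutNanos: Long, cutoff: Long): Boolean =
    lutNanos < cutoff
}
```

With this model, a record whose LUT equals the time of the second run is not stale for a one-minute window, so if the server still returns such records from the filtered query, the difference lies in the LUT the server holds rather than in the cutoff arithmetic.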