Does Operation.add() provide a consistency guarantee?

We have a multi-threaded Java application that updates a bin value using Operation.add(), and we are seeing inconsistent results. We update keys via batch writes, so multiple threads can issue batch updates concurrently, and each batch can contain several add operations. Attaching sample code that reproduces the issue.

We are using a high-availability cluster.

        key = UUID.randomUUID().toString();
        int writeCount = 20;
        int threads = 10;
        val batchOfBatchRecords = new ArrayList<ArrayList<BatchRecord>>();
        for(int i=0;i<threads;i++) {
            val batchRecords = new ArrayList<BatchRecord>();
            batchOfBatchRecords.add(batchRecords);
            for(int j=0;j<writeCount;j++) {
                batchRecords.add(getBatchWriteObject(getOperations()));
            }
        }

        ExecutorService executorService = new ThreadPoolExecutor(20, 20, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
        for(int i=0;i<threads;i++) {
            int finalI = i;
            executorService.execute(() -> aerospikeClient.operate(aerospikeClient.getBatchPolicyDefault(), batchOfBatchRecords.get(finalI)));
        }

        Thread.sleep(10000);

        val record = aerospikeClient.get(new Policy(), new Key("test", "set", key));

        val readCount = (long) record.getValue("count");
        val diff = threads*writeCount - readCount;

/** Get operations. */
private Operation[] getOperations() {
    return new Operation[]{
            Operation.add(new Bin("count", 1))
    };
}

You are creating a “hot key” (KEY_BUSY, error code 14: https://aerospike.com/docs/reference/error_codes ), and you are not checking for that error on your add() operations. So some of your add() operations never complete, and their increments are silently lost.
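To make the mechanics concrete, here is a minimal stdlib-only sketch (this is not Aerospike server code; the bounded queue is a hypothetical stand-in for the server's per-record transaction-pending-limit, default 20). Adds that arrive while the queue is full are refused, just as the server answers KEY_BUSY, so the final count falls short of the number of attempts unless the caller checks for rejections:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class HotKeyDemo {
    /** Pushes `attempts` increments through a queue bounded at `limit`
        (a stand-in for transaction-pending-limit); returns {applied, rejected}. */
    static long[] simulate(int attempts, int limit) throws InterruptedException {
        ArrayBlockingQueue<Runnable> pending = new ArrayBlockingQueue<>(limit);
        AtomicLong count = new AtomicLong();    // the "count" bin
        AtomicLong rejected = new AtomicLong(); // adds refused, like KEY_BUSY

        // One writer thread: only one transaction touches the record at a time.
        Thread writer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Runnable op = pending.poll(50, TimeUnit.MILLISECONDS);
                    if (op != null) op.run();
                } catch (InterruptedException e) { return; }
            }
        });
        writer.start();

        for (int i = 0; i < attempts; i++) {
            // offer() fails immediately when the queue is full -- the analogue
            // of the server replying KEY_BUSY instead of queueing the add
            if (!pending.offer(count::incrementAndGet)) rejected.incrementAndGet();
        }
        while (!pending.isEmpty()) Thread.sleep(5);  // drain remaining ops
        writer.interrupt();
        writer.join();
        return new long[]{count.get(), rejected.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = simulate(10_000, 20);
        // applied + rejected always equals attempts: the "diff" you observe
        // corresponds exactly to the rejected operations you never checked
        System.out.println("applied=" + r[0] + " rejected=" + r[1]);
    }
}
```

The invariant to notice: applied + rejected always equals the attempt count, so the shortfall in your read-back count equals the number of uncaught errors.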

I tried to simulate your code in a Jupyter Notebook. Here is my code:

String key = UUID.randomUUID().toString();
Operation[] getOperations() {
        return new Operation[]{
                Operation.add(new Bin("count", 1)),
                Operation.get()
        };
}

BatchRecord getBatchWriteObject( Operation[] oList){
    return new BatchWrite(new Key("test", "set", key), oList);    
}

BatchPolicy batchPolicy = new BatchPolicy();

int writeCount = 300;
int threads = 10;
List<List<BatchRecord>> batchOfBatchRecords = new ArrayList<List<BatchRecord>>();
for(int i=0;i<threads;i++) {
     List<BatchRecord> batchRecords = new ArrayList<BatchRecord>();
     batchOfBatchRecords.add(batchRecords);
     for(int j=0;j<writeCount;j++) {
                batchRecords.add(getBatchWriteObject(getOperations()));
     }
}

void runtask (int finalI) {
  //client.operate(batchPolicy, batchOfBatchRecords.get(finalI));
  List<BatchRecord> records = batchOfBatchRecords.get(finalI);
  client.operate(batchPolicy, records);

  for (BatchRecord record: records){
      if(record.resultCode == 0) {
        Record rec = record.record;
        int v1 = rec.getInt("count");
        System.out.println("Count:"+ v1);    
      } 
      else {
          System.out.println("Error: "+ ResultCode.getResultString(record.resultCode));
      }
  }
}

ExecutorService executorService = new ThreadPoolExecutor(20, 20, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
for(int i=0;i<threads;i++) {
    int finalI = i;
    //executorService.execute(() -> client.operate(batchPolicy, batchOfBatchRecords.get(finalI)));
    executorService.execute(() -> runtask(finalI));
}

Thread.sleep(1000);

Record record = client.get(null, new Key("test", "set", key));

long readCount = (long) record.getValue("count");
long diff = threads*writeCount - readCount;

System.out.println("Read Count: " + readCount);
System.out.println("Diff Count: " + diff);

For smaller numbers (writeCount = 200, threads = 10), I get this output - no issue:

Read Count: 2000
Diff Count: 0

[ Note: In this test I was using a single-node cluster, so there was no waiting for the replica write. Once the number of pending transactions on the record hits the default transaction-pending-limit of 20, the server sends the KEY_BUSY error. When I changed to a 3-node cluster, I was able to run (writeCount = 5, threads = 10) cleanly but got the hot key / KEY_BUSY error with (writeCount = 6, threads = 10). ]

For higher numbers on the single-node server (writeCount = 300, threads = 10), I start seeing hot key errors:

...
...
Count:215
Count:1574
Error: Hot key
Error: Hot key
Error: Hot key
Count:1575
Error: Hot key
Error: Hot key
Count:66
Count:141
...
...

Final:

Read Count: 2796
Diff Count: 204

You can avoid hitting this hot key error by dynamically setting transaction-pending-limit to 0 on the namespace. (My namespace name is test.)

asadm
Admin> enable
Admin+> manage config namespace test param transaction-pending-limit to 0

Now the queue can grow without bound, as long as you have adequate memory. With an adequate Thread.sleep(), I was able to run:

Read Count: 10000
Diff Count: 0

Hot key situations should really be avoided using data modeling techniques.

Hi @pgupta. If we set transaction-pending-limit to 0, what are the implications?

Just thinking about possibilities; I have not actually tested this. Suppose you have a whole bunch of clients all trying to update the same key. If you set transaction-pending-limit to 0, the queue can build up while the master waits to replicate to the replica, which in turn means the clients will be waiting. So either you configure the clients to never time out, or they will time out waiting on the response and then retry. If the client side times out, the server-side transaction may still eventually complete, but the server will have lost the socket to the client and cannot send the response. So, depending on what you are doing with the hot key, you may end up with updates that the client does not know for sure succeeded.
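The retry pattern described above can be sketched with a generic helper (stdlib only; the KEY_BUSY exception, attempt count, and backoff values here are illustrative, not the Aerospike client API — in real code you would re-submit only the BatchRecord entries whose resultCode indicates KEY_BUSY or a timeout):

```java
import java.util.concurrent.Callable;

public class RetryDemo {
    /** Runs `op` up to maxAttempts times with linear backoff between tries.
        A hypothetical stand-in for re-submitting failed batch entries. */
    static <T> T withRetry(Callable<T> op, int maxAttempts, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {                 // e.g. a KEY_BUSY or timeout
                last = e;
                Thread.sleep(backoffMs * attempt);  // back off before retrying
            }
        }
        throw last;  // give up after maxAttempts; caller must handle
    }

    public static void main(String[] args) throws Exception {
        int[] failures = {2};                       // simulate two transient failures
        String result = withRetry(() -> {
            if (failures[0]-- > 0) throw new IllegalStateException("KEY_BUSY");
            return "ok";
        }, 5, 10);
        System.out.println(result);                 // prints "ok" on the third attempt
    }
}
```

Note the caveat from the reply still applies: a client-side timeout does not mean the server-side write failed, so a blind retry of an add() can double-count. That is another reason to prefer fixing the data model over retrying.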

I would not recommend implementing a hot-key data model. There may be other ways to model the problem and avoid the hot-key situation.
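One common modeling technique for this is a sharded counter: spread the count over N records (e.g. hypothetical keys "count:0" … "count:15"), increment a randomly chosen shard, and sum all shards on read. Here is a stdlib-only simulation of the idea (the shard count is arbitrary; in Aerospike each array slot would be a separate record, so no single record gets hot):

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

public class ShardedCounter {
    // Each slot stands in for a separate record, e.g. keys "count:0".."count:15",
    // so concurrent adds spread across records instead of queueing on one.
    private final AtomicLongArray shards;

    ShardedCounter(int shardCount) { shards = new AtomicLongArray(shardCount); }

    /** Pick a random shard and add to it -- one Operation.add per shard key. */
    void add(long delta) {
        shards.getAndAdd(ThreadLocalRandom.current().nextInt(shards.length()), delta);
    }

    /** Reading the total is a batch get over all shard keys, then a sum. */
    long total() {
        long sum = 0;
        for (int i = 0; i < shards.length(); i++) sum += shards.get(i);
        return sum;
    }

    public static void main(String[] args) {
        ShardedCounter c = new ShardedCounter(16);
        for (int i = 0; i < 200; i++) c.add(1);
        System.out.println(c.total());   // prints 200 -- no increments lost
    }
}
```

The trade-off is that reads become a small batch get plus a sum instead of a single record read, in exchange for write throughput that scales with the shard count.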