Distributed Queue and Transactions

Hi,

I’m writing a Java application that requires a distributed queue. Getting the next item in the queue needs to be atomic so that multiple clients don’t get the same item. It appears that writing a UDF in Lua might be the solution. In my function I can read the first record, delete it and return the value. Does this sound like it will work? Are Java calls to Lua server functions atomic? Thanks!

We do have examples of how to do something along these lines. This does not require a UDF, but can be done from the client side. Please note that this code does not cover how to write the data initially, which we assume is handled by a separate application.

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.ScanCallback;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.WritePolicy;

public class PromoCodeTest {   
   private static final String Namespace = "test";
   private static final String SetName = "testset";
   private static final String PromoCode = "promo";
   private static final String Used = "used";
   
   public static void main(String[] args) {
      try {
         PromoCodeTest test = new PromoCodeTest();
         test.runTest(args[0]);
      }
      catch (Exception e) {
         e.printStackTrace();
      }
   }

   // AsyncClient could also be used because it's a superset of AerospikeClient.
   private AerospikeClient client;
   
   public void runTest(String host) throws AerospikeException {
      client = new AerospikeClient(host, 3000);
      
      try {
         String promoCode = getPromoCode();
         System.out.println("Promo code: " + promoCode);
      }
      finally {
         client.close();
      }
   }

   private String getPromoCode() throws AerospikeException {
       ScanPolicy policy = new ScanPolicy();
       ScanHandler handler = new ScanHandler();       
       Node[] nodes = client.getNodes();
      
      for (Node node : nodes) {      
         try {
            client.scanNode(policy, node.getName(), Namespace, SetName, handler);
         }
         catch (AerospikeException.ScanTerminated st) {
            // Scan termination is expected.
         }
         
         if (handler.promoCode != null) {
            return handler.promoCode;
         }
      }
      return null;      
   }
   
   private class ScanHandler implements ScanCallback {
      private String promoCode;
      
      @Override
      public void scanCallback(Key key, Record record) throws AerospikeException {
         String code = (String)record.getValue(PromoCode);
         Object object = record.getValue(Used);
         
         if (object == null || ((int)(Integer)object) == 0) {
            // Write used only if generation is the same as from scan.
            WritePolicy policy = new WritePolicy();
            policy.recordExistsAction = RecordExistsAction.EXPECT_GEN_EQUAL;
            policy.generation = record.generation;
            
            try {
               client.put(policy, key, new Bin(Used, 1));
               // The record could be copied to another set and deleted from the original set.
               // This step is not required, but it would reduce the number scan records
               // that need to be processed.
   
               // We have promo code. Terminate scan.
               promoCode = code;            
               throw new AerospikeException.ScanTerminated();
            }
            catch (AerospikeException ae) {
               // Let scan continue with next record on generation error.                  
               if (ae.getResultCode() != ResultCode.GENERATION_ERROR) {
                  throw ae;
               }
            }
         }
      }      
   }
}