Distributed Queue and Transactions


#1

Hi,

I’m writing a Java application that requires a distributed queue. Getting the next item in the queue needs to be atomic so that multiple clients don’t get the same item. It appears that writing a UDF in Lua might be the solution. In my function I can read the first record, delete it and return the value. Does this sound like it will work? Are Java calls to Lua server functions atomic? Thanks!


#2

We do have an example of how to do something along these lines. It does not require a UDF; it can be done entirely from the client side. Please note that this code does not cover how the data is written initially, which we assume is handled by a separate application.

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.ScanCallback;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.WritePolicy;

/**
 * Example: hand out each promo code to at most one client without a UDF.
 *
 * <p>The set is scanned node by node. The first record whose "used" bin is
 * absent or zero is claimed by writing {@code used = 1} with a generation
 * check, so the write only succeeds if the record has not been modified since
 * the scan read it. If another client claimed the record first, the write fails
 * with a generation error and the scan simply moves on to the next record.
 */
public class PromoCodeTest {
   private static final String NAMESPACE = "test";
   private static final String SET_NAME = "testset";
   private static final String PROMO_CODE = "promo";
   private static final String USED = "used";
   private static final int PORT = 3000;

   /**
    * Entry point.
    *
    * @param args {@code args[0]} is the host name of a cluster seed node
    */
   public static void main(String[] args) {
      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
      if (args.length < 1) {
         System.err.println("Usage: PromoCodeTest <host>");
         return;
      }

      try {
         PromoCodeTest test = new PromoCodeTest();
         test.runTest(args[0]);
      }
      catch (Exception e) {
         e.printStackTrace();
      }
   }

   // AsyncClient could also be used because it's a superset of AerospikeClient.
   private AerospikeClient client;

   /**
    * Connects to the cluster, claims one promo code and prints it.
    *
    * @param host host name of a cluster seed node
    * @throws AerospikeException if connecting, scanning or writing fails
    */
   public void runTest(String host) throws AerospikeException {
      client = new AerospikeClient(host, PORT);

      try {
         String promoCode = getPromoCode();
         System.out.println("Promo code: " + promoCode);
      }
      finally {
         client.close();
      }
   }

   /**
    * Scans one node at a time until a promo code has been claimed.
    *
    * @return the claimed promo code, or {@code null} if no unused code was found
    * @throws AerospikeException on any scan or write failure other than the
    *         expected scan termination
    */
   private String getPromoCode() throws AerospikeException {
      ScanPolicy policy = new ScanPolicy();
      ScanHandler handler = new ScanHandler();
      Node[] nodes = client.getNodes();

      for (Node node : nodes) {
         try {
            client.scanNode(policy, node.getName(), NAMESPACE, SET_NAME, handler);
         }
         catch (AerospikeException.ScanTerminated st) {
            // Scan termination is expected: the handler aborts the scan
            // as soon as it has claimed a code.
         }

         if (handler.promoCode != null) {
            return handler.promoCode;
         }
      }
      return null;
   }

   /**
    * Tells whether the value of the "used" bin marks the record as taken.
    * Any {@link Number} is accepted rather than assuming {@link Integer}, so the
    * check does not fail with a ClassCastException if the client hands the
    * integer bin back as a {@link Long}.
    *
    * @param flag value of the "used" bin, or {@code null} if the bin is absent
    * @return {@code true} if the bin is present and non-zero
    */
   private static boolean isUsed(Object flag) {
      return flag != null && ((Number)flag).longValue() != 0;
   }

   /**
    * Scan callback that claims the first unused promo code it sees and then
    * terminates the scan.
    */
   private class ScanHandler implements ScanCallback {
      private String promoCode;

      @Override
      public void scanCallback(Key key, Record record) throws AerospikeException {
         String code = (String)record.getValue(PROMO_CODE);

         if (isUsed(record.getValue(USED))) {
            return;
         }

         if (code == null) {
            // Nothing to hand out; do not consume a record that carries no code.
            return;
         }

         // Write used only if generation is the same as from scan.
         // NOTE(review): newer client releases appear to express this through
         // WritePolicy.generationPolicy (GenerationPolicy.EXPECT_GEN_EQUAL) -
         // confirm against the client version in use.
         WritePolicy policy = new WritePolicy();
         policy.recordExistsAction = RecordExistsAction.EXPECT_GEN_EQUAL;
         policy.generation = record.generation;

         try {
            client.put(policy, key, new Bin(USED, 1));
         }
         catch (AerospikeException ae) {
            if (ae.getResultCode() != ResultCode.GENERATION_ERROR) {
               throw ae;
            }
            // Another client changed the record first.
            // Let scan continue with next record on generation error.
            return;
         }

         // The record could be copied to another set and deleted from the original set.
         // This step is not required, but it would reduce the number of scan records
         // that need to be processed.

         // We have promo code. Terminate scan. This is thrown outside the try block
         // so the termination signal never passes through the write-error handler.
         promoCode = code;
         throw new AerospikeException.ScanTerminated();
      }
   }
}