AeroSpike with create_only policy throws record_already_exists error even-though there is no record


#1

In my production environment we are using Aerospike server version 3.15.0.3 and the client version 4.0.8.

Our aerospike java client calling client.put function using CREATE_ONLY policy in multi threaded environment.Some time we are getting KEY_EXISTS_ERROR even though there is no key.

In the below code LINE 4 print -1 which is key is not there. In LINE 5 call put function with CREATE_ONLY that throws error KEY_ALREADY_EXIST

Why this is happened? Is there any issue in AS server CREATE_ONLY policy or Aeropspike java client have issue?

Added code below.

public AerospikeErrorType PutOnce(String key, String nameSpace, String tableName, int expiry, int counter, long reqid) {
    if (key != null) {
        try {
            //LINE 4
            log.info("<" + reqid + "> KY="+key+" BEFORE AERO CALL Get["+get(key,nameSpace,tableName,reqid)+"]");
            //LINE 5
            FirstInsert(key, nameSpace, tableName, expiry, counter);
            //LINE 6
            log.info("<" + reqid + ">  KY="+key+" : "+AerospikeErrorType.RECORD_NOT_EXISTS);
            return AerospikeErrorType.RECORD_NOT_EXISTS;

        } catch (AerospikeException e) {
            if (e.getResultCode() == ResultCode.KEY_EXISTS_ERROR) {
                log.info("<" + reqid + "> KY="+key+" putOnce status2: "+AerospikeErrorType.RECORD_EXISTS +" Error: "+e.getMessage() +" Get["+get(key,nameSpace,tableName,reqid)+"]");
                return AerospikeErrorType.RECORD_EXISTS;
            } else {
                log.info("<" + reqid + "> Error in aerospike operation", e);
                e.printStackTrace();
                log.info("<" + reqid + ">  KY="+key+" putOnce status3: "+AerospikeErrorType.UNKNOWN_STATUS);
                return AerospikeErrorType.UNKNOWN_STATUS;
            }

        } catch (Exception ex) {
            ex.printStackTrace();
            log.info("<" + reqid + ">  KY="+key+" putOnce status4: "+AerospikeErrorType.UNKNOWN_STATUS);
            log.info("<" + reqid + "> Error in aerospike operation", ex);
            return AerospikeErrorType.UNKNOWN_STATUS;
        }
    }
    log.info("<" + reqid + ">  KY="+key+" putOnce status5: "+AerospikeErrorType.UNKNOWN_STATUS);
    return AerospikeErrorType.UNKNOWN_STATUS;
}`


public void FirstInsert(String key, String nameSpace, String tableName, int expiry, int counter) {
    if (key != null) {
        Bin bin = null;
        Key asKey = null;

        WritePolicy WRPOLICY = new WritePolicy();
        WRPOLICY.recordExistsAction = RecordExistsAction.CREATE_ONLY;
        WRPOLICY.expiration = expiry;
        asKey = new Key(nameSpace, tableName, key);
        bin = new Bin(null, counter);
        client.put(WRPOLICY, asKey, bin);
    }
}

========================================================================== This is my AS java client which act as singleton pattern.

  /*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.abcd.delivery.helper;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Host;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.abc.delivery.enums.AerospikeErrorType;
import com.abc.delivery.util.Constants;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;


public class AerospikeDMClient {

    AerospikeClient client = null;
    public static ClientPolicy clientPolicy = new ClientPolicy();
    public BatchPolicy RPOLICY = null;
    public WritePolicy WPOLICY = null;
    public WritePolicy WFEQPOLICY = null;
    private static Logger log = Logger.getLogger(AerospikeDMClient.class);

    private AerospikeDMClient(String aeroSpikeHost, String aeroSpikePort) {

        clientPolicy.maxConnsPerNode = Constants.AEROSPIKE_MAX_CONNECTION;
        clientPolicy.maxSocketIdle = Constants.AEROSPIKE_SOCKET_IDLE_CONNECTION;
        clientPolicy.timeout = Constants.AEROSPIKE_CONNECTION_TIMEDOUT;

        String hostsStr[] = aeroSpikeHost.split(",");
        String portStr[] = aeroSpikePort.split(",");
        Host[] hosts = new Host[hostsStr.length];
        for (int i = 0; i < hosts.length; i++) {
            hosts[i] = new Host(hostsStr[i], Integer.parseInt(portStr[i]));
        }

        client = new AerospikeClient(clientPolicy, hosts);
        RPOLICY = new BatchPolicy();
        WPOLICY = new WritePolicy();
        WFEQPOLICY = new WritePolicy();
        WPOLICY.expiration = 30 * 60;

    }

    public AerospikeClient getClient() {
        return client;
    }

    public ClientPolicy getPolicy() {
        return clientPolicy;
    }

    public List<String> batchGet(List<String> duplist, String nameSpace, String tableName) {
        List<String> aerospikeReturnList = new ArrayList();
        if (duplist != null) {
            try {
                RPOLICY = new BatchPolicy();
                Key[] aeroKey = new Key[duplist.size()];
                for (int i = 0; i < duplist.size(); i++) {
                    aeroKey[i] = new Key(nameSpace, tableName, duplist.get(i));

                }
                Record[] aerospikeList = client.get(RPOLICY, aeroKey);

                if (aerospikeList != null) {

                    for (int i = 0; i < aerospikeList.length; i++) {

                        if (aerospikeList[i] != null) {

                            //log.info("==???==" + aerospikeList[i].getValue("val") + " ===" +aeroKey[i].userKey  + " ===> "+aerospikeList[i].expiration + " ====" + aerospikeList[i].generation);
                            aerospikeReturnList.add((String) aerospikeList[i].getValue("val"));
                        } else {
                            aerospikeReturnList.add(null);
                        }

                    }
                }
            } catch (Exception ex) {
                log.info("Error in aerospike batchget ", ex);
            }

        }
        return aerospikeReturnList;
    }

    public long get(String md5Key, String nameSpace, String tableName, long reqid) {
        long result = -1;
         Record r = null;
        try {
            Policy p = new Policy();
            Key key1 = new Key(nameSpace, tableName, md5Key);
            r = client.get(null, key1);
            if (r != null) {
                result = (Long) r.getValue("");
               
            }
        } catch (AerospikeException ex) {
            log.info("AerospikeException in get: " ,ex);
        }catch (Exception ex) {
            log.info("Exception in get: " ,ex);
        }
        log.info("<" + reqid + "> IN AGET KY="+md5Key +" RR:"+r + " GET["+result+"]");
        return result;
    }
    
    public List<String> batchGetWithIndex(List<String> duplist, String nameSpace, String tableName) {
        List<String> aerospikeReturnList = new ArrayList();
        if (duplist != null) {
            RPOLICY = new BatchPolicy();
            Key[] aeroKey = new Key[duplist.size()];
            for (int i = 0; i < duplist.size(); i++) {
                aeroKey[i] = new Key(nameSpace, tableName, duplist.get(i));

            }
            Record[] aerospikeList = client.get(RPOLICY, aeroKey);

            if (aerospikeList != null) {

                for (int i = 0; i < aerospikeList.length; i++) {

                    if (aerospikeList[i] != null) {

                        aerospikeReturnList.add(String.valueOf(i));
                    } else {
                        aerospikeReturnList.add(null);
                    }

                }
            }
            for (int i = 0; i < duplist.size(); i++) {
                log.info("batchGet key: " + duplist.get(i));
            }
        }
        return aerospikeReturnList;
    }

    public void put(List<String> tmpList, String nameSpace, String tableName) {
        if (tmpList != null) {
            try {
                WPOLICY = new WritePolicy();
                WPOLICY.expiration = 30 * 60;
                for (int i = 0; i < tmpList.size(); i++) {
                    Key key = new Key(nameSpace, tableName, tmpList.get(i));
                    Bin bin = new Bin("val", tmpList.get(i));
                    client.put(WPOLICY, key, bin);

                }
            } catch (Exception ex) {
                log.info("Error in aerospike put", ex);
            }
        }
    }

    public void mapPut(Map<String, Integer> ruleExpiryMap, String nameSpace, String tableName) {
        if (ruleExpiryMap != null) {
            try {
                for (Map.Entry<String, Integer> entry : ruleExpiryMap.entrySet()) {
                    WFEQPOLICY.expiration = entry.getValue();
                    Key key = new Key(nameSpace, tableName, entry.getKey());
                    Bin bin = new Bin("val", entry.getKey());
                    client.put(WFEQPOLICY, key, bin);
                }
            } catch (Exception ex) {
                log.info("Error in aerospike mapPut", ex);
            }
        }
    }

    public void freqMapPut(Map<FrquencyCapperHelper, Integer> ruleExpiryMap, String nameSpace, String tableName) {
        try {
            if (ruleExpiryMap != null) {

                for (Map.Entry<FrquencyCapperHelper, Integer> entry : ruleExpiryMap.entrySet()) {
                    try {
                        if (entry.getValue() == 1) {
                            int exp = entry.getKey().getValidity() * 60;
                            WFEQPOLICY.recordExistsAction = RecordExistsAction.CREATE_ONLY;
                            WFEQPOLICY.expiration = Math.abs(exp);
                        } else {
                            WFEQPOLICY.expiration = -2;
                            WFEQPOLICY.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
                        }

                        Key key = new Key(nameSpace, tableName, entry.getKey().getKey());
                        Bin bin = new Bin("val", String.valueOf((Integer) entry.getValue()));
                        client.put(WFEQPOLICY, key, bin);
                    } catch (AerospikeException ex) {
                        log.info("Error in aerospike feqMapPut: " + ex);
                        if (ex.getResultCode() == ResultCode.KEY_EXISTS_ERROR) {
                            WFEQPOLICY.expiration = -2;
                            WFEQPOLICY.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
                            Key key = new Key(nameSpace, tableName, entry.getKey().getKey());
                            Bin bin = new Bin("val", String.valueOf((Integer) entry.getValue()));
                            client.put(WFEQPOLICY, key, bin);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public AerospikeErrorType PutOnce(String key, String nameSpace, String tableName, int expiry, int counter, long reqid) {
        if (key != null) {
            try {
                log.info("<" + reqid + "> KY="+key+"BAERO CALL Get["+get(key,nameSpace,tableName,reqid)+"]");
                FirstInsert(key, nameSpace, tableName, expiry, counter);
                log.info("<" + reqid + ">  KY="+key+" putOnce status1: "+AerospikeErrorType.RECORD_NOT_EXISTS);
                return AerospikeErrorType.RECORD_NOT_EXISTS;

            } catch (AerospikeException e) {
                if (e.getResultCode() == ResultCode.KEY_EXISTS_ERROR) {
                    log.info("<" + reqid + "> KY="+key+" putOnce status2: "+AerospikeErrorType.RECORD_EXISTS +" Error: "+e.getMessage() +" Get["+get(key,nameSpace,tableName,reqid)+"]");
                    return AerospikeErrorType.RECORD_EXISTS;
                } else {
                    log.info("<" + reqid + "> Error in aerospike operation", e);
                    e.printStackTrace();
                    log.info("<" + reqid + ">  KY="+key+" putOnce status3: "+AerospikeErrorType.UNKNOWN_STATUS);
                    return AerospikeErrorType.UNKNOWN_STATUS;
                }

            } catch (Exception ex) {
                ex.printStackTrace();
                log.info("<" + reqid + ">  KY="+key+" putOnce status4: "+AerospikeErrorType.UNKNOWN_STATUS);
                log.info("<" + reqid + "> Error in aerospike operation", ex);
                return AerospikeErrorType.UNKNOWN_STATUS;
            }
        }
        log.info("<" + reqid + ">  KY="+key+" putOnce status5: "+AerospikeErrorType.UNKNOWN_STATUS);
        return AerospikeErrorType.UNKNOWN_STATUS;
    }

    public void FirstInsert(String key, String nameSpace, String tableName, int expiry, int counter) {
        if (key != null) {
            Bin bin = null;
            Key asKey = null;

            WritePolicy WRPOLICY = new WritePolicy();
            WRPOLICY.recordExistsAction = RecordExistsAction.CREATE_ONLY;
            WRPOLICY.expiration = expiry;
            asKey = new Key(nameSpace, tableName, key);
            bin = new Bin(null, counter);
            client.put(WRPOLICY, asKey, bin);
        }
    }

    public Integer IncrAndGet(String key, String nameSpace, String tableName, int expiry, int counter, long reqid) {
        if (key != null) {
            Bin bin = null;
            Key asKey = null;
            try {
                FirstInsert(key, nameSpace, tableName, expiry, counter);
                return 1;

            } catch (AerospikeException e) {
                if (e.getResultCode() == ResultCode.KEY_EXISTS_ERROR) {
                    WFEQPOLICY.expiration = -2;
                    WFEQPOLICY.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
                    asKey = new Key(nameSpace, tableName, key);
                    bin = new Bin(null, counter);
                    try {
                        return client.operate(WFEQPOLICY, asKey, Operation.add(bin), Operation.get()).getInt("");
                    } catch (AerospikeException e1) {
                        if (e1.getResultCode() == ResultCode.KEY_NOT_FOUND_ERROR) {
                            FirstInsert(key, nameSpace, tableName, expiry, counter);
                            return 1;
                        }
                    }
                } else {
                    log.info("<" + reqid + "> Error in aerospike operation", e);
                    e.printStackTrace();
                    return 0;
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                log.info("<" + reqid + "> Error in aerospike operation", ex);
                return 0;
            }
        }
        return 0;
    }

}