In my production environment we are using Aerospike server version 3.15.0.3 and the client version 4.0.8.
Our aerospike java client calling client.put function using CREATE_ONLY policy in multi threaded environment.Some time we are getting KEY_EXISTS_ERROR even though there is no key.
In the below code LINE 4 print -1 which is key is not there. In LINE 5 call put function with CREATE_ONLY that throws error KEY_ALREADY_EXIST
Why this is happened? Is there any issue in AS server CREATE_ONLY policy or Aeropspike java client have issue?
Added code below.
public AerospikeErrorType PutOnce(String key, String nameSpace, String tableName, int expiry, int counter, long reqid) {
if (key != null) {
try {
//LINE 4
log.info("<" + reqid + "> KY="+key+" BEFORE AERO CALL Get["+get(key,nameSpace,tableName,reqid)+"]");
//LINE 5
FirstInsert(key, nameSpace, tableName, expiry, counter);
//LINE 6
log.info("<" + reqid + "> KY="+key+" : "+AerospikeErrorType.RECORD_NOT_EXISTS);
return AerospikeErrorType.RECORD_NOT_EXISTS;
} catch (AerospikeException e) {
if (e.getResultCode() == ResultCode.KEY_EXISTS_ERROR) {
log.info("<" + reqid + "> KY="+key+" putOnce status2: "+AerospikeErrorType.RECORD_EXISTS +" Error: "+e.getMessage() +" Get["+get(key,nameSpace,tableName,reqid)+"]");
return AerospikeErrorType.RECORD_EXISTS;
} else {
log.info("<" + reqid + "> Error in aerospike operation", e);
e.printStackTrace();
log.info("<" + reqid + "> KY="+key+" putOnce status3: "+AerospikeErrorType.UNKNOWN_STATUS);
return AerospikeErrorType.UNKNOWN_STATUS;
}
} catch (Exception ex) {
ex.printStackTrace();
log.info("<" + reqid + "> KY="+key+" putOnce status4: "+AerospikeErrorType.UNKNOWN_STATUS);
log.info("<" + reqid + "> Error in aerospike operation", ex);
return AerospikeErrorType.UNKNOWN_STATUS;
}
}
log.info("<" + reqid + "> KY="+key+" putOnce status5: "+AerospikeErrorType.UNKNOWN_STATUS);
return AerospikeErrorType.UNKNOWN_STATUS;
}`
public void FirstInsert(String key, String nameSpace, String tableName, int expiry, int counter) {
if (key != null) {
Bin bin = null;
Key asKey = null;
WritePolicy WRPOLICY = new WritePolicy();
WRPOLICY.recordExistsAction = RecordExistsAction.CREATE_ONLY;
WRPOLICY.expiration = expiry;
asKey = new Key(nameSpace, tableName, key);
bin = new Bin(null, counter);
client.put(WRPOLICY, asKey, bin);
}
}
========================================================================== This is my AS java client which act as singleton pattern.
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.abcd.delivery.helper;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Host;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.abc.delivery.enums.AerospikeErrorType;
import com.abc.delivery.util.Constants;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
public class AerospikeDMClient {
AerospikeClient client = null;
public static ClientPolicy clientPolicy = new ClientPolicy();
public BatchPolicy RPOLICY = null;
public WritePolicy WPOLICY = null;
public WritePolicy WFEQPOLICY = null;
private static Logger log = Logger.getLogger(AerospikeDMClient.class);
private AerospikeDMClient(String aeroSpikeHost, String aeroSpikePort) {
clientPolicy.maxConnsPerNode = Constants.AEROSPIKE_MAX_CONNECTION;
clientPolicy.maxSocketIdle = Constants.AEROSPIKE_SOCKET_IDLE_CONNECTION;
clientPolicy.timeout = Constants.AEROSPIKE_CONNECTION_TIMEDOUT;
String hostsStr[] = aeroSpikeHost.split(",");
String portStr[] = aeroSpikePort.split(",");
Host[] hosts = new Host[hostsStr.length];
for (int i = 0; i < hosts.length; i++) {
hosts[i] = new Host(hostsStr[i], Integer.parseInt(portStr[i]));
}
client = new AerospikeClient(clientPolicy, hosts);
RPOLICY = new BatchPolicy();
WPOLICY = new WritePolicy();
WFEQPOLICY = new WritePolicy();
WPOLICY.expiration = 30 * 60;
}
public AerospikeClient getClient() {
return client;
}
public ClientPolicy getPolicy() {
return clientPolicy;
}
public List<String> batchGet(List<String> duplist, String nameSpace, String tableName) {
List<String> aerospikeReturnList = new ArrayList();
if (duplist != null) {
try {
RPOLICY = new BatchPolicy();
Key[] aeroKey = new Key[duplist.size()];
for (int i = 0; i < duplist.size(); i++) {
aeroKey[i] = new Key(nameSpace, tableName, duplist.get(i));
}
Record[] aerospikeList = client.get(RPOLICY, aeroKey);
if (aerospikeList != null) {
for (int i = 0; i < aerospikeList.length; i++) {
if (aerospikeList[i] != null) {
//log.info("==???==" + aerospikeList[i].getValue("val") + " ===" +aeroKey[i].userKey + " ===> "+aerospikeList[i].expiration + " ====" + aerospikeList[i].generation);
aerospikeReturnList.add((String) aerospikeList[i].getValue("val"));
} else {
aerospikeReturnList.add(null);
}
}
}
} catch (Exception ex) {
log.info("Error in aerospike batchget ", ex);
}
}
return aerospikeReturnList;
}
public long get(String md5Key, String nameSpace, String tableName, long reqid) {
long result = -1;
Record r = null;
try {
Policy p = new Policy();
Key key1 = new Key(nameSpace, tableName, md5Key);
r = client.get(null, key1);
if (r != null) {
result = (Long) r.getValue("");
}
} catch (AerospikeException ex) {
log.info("AerospikeException in get: " ,ex);
}catch (Exception ex) {
log.info("Exception in get: " ,ex);
}
log.info("<" + reqid + "> IN AGET KY="+md5Key +" RR:"+r + " GET["+result+"]");
return result;
}
public List<String> batchGetWithIndex(List<String> duplist, String nameSpace, String tableName) {
List<String> aerospikeReturnList = new ArrayList();
if (duplist != null) {
RPOLICY = new BatchPolicy();
Key[] aeroKey = new Key[duplist.size()];
for (int i = 0; i < duplist.size(); i++) {
aeroKey[i] = new Key(nameSpace, tableName, duplist.get(i));
}
Record[] aerospikeList = client.get(RPOLICY, aeroKey);
if (aerospikeList != null) {
for (int i = 0; i < aerospikeList.length; i++) {
if (aerospikeList[i] != null) {
aerospikeReturnList.add(String.valueOf(i));
} else {
aerospikeReturnList.add(null);
}
}
}
for (int i = 0; i < duplist.size(); i++) {
log.info("batchGet key: " + duplist.get(i));
}
}
return aerospikeReturnList;
}
public void put(List<String> tmpList, String nameSpace, String tableName) {
if (tmpList != null) {
try {
WPOLICY = new WritePolicy();
WPOLICY.expiration = 30 * 60;
for (int i = 0; i < tmpList.size(); i++) {
Key key = new Key(nameSpace, tableName, tmpList.get(i));
Bin bin = new Bin("val", tmpList.get(i));
client.put(WPOLICY, key, bin);
}
} catch (Exception ex) {
log.info("Error in aerospike put", ex);
}
}
}
public void mapPut(Map<String, Integer> ruleExpiryMap, String nameSpace, String tableName) {
if (ruleExpiryMap != null) {
try {
for (Map.Entry<String, Integer> entry : ruleExpiryMap.entrySet()) {
WFEQPOLICY.expiration = entry.getValue();
Key key = new Key(nameSpace, tableName, entry.getKey());
Bin bin = new Bin("val", entry.getKey());
client.put(WFEQPOLICY, key, bin);
}
} catch (Exception ex) {
log.info("Error in aerospike mapPut", ex);
}
}
}
public void freqMapPut(Map<FrquencyCapperHelper, Integer> ruleExpiryMap, String nameSpace, String tableName) {
try {
if (ruleExpiryMap != null) {
for (Map.Entry<FrquencyCapperHelper, Integer> entry : ruleExpiryMap.entrySet()) {
try {
if (entry.getValue() == 1) {
int exp = entry.getKey().getValidity() * 60;
WFEQPOLICY.recordExistsAction = RecordExistsAction.CREATE_ONLY;
WFEQPOLICY.expiration = Math.abs(exp);
} else {
WFEQPOLICY.expiration = -2;
WFEQPOLICY.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
}
Key key = new Key(nameSpace, tableName, entry.getKey().getKey());
Bin bin = new Bin("val", String.valueOf((Integer) entry.getValue()));
client.put(WFEQPOLICY, key, bin);
} catch (AerospikeException ex) {
log.info("Error in aerospike feqMapPut: " + ex);
if (ex.getResultCode() == ResultCode.KEY_EXISTS_ERROR) {
WFEQPOLICY.expiration = -2;
WFEQPOLICY.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
Key key = new Key(nameSpace, tableName, entry.getKey().getKey());
Bin bin = new Bin("val", String.valueOf((Integer) entry.getValue()));
client.put(WFEQPOLICY, key, bin);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public AerospikeErrorType PutOnce(String key, String nameSpace, String tableName, int expiry, int counter, long reqid) {
if (key != null) {
try {
log.info("<" + reqid + "> KY="+key+"BAERO CALL Get["+get(key,nameSpace,tableName,reqid)+"]");
FirstInsert(key, nameSpace, tableName, expiry, counter);
log.info("<" + reqid + "> KY="+key+" putOnce status1: "+AerospikeErrorType.RECORD_NOT_EXISTS);
return AerospikeErrorType.RECORD_NOT_EXISTS;
} catch (AerospikeException e) {
if (e.getResultCode() == ResultCode.KEY_EXISTS_ERROR) {
log.info("<" + reqid + "> KY="+key+" putOnce status2: "+AerospikeErrorType.RECORD_EXISTS +" Error: "+e.getMessage() +" Get["+get(key,nameSpace,tableName,reqid)+"]");
return AerospikeErrorType.RECORD_EXISTS;
} else {
log.info("<" + reqid + "> Error in aerospike operation", e);
e.printStackTrace();
log.info("<" + reqid + "> KY="+key+" putOnce status3: "+AerospikeErrorType.UNKNOWN_STATUS);
return AerospikeErrorType.UNKNOWN_STATUS;
}
} catch (Exception ex) {
ex.printStackTrace();
log.info("<" + reqid + "> KY="+key+" putOnce status4: "+AerospikeErrorType.UNKNOWN_STATUS);
log.info("<" + reqid + "> Error in aerospike operation", ex);
return AerospikeErrorType.UNKNOWN_STATUS;
}
}
log.info("<" + reqid + "> KY="+key+" putOnce status5: "+AerospikeErrorType.UNKNOWN_STATUS);
return AerospikeErrorType.UNKNOWN_STATUS;
}
public void FirstInsert(String key, String nameSpace, String tableName, int expiry, int counter) {
if (key != null) {
Bin bin = null;
Key asKey = null;
WritePolicy WRPOLICY = new WritePolicy();
WRPOLICY.recordExistsAction = RecordExistsAction.CREATE_ONLY;
WRPOLICY.expiration = expiry;
asKey = new Key(nameSpace, tableName, key);
bin = new Bin(null, counter);
client.put(WRPOLICY, asKey, bin);
}
}
public Integer IncrAndGet(String key, String nameSpace, String tableName, int expiry, int counter, long reqid) {
if (key != null) {
Bin bin = null;
Key asKey = null;
try {
FirstInsert(key, nameSpace, tableName, expiry, counter);
return 1;
} catch (AerospikeException e) {
if (e.getResultCode() == ResultCode.KEY_EXISTS_ERROR) {
WFEQPOLICY.expiration = -2;
WFEQPOLICY.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
asKey = new Key(nameSpace, tableName, key);
bin = new Bin(null, counter);
try {
return client.operate(WFEQPOLICY, asKey, Operation.add(bin), Operation.get()).getInt("");
} catch (AerospikeException e1) {
if (e1.getResultCode() == ResultCode.KEY_NOT_FOUND_ERROR) {
FirstInsert(key, nameSpace, tableName, expiry, counter);
return 1;
}
}
} else {
log.info("<" + reqid + "> Error in aerospike operation", e);
e.printStackTrace();
return 0;
}
} catch (Exception ex) {
ex.printStackTrace();
log.info("<" + reqid + "> Error in aerospike operation", ex);
return 0;
}
}
return 0;
}
}