Hello,
We got trouble running Statements in java. Currently were testing Aero with 1 namespace. We inserted a lot of JSONs (AeroSpikeDB.priceDataSets[domainID]) to several sets (only one Bin (“data”)). Now we would like to run a MapReduce job over all JSONs in the Database to producte an average of one of our datasets. Heres our Testing class (DBClient is just a Wrapper about AerospikeClient, hes creating a Aerospike client for each domainID).
public class Testing {
private static final DBClient client = DBClient.getInstance();
class AVG
{
long count;
long avg;
};
private static final Map<Byte,AVG> queryCounts = Collections.synchronizedMap(new HashMap<>(CONFIG.domainsToCrawlArraySize));
void generateAVG(byte domainID)
{
Statement stmnt = new Statement();
stmnt.setNamespace("testing");
stmnt.setSetName(AeroSpikeDB.priceDataSets[domainID]);
stmnt.setBinNames("data");
AVG avg = new AVG();
try (RecordSet rs = client.query(domainID,stmnt)){
avg.count=0;
BigDecimal bd = new BigDecimal(0);
while (rs.next()) {
Key key = rs.getKey();
Record record = rs.getRecord();
String jsonString = (String)record.bins.get("data");
JSONObject jo = new JSONObject(jsonString);
if(jo.has("lastupdate")) {
long lastupdate = jo.getLong("lastupdate");
bd = bd.add(new BigDecimal(lastupdate));
avg.count++;
if(avg.count%1000 == 0)
{
System.out.println(domainID + " -> " + avg.count);
}
}
else
{
System.out.println("Failed!");
}
}
bd = bd.divide(new BigDecimal(avg.count), 0, RoundingMode.HALF_UP);
avg.avg = Long.valueOf(bd.toString());
queryCounts.put(domainID,avg);
} catch (JSONException e) {
ErrorLogger.logException(e);
}
}
public static void main(String args[])
{
Testing t = new Testing();
Stream.of((byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5, (byte) 6, (byte) 7, (byte) 8, (byte) 9, (byte) 12).parallel().forEach(domainID -> {
t.generateAVG(domainID);
}
);
System.out.println(queryCounts);
}
Heres the DBClient Wrapper Class:
public class DBClient {
private static DBClient instance = null;
private static AerospikeClient aeroclient[] = null;
private static ClientPolicy cPolicy = null;
private static BatchPolicy bPolicy = null;
private static WritePolicy wPolicy = null;
private static QueryPolicy qPolicy = null;
static
{
qPolicy = new QueryPolicy();
qPolicy.consistencyLevel = ConsistencyLevel.CONSISTENCY_ONE;
wPolicy = new WritePolicy();
wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
wPolicy.commitLevel = CommitLevel.COMMIT_MASTER;
cPolicy = new ClientPolicy();
cPolicy.timeout = 500;
cPolicy.writePolicyDefault = wPolicy;
bPolicy = new BatchPolicy();
bPolicy.timeout = 500;
aeroclient = new AerospikeClient[12];
for (byte i = 0; i < 12; i++) {
aeroclient[i] = new AerospikeClient(cPolicy, "192.168.0.5", 3000);
}
instance = new DBClient();
}
public static DBClient getInstance() {
return instance;
}
public JSONObject get(byte domainID,Key key) throws DBException {
while(true) {
try {
Record r = aeroclient[domainID].get(bPolicy, key);
if (r == null)
return null;
String o = (String) r.getValue("data");
if (o == null)
return null;
else
return new JSONObject(o);
}
catch (Exception e)
{
ErrorLogger.logException(e,key.toString());
throw new DBException();
}
}
}
public void put(byte domainID,Key akey, JSONObject jo) throws DBException {
while(true) {
try {
String data = jo.toString();
Bin b = new Bin("data",data);
aeroclient[domainID].put(wPolicy, akey, b);
return;
}
catch (Exception e)
{
ErrorLogger.logException(e);
throw new DBException();
}
}
}
public RecordSet query(byte domainID,Statement stmnt)
{
return aeroclient[domainID].query(qPolicy, stmnt);
} };
We running on AS 3.5.3 Enterprise and our Java Library is: 3.0.35 (Maven).
The Library is crashing if we run in a Multithreaded enviroment heres our Stacktrace:
Exception in thread “main” com.aerospike.client.AerospikeException: Error Code 1: Server error at com.aerospike.client.query.QueryRecordCommand.parseRecordResults(QueryRecordCommand.java:69) at com.aerospike.client.command.MultiCommand.parseResult(MultiCommand.java:59) at com.aerospike.client.command.SyncCommand.execute(SyncCommand.java:56) at com.aerospike.client.query.QueryExecutor$QueryThread.run(QueryExecutor.java:137) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
ServerLog ist staying Empty. Could you please help us out?
Thank You!