Multithreading Statements

Hello,

We're having trouble running Statements in Java. We're currently testing Aerospike with one namespace. We inserted a lot of JSON documents into several sets (named by AeroSpikeDB.priceDataSets[domainID]), each record with a single bin (“data”). Now we would like to run a MapReduce-style job over all JSON documents in the database to produce an average for one of our data sets. Here's our testing class (DBClient is just a wrapper around AerospikeClient; it creates one Aerospike client for each domainID).

public class Testing {
    private static final DBClient client = DBClient.getInstance();
    class AVG
    {
        long count;
        long avg;
    };
    private static final Map<Byte,AVG> queryCounts = Collections.synchronizedMap(new HashMap<>(CONFIG.domainsToCrawlArraySize));

    void generateAVG(byte domainID)
    {
        Statement stmnt = new Statement();
        stmnt.setNamespace("testing");
        stmnt.setSetName(AeroSpikeDB.priceDataSets[domainID]);
        stmnt.setBinNames("data");

        AVG avg = new AVG();
        try (RecordSet rs = client.query(domainID,stmnt)){
            avg.count=0;
            BigDecimal bd = new BigDecimal(0);
            while (rs.next()) {
                Key key = rs.getKey();
                Record record = rs.getRecord();
                String jsonString = (String)record.bins.get("data");
                JSONObject jo = new JSONObject(jsonString);
                if(jo.has("lastupdate")) {
                    long lastupdate = jo.getLong("lastupdate");
                    bd = bd.add(new BigDecimal(lastupdate));
                    avg.count++;
                    if(avg.count%1000 == 0)
                    {
                        System.out.println(domainID + " -> " + avg.count);
                    }
                }
                else
                {
                    System.out.println("Failed!");
                }
            }
            if (avg.count > 0) {
                // Guard against division by zero when no record had "lastupdate"
                bd = bd.divide(new BigDecimal(avg.count), 0, RoundingMode.HALF_UP);
                avg.avg = bd.longValueExact();
            }
            queryCounts.put(domainID,avg);
        } catch (JSONException e) {
            ErrorLogger.logException(e);
        }

    }
    public static void main(String args[])
    {
        Testing t = new Testing();
        Stream.of((byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5, (byte) 6, (byte) 7, (byte) 8, (byte) 9, (byte) 12).parallel().forEach(domainID -> {
            t.generateAVG(domainID);
        }
        );

        System.out.println(queryCounts);
    }
}

Here's the DBClient wrapper class:

public  class DBClient {
    private static DBClient instance = null;
    private static AerospikeClient aeroclient[] = null;
    private static ClientPolicy cPolicy = null;
    private static BatchPolicy bPolicy = null;
    private static WritePolicy wPolicy = null;
    private static QueryPolicy qPolicy = null;
    static
    {
        qPolicy = new QueryPolicy();
        qPolicy.consistencyLevel = ConsistencyLevel.CONSISTENCY_ONE;

        wPolicy = new WritePolicy();
        wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
        wPolicy.commitLevel = CommitLevel.COMMIT_MASTER;

        cPolicy = new ClientPolicy();
        cPolicy.timeout = 500;
        cPolicy.writePolicyDefault = wPolicy;

        bPolicy = new BatchPolicy();
        bPolicy.timeout = 500;

        aeroclient = new AerospikeClient[12];
            for (byte i = 0; i < 12; i++) {
                aeroclient[i] = new AerospikeClient(cPolicy, "192.168.0.5", 3000);
            }
        instance = new DBClient();
    }


    public static DBClient getInstance() {
        return instance;
    }

    public JSONObject get(byte domainID,Key key) throws DBException {
        while(true) {
            try {
                Record r = aeroclient[domainID].get(bPolicy, key);
                if (r == null)
                    return null;
                String o = (String) r.getValue("data");
                if (o == null)
                    return null;
                else
                    return new JSONObject(o);
            }
            catch (Exception e)
            {
                ErrorLogger.logException(e,key.toString());
                throw new DBException();
            }
        }
    }


    public void put(byte domainID,Key akey, JSONObject jo) throws DBException {
        while(true) {
            try {
                String data = jo.toString();
                Bin b = new Bin("data",data);
                aeroclient[domainID].put(wPolicy, akey, b);
                return;
            }
            catch (Exception e)
            {
                ErrorLogger.logException(e);
                throw new DBException();
            }
        }
    }

    public RecordSet query(byte domainID,Statement stmnt)
    {
        return aeroclient[domainID].query(qPolicy, stmnt);
    }
}

We are running AS 3.5.3 Enterprise, and our Java client library is version 3.0.35 (Maven).

The library crashes when we run it in a multithreaded environment. Here's our stack trace:

Exception in thread "main" com.aerospike.client.AerospikeException: Error Code 1: Server error
    at com.aerospike.client.query.QueryRecordCommand.parseRecordResults(QueryRecordCommand.java:69)
    at com.aerospike.client.command.MultiCommand.parseResult(MultiCommand.java:59)
    at com.aerospike.client.command.SyncCommand.execute(SyncCommand.java:56)
    at com.aerospike.client.query.QueryExecutor$QueryThread.run(QueryExecutor.java:137)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The server log stays empty. Could you please help us out?

Thank You!

To start with, AerospikeClient handles concurrency internally, so it can be shared across threads. It is recommended to create a single AerospikeClient instead of 12. This avoids starting up unnecessary resources.
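A minimal sketch of that single-shared-client pattern, using only the Java standard library. RecordSource is a hypothetical stand-in for the one shared client (it is not part of the Aerospike API), and lastUpdates is an assumed helper that returns the "lastupdate" values for a domain. The sketch also skips domains with no records instead of dividing by zero.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

public class SharedClientSketch {

    /** Hypothetical stand-in for a single, thread-safe database client. */
    public interface RecordSource {
        List<Long> lastUpdates(byte domainID);
    }

    // One instance, created once and shared by every worker thread.
    private final RecordSource source;

    public SharedClientSketch(RecordSource source) {
        this.source = source;
    }

    /** Rounded (HALF_UP) average, or empty when there are no values. */
    public static OptionalLong average(List<Long> values) {
        if (values.isEmpty()) {
            return OptionalLong.empty();
        }
        BigDecimal sum = BigDecimal.ZERO;
        for (long v : values) {
            sum = sum.add(BigDecimal.valueOf(v));
        }
        BigDecimal avg = sum.divide(
                BigDecimal.valueOf(values.size()), 0, RoundingMode.HALF_UP);
        return OptionalLong.of(avg.longValueExact());
    }

    /** Computes one average per domain in parallel through the shared source. */
    public Map<Byte, Long> averages(List<Byte> domainIDs) {
        Map<Byte, Long> result = new ConcurrentHashMap<>();
        domainIDs.parallelStream().forEach(id ->
                average(source.lastUpdates(id))
                        .ifPresent(avg -> result.put(id, avg)));
        return result;
    }

    public static void main(String[] args) {
        // Fake data source for illustration: domain 2 has no records.
        RecordSource fake = id -> id == 2
                ? Collections.<Long>emptyList()
                : Arrays.asList(10L * id, 20L * id);
        SharedClientSketch sketch = new SharedClientSketch(fake);
        System.out.println(
                sketch.averages(Arrays.asList((byte) 1, (byte) 2, (byte) 3)));
    }
}
```

In the real code, the same shape would mean one AerospikeClient field in DBClient rather than an array of 12, with every thread calling query on that one instance.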

Thank you for this information, but the problem persists.

Is there any information in the aerospike.log?

The server log stays empty. Could you please help us out?

Hmm, the server log shouldn’t be empty. It should at least have some INFO-level logging, such as the lines below:

Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4634)  system memory: free 2758068kb ( 70 percent free ) 
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4642)  migrates in progress ( 0 , 0 ) ::: ClusterSize 1 ::: objects 175 ::: sub_objects 0
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4650)  rec refs 175 ::: rec locks 0 ::: trees 0 ::: wr reqs 0 ::: mig tx 0 ::: mig rx 0
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4656)  replica errs :: null 0 non-null 0 ::: sync copy errs :: node 0 :: master 0 
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4666)    trans_in_progress: wr 0 prox 0 wait 0 ::: q 0 ::: bq 0 ::: iq 0 ::: dq 0 : fds - proto (0, 81, 81) : hb (0, 0, 0) : fab (16, 16, 0)
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4668)    heartbeat_received: self 631866 : foreign 0

Server Error Code 1 (UNKNOWN_ERROR) would typically have either an INFO or WARNING log indicating what the error was.