Multithreading Statements


#1

Hello,

We're having trouble running Statements in Java. We're currently testing Aerospike with one namespace. We inserted a lot of JSON documents into several sets (AeroSpikeDB.priceDataSets[domainID]), each with only one bin ("data"). Now we would like to run a MapReduce-style job over all JSON documents in the database to produce an average over one of our datasets. Here's our Testing class (DBClient is just a wrapper around AerospikeClient; it creates an Aerospike client for each domainID).

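import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.json.JSONException;
import org.json.JSONObject;

public class Testing {
    private static final DBClient client = DBClient.getInstance();

    // Per-domain result: number of records counted and their rounded average
    static class AVG
    {
        long count;
        long avg;
    }
    // Written to from the parallel stream's worker threads, hence synchronized
    private static final Map<Byte,AVG> queryCounts = Collections.synchronizedMap(new HashMap<>(CONFIG.domainsToCrawlArraySize));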

    void generateAVG(byte domainID)
    {
        Statement stmnt = new Statement();
        stmnt.setNamespace("testing");
        stmnt.setSetName(AeroSpikeDB.priceDataSets[domainID]);
        stmnt.setBinNames("data");

        AVG avg = new AVG();
        try (RecordSet rs = client.query(domainID,stmnt)){
            avg.count=0;
            BigDecimal bd = new BigDecimal(0);
            while (rs.next()) {
                Key key = rs.getKey();
                Record record = rs.getRecord();
                String jsonString = (String)record.bins.get("data");
                JSONObject jo = new JSONObject(jsonString);
                if(jo.has("lastupdate")) {
                    long lastupdate = jo.getLong("lastupdate");
                    bd = bd.add(new BigDecimal(lastupdate));
                    avg.count++;
                    if(avg.count%1000 == 0)
                    {
                        System.out.println(domainID + " -> " + avg.count);
                    }
                }
                else
                {
                    System.out.println("Failed!");
                }
            }
            if (avg.count > 0) {
                // Guard against division by zero when no record had "lastupdate"
                bd = bd.divide(new BigDecimal(avg.count), 0, RoundingMode.HALF_UP);
                avg.avg = bd.longValueExact();
            }
            queryCounts.put(domainID,avg);
        } catch (JSONException e) {
            ErrorLogger.logException(e);
        }

    }
    public static void main(String args[])
    {
        Testing t = new Testing();
        Stream.of((byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5, (byte) 6, (byte) 7, (byte) 8, (byte) 9, (byte) 12)
                .parallel()
                .forEach(domainID -> t.generateAVG(domainID));

        System.out.println(queryCounts);
    }
}
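For reference, the averaging step can be sketched independently of Aerospike. This is a minimal sketch in plain Java, not the poster's code: it assumes the lastupdate values have already been extracted from the JSON records into lists keyed by domainID, and the names AverageSketch, averageOf and averagesByDomain are hypothetical. It returns an empty result for a domain with no values (where BigDecimal.divide by zero would otherwise throw ArithmeticException) and collects the per-domain results from a parallel stream into a ConcurrentHashMap.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

public class AverageSketch {

    // Rounded (HALF_UP) average of the values, or empty if there are none,
    // so an empty set never reaches a division by zero.
    static OptionalLong averageOf(List<Long> values) {
        if (values.isEmpty()) {
            return OptionalLong.empty();
        }
        BigDecimal sum = BigDecimal.ZERO;
        for (long v : values) {
            sum = sum.add(BigDecimal.valueOf(v));
        }
        BigDecimal avg = sum.divide(BigDecimal.valueOf(values.size()), 0, RoundingMode.HALF_UP);
        return OptionalLong.of(avg.longValueExact());
    }

    // One average per domain, computed in parallel; ConcurrentHashMap makes the
    // concurrent puts from the stream's worker threads safe.
    // Domains without values are left out of the result.
    static Map<Byte, Long> averagesByDomain(Map<Byte, List<Long>> valuesByDomain) {
        Map<Byte, Long> result = new ConcurrentHashMap<>();
        valuesByDomain.entrySet().parallelStream().forEach(e ->
                averageOf(e.getValue()).ifPresent(avg -> result.put(e.getKey(), avg)));
        return result;
    }

    public static void main(String[] args) {
        Map<Byte, List<Long>> input = new HashMap<>();
        input.put((byte) 1, Arrays.asList(10L, 20L, 31L));
        input.put((byte) 2, Collections.<Long>emptyList());
        System.out.println(averagesByDomain(input)); // prints {1=20}
    }
}
```

Summing in BigDecimal mirrors the original code and avoids long overflow when many large timestamps are added together.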

Here's the DBClient wrapper class:

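import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.CommitLevel;
import com.aerospike.client.policy.ConsistencyLevel;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import org.json.JSONObject;

public class DBClient {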
    private static DBClient instance = null;
    private static AerospikeClient aeroclient[] = null;
    private static ClientPolicy cPolicy = null;
    private static BatchPolicy bPolicy = null;
    private static WritePolicy wPolicy = null;
    private static QueryPolicy qPolicy = null;
    static
    {
        qPolicy = new QueryPolicy();
        qPolicy.consistencyLevel = ConsistencyLevel.CONSISTENCY_ONE;

        wPolicy = new WritePolicy();
        wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
        wPolicy.commitLevel = CommitLevel.COMMIT_MASTER;

        cPolicy = new ClientPolicy();
        cPolicy.timeout = 500;
        cPolicy.writePolicyDefault = wPolicy;

        bPolicy = new BatchPolicy();
        bPolicy.timeout = 500;

        // domainIDs go up to 12 (see Testing.main), so indices 0..12 are needed
        aeroclient = new AerospikeClient[13];
        for (byte i = 0; i < 13; i++) {
            aeroclient[i] = new AerospikeClient(cPolicy, "192.168.0.5", 3000);
        }
        instance = new DBClient();
    }


    public static DBClient getInstance() {
        return instance;
    }

    public JSONObject get(byte domainID,Key key) throws DBException {
        try {
            Record r = aeroclient[domainID].get(bPolicy, key);
            if (r == null)
                return null;
            String o = (String) r.getValue("data");
            if (o == null)
                return null;
            else
                return new JSONObject(o);
        }
        catch (Exception e)
        {
            ErrorLogger.logException(e,key.toString());
            throw new DBException();
        }
    }


    public void put(byte domainID,Key akey, JSONObject jo) throws DBException {
        try {
            Bin b = new Bin("data", jo.toString());
            aeroclient[domainID].put(wPolicy, akey, b);
        }
        catch (Exception e)
        {
            ErrorLogger.logException(e);
            throw new DBException();
        }
    }

    public RecordSet query(byte domainID,Statement stmnt)
    {
        return aeroclient[domainID].query(qPolicy, stmnt);
    }
}

We're running Aerospike Server 3.5.3 Enterprise, and our Java client library is version 3.0.35 (Maven).

The library crashes when we run it in a multithreaded environment. Here's our stack trace:

Exception in thread "main" com.aerospike.client.AerospikeException: Error Code 1: Server error
	at com.aerospike.client.query.QueryRecordCommand.parseRecordResults(QueryRecordCommand.java:69)
	at com.aerospike.client.command.MultiCommand.parseResult(MultiCommand.java:59)
	at com.aerospike.client.command.SyncCommand.execute(SyncCommand.java:56)
	at com.aerospike.client.query.QueryExecutor$QueryThread.run(QueryExecutor.java:137)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

The server log stays empty. Could you please help us out?

Thank you!


#2

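For a start, AerospikeClient is thread-safe and handles concurrency internally, so we recommend sharing a single AerospikeClient instance instead of creating 12. Each client instance maintains its own connections to the cluster, so extra instances only start up unnecessary resources.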


#3

Thank you for this information, but the problem persists.


#4

Is there any information in the aerospike.log?


#5

The server log stays empty. Could you please help us out?


#6

Hmm, the server log shouldn't be empty. It should at least contain some INFO-level logging, such as the following:

Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4634)  system memory: free 2758068kb ( 70 percent free ) 
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4642)  migrates in progress ( 0 , 0 ) ::: ClusterSize 1 ::: objects 175 ::: sub_objects 0
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4650)  rec refs 175 ::: rec locks 0 ::: trees 0 ::: wr reqs 0 ::: mig tx 0 ::: mig rx 0
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4656)  replica errs :: null 0 non-null 0 ::: sync copy errs :: node 0 :: master 0 
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4666)    trans_in_progress: wr 0 prox 0 wait 0 ::: q 0 ::: bq 0 ::: iq 0 ::: dq 0 : fds - proto (0, 81, 81) : hb (0, 0, 0) : fab (16, 16, 0)
Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4668)    heartbeat_received: self 631866 : foreign 0

Server error code 1 (UNKNOWN_ERROR) would typically be accompanied by an INFO or WARNING entry in the log indicating what the error was.
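Since the client only reports the generic code, the detail has to come from the server log. Here is a minimal sketch for pulling WARNING and CRITICAL entries out of aerospike.log. It assumes the line layout shown in the excerpt above, the level names CRITICAL/WARNING/INFO/DEBUG/DETAIL, and the default package-install path /var/log/aerospike/aerospike.log (check the logging section of aerospike.conf for the actual file). The class and method names are hypothetical; a plain `grep -E 'WARNING|CRITICAL'` over the file does the same job.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LogScanSketch {
    // Assumed line layout, e.g.
    // Mar 17 2015 05:51:37 GMT: INFO (info): (thr_info.c::4634)  system memory: ...
    // group(1) = level, group(2) = logging context
    private static final Pattern LINE =
            Pattern.compile("^.+? GMT: (\\w+) \\(([^)]*)\\): .*$");

    // Keeps only lines whose level is WARNING or CRITICAL.
    static List<String> warnings(Stream<String> lines) {
        return lines.filter(line -> {
            Matcher m = LINE.matcher(line);
            return m.matches()
                    && (m.group(1).equals("WARNING") || m.group(1).equals("CRITICAL"));
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location; pass the real path as the first argument.
        Path log = Paths.get(args.length > 0 ? args[0] : "/var/log/aerospike/aerospike.log");
        try (Stream<String> lines = Files.lines(log)) {
            warnings(lines).forEach(System.out::println);
        }
    }
}
```

Run it right after reproducing the query failure so the relevant entries are near the end of the file.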