Weird behavior of Java client on secondary index

#1

Hi,

I’m facing weird behavior from the Java client: it stops fetching results based on a secondary index. I have a secondary index on a string field called node_id. My Java client queries by node_id and retrieves the records, and the code runs on a Jetty server.

Here’s the scenario. I deploy the code, start the server, and everything works. But after a few days of running, the query stops returning results for some node_id values. The failures happen for random node_ids while the rest work without any issue, and I can still query those same node_ids with AQL. The only remedy is to restart the server, after which the problem goes away, only to come back a few days later. The same client works fine for queries by primary key; the issue is limited to the secondary index. Here’s the code snippet:

AerospikeClient client = connectionMgr.getConnection(IAeroDaoImplConstant.READ);
Statement stmt = aeroDaoUtil.getStatement();
stmt.setFilters(Filter.equal(config.getPropertyAsString(
        IAeroDaoImplConstant.NODE_ID_COL), req.getNodeId()));
try {
    recordSet = client.query(null, stmt);
    while (recordSet != null && recordSet.next()) {
        // process record
    }
} catch (Exception ex) {
    LOGGER.error("Error is getting node id", ex);
    res.setStatus(IAeroDaoImplConstant.ERROR);
} finally {
    recordSet.close();
}

Here’s my configuration:

service {
        user root
        group root
        paxos-single-replica-limit 1 # Number of nodes where the replica count is automatically reduced to 1.
        pidfile /var/run/aerospike/asd.pid
        service-threads 4
        transaction-queues 8
        transaction-threads-per-queue 8
        proto-fd-max 15000
}

logging {
        # Log file must be an absolute path.
        file /mnt/ebs2/aerospike/log/aerospike.log {
                context any info
        }
}

network {
        service {
                address any
                port 3000
        }

        heartbeat {
                mode mesh
                port 3002 # Heartbeat port for this node.
                address xx.xx.x.xx6

                # List one or more other nodes, one ip-address & port per line:
                mesh-seed-address-port xx.xx.xx.xx9 3002

                interval 150
                timeout 10
        }

        fabric {
                port 3001
        }

        info {
                port 3003
        }
}

namespace caas {
        replication-factor 2
        memory-size 10G
        default-ttl 0 # 0 = never expire/evict.

        #storage-engine memory

        # To use file storage backing, comment out the line above and use the
        # following lines instead.
        storage-engine device {
                file /mnt/ebs2/aerospike/data/caas.dat
                filesize 200G
                write-block-size 1M
#               data-in-memory true # Store data in memory in addition to file.
        }
}

I have two Aerospike nodes running version 3.6.2-el6.

I’m totally confused about what could be going wrong here; any pointers would be appreciated.

Thanks, Shamik


#2

I see two problems.

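  1. recordSet.close() is still called in the finally block if “client.query()” fails. This will not work: the failed query never returned a record set, so there is nothing valid to close.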

  2. The Statement instance should not be shared. Instead, create a new Statement instance for every query.
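Both points can be sketched in self-contained Java. The Aerospike client is not used here: `Cursor`, `ListCursor`, `QueryStatement` and the `query` function are hypothetical stand-ins for `RecordSet`, `Statement` and `client.query(null, stmt)`, so only the shape of the fix is shown: build a fresh statement per call, run the query before entering `try`, and close the cursor in `finally` only once it exists.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class QueryPattern {

    // Hypothetical stand-in for RecordSet: iterate with next(), release with close().
    interface Cursor {
        boolean next();
        String current();
        void close();
    }

    // Hypothetical stand-in for Statement; a new one is built for every query.
    static final class QueryStatement {
        final String bin;
        final String value;
        QueryStatement(String bin, String value) { this.bin = bin; this.value = value; }
    }

    // List-backed fake cursor that records whether close() was called.
    static final class ListCursor implements Cursor {
        private final Iterator<String> it;
        private String cur;
        boolean closed;
        ListCursor(List<String> rows) { this.it = rows.iterator(); }
        public boolean next() {
            if (!it.hasNext()) return false;
            cur = it.next();
            return true;
        }
        public String current() { return cur; }
        public void close() { closed = true; }
    }

    // `query` plays the role of stmt -> client.query(null, stmt).
    static List<String> findByNodeId(String nodeId, Function<QueryStatement, Cursor> query) {
        QueryStatement stmt = new QueryStatement("node_id", nodeId); // never shared
        // Run the query BEFORE entering try: if it throws, no cursor exists,
        // so there is nothing to close and the exception reaches the caller.
        Cursor cursor = query.apply(stmt);
        List<String> out = new ArrayList<>();
        try {
            while (cursor.next()) {
                out.add(cursor.current()); // process record
            }
        } finally {
            cursor.close(); // runs on success and on a mid-iteration failure
        }
        return out;
    }
}
```

A caller that wants the original error handling can wrap `findByNodeId` in its own try/catch to log the exception and set the error status; a failed query then never reaches `close()`.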


#3

Thanks for your reply.

The Statement instance is not shared; rather, a new object is instantiated for each request. I should have mentioned that in the code.

I’ll look into the RecordSet part.


#4

You should provide the source code for “aeroDaoUtil.getStatement()”.


#5

Here it is

public Statement getStatement(){
     Statement stmt = new Statement();
     stmt.setNamespace(config.getPropertyAsString(IAeroDaoImplConstant.AEROSPIKE_CAAS_NAMESPACE));
     stmt.setSetName(config.getPropertyAsString(IAeroDaoImplConstant.AEROSPIKE_CAAS_SET));
     return stmt;
}

#6

The Statement is initialized correctly. Can you provide source code for “connectionMgr.getConnection()” too?


#7

Here you go

private AerospikeClient readClient;

public AerospikeClient getConnection(String connType){
	AerospikeClient client = null;
	switch(connType){
	case IAeroDaoImplConstant.READ:
		client = createReadConnection();
		break;
	case IAeroDaoImplConstant.WRITE:
		client = createWriteConnection();
		break;
	case IAeroDaoImplConstant.ASYNC:
		client = createAsyncConnection();
		break;
	}
	return client;
}


private AerospikeClient createReadConnection(){
	if(readClient == null){
		// initialize client
		String[] hosts = config.getPropertyAsString(
				IAeroDaoImplConstant.AEROSPIKE_HOSTS).split(",");
		List<Host> hostList = new ArrayList<>();
		for (String each : hosts) {
			Host host = new Host(each,
					config.getPropertyAsIneger(IAeroDaoImplConstant.AEROSPIKE_PORT));
			hostList.add(host);
		}
		readClient = new AerospikeClient(
				getReadPolicy(), hostList.toArray(new Host[hostList.size()]));
	}
	return readClient;
}


public ClientPolicy getReadPolicy(){
	if(readPolicy == null){
		readPolicy = new ClientPolicy();
		readPolicy.readPolicyDefault.timeout = 
			config.getPropertyAsIneger(IAeroDaoImplConstant.READ_POLICY_DEFAULT_TIMEOUT);   
		readPolicy.readPolicyDefault.maxRetries = 
			config.getPropertyAsIneger(IAeroDaoImplConstant.READ_POLICY_DEFAULT_MAXTRY);
		readPolicy.readPolicyDefault.sleepBetweenRetries = 
			config.getPropertyAsIneger(IAeroDaoImplConstant.READ_POLICY_DEFAULT_SLEEP_BETWEEN_RETRY);
		readPolicy.writePolicyDefault.timeout = 
			config.getPropertyAsIneger(IAeroDaoImplConstant.WRITE_POLICY_DEFAULT_TIMEOUT);  
		readPolicy.writePolicyDefault.maxRetries = 
			config.getPropertyAsIneger(IAeroDaoImplConstant.WRITE_POLICY_DEFAULT_MAXTRY);
		readPolicy.writePolicyDefault.sleepBetweenRetries = 
			config.getPropertyAsIneger(IAeroDaoImplConstant.WRITE_POLICY_DEFAULT_SLEEP_BETWEEN_RETRY);
	}		
	return readPolicy;
}

#8

Three client instances are being created when one will do. Each client spawns a cluster tend thread which periodically polls the server nodes for cluster status. It’s extra overhead to create more than one client per cluster.

Since you are using both async and sync commands, I recommend creating a single AsyncClient instance. AsyncClient can perform both async and sync commands because it inherits from AerospikeClient.
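A minimal sketch of sharing one client, assuming a hypothetical `FakeClient` in place of `AsyncClient` (each real client instance would start its own tend thread). `ConnectionManager.getConnection` ignores the connection type and always returns the same instance. It also guards the lazy initialization: the unsynchronized `if (readClient == null)` check in `createReadConnection()` could let two concurrent Jetty requests each build a client, and double-checked locking on a `volatile` field prevents that.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SharedClient {

    // Hypothetical stand-in for AsyncClient (which extends AerospikeClient).
    static final class FakeClient {
        static final AtomicInteger CREATED = new AtomicInteger();
        // A real client would start a cluster tend thread here.
        FakeClient() { CREATED.incrementAndGet(); }
    }

    static final class ConnectionManager {
        private final Supplier<FakeClient> factory;
        private volatile FakeClient client; // volatile makes double-checked locking safe

        ConnectionManager(Supplier<FakeClient> factory) { this.factory = factory; }

        // connType is ignored on purpose: READ, WRITE and ASYNC share one client.
        FakeClient getConnection(String connType) {
            FakeClient c = client;
            if (c == null) {
                synchronized (this) {
                    c = client;
                    if (c == null) {
                        c = factory.get(); // created at most once
                        client = c;
                    }
                }
            }
            return c;
        }
    }
}
```

Per-command behavior (timeouts, retries) then comes from the policy passed to each call rather than from which client instance is used.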

In any case, the code you provided should work (if client.query() is called before the try/finally block). Are you getting an exception (result code?) or just no returned records?


#9

I created separate connections for read, write and async because I was struggling to apply the appropriate policy to each type. Are you saying I can create one AsyncClient instance and reuse it for both reads and writes?

Meanwhile, I’ve updated the code to run the query before the try block and to handle the RecordSet as you suggested. I’ll keep an eye on it and see if the issue resurfaces.

Appreciate your help.


#10

Yes, the default policies are just defaults. You can specify your policy directly on each client command instead of passing null.

Also, read and write policies are different from the query policy.
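The fallback behavior can be sketched with hypothetical stand-ins. `Policy`, `WritePolicy` and `QueryPolicy` below only mirror the names of the client's policy classes, not their real fields, and `FakeClient` and its millisecond timeouts are assumptions. Each command takes its own policy argument and uses the client-wide default only when that argument is null, and queries have a default separate from the read default.

```java
public class PerCommandPolicy {

    // Hypothetical stand-ins; only a timeout is modeled.
    static class Policy {
        final int timeoutMs;
        Policy(int timeoutMs) { this.timeoutMs = timeoutMs; }
    }
    static final class WritePolicy extends Policy {
        WritePolicy(int timeoutMs) { super(timeoutMs); }
    }
    static final class QueryPolicy extends Policy {
        QueryPolicy(int timeoutMs) { super(timeoutMs); }
    }

    static final class FakeClient {
        // Client-wide defaults, used only when a command is given null.
        final Policy readDefault = new Policy(50);
        final WritePolicy writeDefault = new WritePolicy(200);
        final QueryPolicy queryDefault = new QueryPolicy(1000);

        // Each method returns the timeout it would use, to make the fallback visible.
        int get(Policy p, String key) { return (p != null ? p : readDefault).timeoutMs; }
        int put(WritePolicy p, String key) { return (p != null ? p : writeDefault).timeoutMs; }
        int query(QueryPolicy p, String filter) { return (p != null ? p : queryDefault).timeoutMs; }
    }
}
```

With this shape, one shared client can serve primary-key reads with a short timeout while a secondary-index query passes its own longer-timeout policy, without a separate client per policy.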