Querying for records with a large number of keys

I need to query up to 2 million records using a set of keys. Memory restrictions prevent the application from reading them all in at once, so I need to write the record data out to files as it arrives rather than holding all of the query results in memory.

So I originally tried doing multiple AerospikeClient.get() calls, each with 5000 keys. This works, but it is pretty slow compared to our existing app, which calls another DB through the JDBC API. That API returns a java.sql.ResultSet that can be iterated over, fetching results as it goes, so not all results are in memory at once.

It seems like Aerospike has the same functionality with AerospikeClient.query(), but I haven’t gotten it to work. It’s hard to find any examples online that fit my use case. My code:

QueryPolicy policy = new QueryPolicy();
policy.totalTimeout = timeoutMs;
policy.socketTimeout = timeoutMs;
policy.recordQueueSize = 5000;
Exp[] eqPredicates = new Exp[keyValues.size()]; // keyValues is Set<String>
int i = 0;
for (String key : keyValues) {
    eqPredicates[i++] = Exp.eq(Exp.stringBin("id"), Exp.val(key));
}
policy.filterExp = Exp.build(Exp.or(eqPredicates));
Statement statement = new Statement();
statement.setNamespace("my_db");
statement.setSetName("my_table");
try (RecordSet records = client.query(policy, statement)) {
    while (records.next()) {
        // process record.
    }
}

I was trying to figure out a way to use statement.setFilter() in place of policy.filterExp = Exp.build(...), but I can’t figure out how to create a Filter object from what I’ve got.

I’m getting this exception from the records.next() call:

com.aerospike.client.AerospikeException: Error 4,1,0,120000,120000,5,BBADF67A0D695A <AS_NODE_IP_ADDRESS> 3000: Parameter error
        at com.aerospike.client.query.QueryPartitionCommand.parseRow(QueryPartitionCommand.java:92)
        at com.aerospike.client.command.MultiCommand.parseGroup(MultiCommand.java:254)
        at com.aerospike.client.command.MultiCommand.parseResult(MultiCommand.java:219)
        at com.aerospike.client.command.SyncCommand.executeCommand(SyncCommand.java:111)
        at com.aerospike.client.query.QueryPartitionCommand.execute(QueryPartitionCommand.java:57)
        at com.aerospike.client.query.QueryPartitionExecutor$QueryThread.run(QueryPartitionExecutor.java:235)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

Anyone know what’s causing the error? Am I on the right track with how I’m going about this, or is there an obvious alternative that I’m missing? Or is doing separate AerospikeClient.get() calls for 5000 keys at a time the optimal solution, performance-wise?

Thanks! -Brent

1 - statement.setFilter() would be for finding records where the string bin id contains a specific value - it’s a string equality filter that you use after first creating a string-type secondary index on the id bin. That’s not what your intent is. (In the example below, it would be a query such as: find all records where the name bin has "Jill". See the secondary-index sketch at the end of this post.)

2 - I replicated your code - perhaps the error is in the last part, where you process the record set.

My Data:

key0 : (gen:1),(exp:0),(bins:(name:Sandra),(age:34))
key1 : (gen:1),(exp:0),(bins:(name:Jack),(age:26))
key2 : (gen:1),(exp:0),(bins:(name:Jill),(age:20))
key3 : (gen:1),(exp:0),(bins:(name:James),(age:38))
key4 : (gen:1),(exp:0),(bins:(name:Jim),(age:46))
key5 : (gen:1),(exp:0),(bins:(name:Julia),(age:62))
key6 : (gen:1),(exp:0),(bins:(name:Sally),(age:32))
key7 : (gen:1),(exp:0),(bins:(name:Sean),(age:24))
key8 : (gen:1),(exp:0),(bins:(name:Sam),(age:12))
key9 : (gen:1),(exp:0),(bins:(name:Susan),(age:42))

Test code - replicated close to your code:

//Needed imports
import com.aerospike.client.query.Statement;
import com.aerospike.client.query.Filter;
import com.aerospike.client.Operation;
import com.aerospike.client.Bin;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.Record;

// Expression Filter imports
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.Expression;

//ArrayList
import java.util.List;
import java.util.ArrayList;
//import java.util.Arrays;

//Run SI query
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("testset");
//stmt.setFilter(Filter.range("age", 20,30));

List<String> keyValues = new ArrayList<String>();
keyValues.add("Jack");
keyValues.add("Jill");
keyValues.add("Julia");

Exp[] eqPredicates = new Exp[keyValues.size()]; 
int i = 0;
for (String key : keyValues) {
    eqPredicates[i++] = Exp.eq(Exp.stringBin("name"), Exp.val(key));
}
//Expression nameExp = Exp.build( Exp.or(eqPredicates) );

QueryPolicy qp = new QueryPolicy();
int timeoutMs = 120000;
qp.totalTimeout = timeoutMs;
qp.socketTimeout = timeoutMs;
qp.recordQueueSize = 5000;
//qp.filterExp = nameExp;
qp.filterExp = Exp.build( Exp.or(eqPredicates) );

try( RecordSet rs = client.query(qp, stmt)){
  while(rs.next()){
    Record r = rs.getRecord();
    //Key thisKey = rs.getKey();  
    System.out.println(r);
  }
}

and my output:

(gen:1),(exp:0),(bins:(name:Julia),(age:62))
(gen:1),(exp:0),(bins:(name:Jill),(age:20))
(gen:1),(exp:0),(bins:(name:Jack),(age:26))

Whether this technique will work when keyValues has 2 million entries is TBD. First, let’s make sure the code works for a small keyValues size.
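
To make point 1 concrete, here is a minimal sketch of what a statement.setFilter() query would look like (it assumes a string secondary index on the name bin; the index name name_idx is just illustrative):

//Secondary index imports
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;

//Create the secondary index once.
IndexTask task = client.createIndex(null, "test", "testset", "name_idx", "name", IndexType.STRING);
task.waitTillComplete();

//Query it with an equality filter.
Statement siStmt = new Statement();
siStmt.setNamespace("test");
siStmt.setSetName("testset");
siStmt.setFilter(Filter.equal("name", "Jill")); //matches a single value - no OR over many keys

try (RecordSet rs = client.query(new QueryPolicy(), siStmt)) {
    while (rs.next()) {
        System.out.println(rs.getRecord());
    }
}

Since Filter.equal() takes a single value, it doesn’t map naturally onto your "match any of N keys" use case.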

I completely removed all logic from the // process record block and just put in a print statement. I still get the exact same exception.

So I tried removing the policy filter expression altogether, and just adding

statement.setMaxRecords(5);

This gets me a successful query, with this output:

RECORD: (gen:4),(exp:460430457),(bins:(id:fe045d879a524427b0640a869ce9e7b7),(event:[B@421e912d))
RECORD: (gen:2),(exp:461501613),(bins:(id:194f54ffa5864746a7105add3986bfbf),(event:[B@2920485))
RECORD: (gen:5),(exp:459030277),(bins:(id:f9f2c4c8033d4555b4050542c9129b01),(event:[B@e8c53b7))
RECORD: (gen:1),(exp:457554535),(bins:(id:90bd937932be4dc5ba9ec429a1e443c4),(event:[B@3fc12f5e))
RECORD: (gen:2),(exp:459673522),(bins:(id:b3dfe0665bff4ac6bf47d27f9c9a8b06),(event:[B@9604672))

So something’s wrong with the filter, but I can’t figure out what. I tried switching the key from String to bytes, just changing it to this:

eqPredicates[i++] = Exp.eq(Exp.stringBin("id"), Exp.val(key.getBytes()));

but no luck (same error).

  1. What is your Java client version and server version?
  2. Exp.eq with stringBin("id") - the bin id must contain string data to match.
  3. statement.setMaxRecords() is just flow control - use it if you want to control how the query progresses (“pagination”) from the client instead of letting the Aerospike client library do its default thing. So what the client library is essentially doing here is telling the server to scan all records but stop sending after 5. (See the pagination sketch at the end of this post.)
  4. It does show your id bin has this value: fe045d879a524427b0640a869ce9e7b7. Is this stored as a string in the bin? If these are bytes in a byte array, then you need to do Exp.blobBin("id") instead of Exp.stringBin("id").

i.e. try:

eqPredicates[i++] = Exp.eq(Exp.blobBin("id"), Exp.val(key.getBytes()));
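
And to make point 3 concrete, here is a minimal pagination sketch (it assumes a Java client recent enough to support PartitionFilter-based query pagination; the page size of 5000 is just illustrative):

import com.aerospike.client.query.PartitionFilter;

Statement pagedStmt = new Statement();
pagedStmt.setNamespace("test");
pagedStmt.setSetName("testset");
pagedStmt.setMaxRecords(5000); //page size

PartitionFilter pf = PartitionFilter.all();
while (!pf.isDone()) {
    //Each queryPartitions() call resumes where the previous page left off.
    try (RecordSet rs = client.queryPartitions(new QueryPolicy(), pagedStmt, pf)) {
        while (rs.next()) {
            System.out.println(rs.getRecord());
        }
    }
}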

  1. Java client is 7.2.1. The server version is 6.4.0.10.
  2. I’m fairly sure it contains a string. I’m still working on getting access to aql to view the schema. Here is the code that writes to it:
ByteBuffer eventByteBuffer = toByteBuffer(<jsonObject>);
String id = "fe045d879a524427b0640a869ce9e7b7";
Key key = new Key("my_db", "my_table", id);
Bin[] bins = new Bin[2];
bins[0] = new Bin("id", id);
bins[1] = new Bin("event", eventByteBuffer);
client.put(null, key, bins);

I tried your suggestion of

eqPredicates[i++] = Exp.eq(Exp.blobBin("id"), Exp.val(key.getBytes()));

but still get the error.

I switched to Java client 7.2.1 and tried the code below. (My server is 7.0.0.8.)

1 - Added two new bins to my records above, id and event:

AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
for(int i=0; i<10; i++){
    Key key = new Key("test", "testset", "key"+i);
    String id = "fe045d879a524427b0640a869ce9e7b"+i;
    Bin[] bins = new Bin[2];
    bins[0] = new Bin("id", id);
    bins[1] = new Bin("event", "xxx");
    client.put(null, key, bins);
    System.out.println("key"+i+ " : "+client.get(null, key));
}

My records are:

Initialized the client and connected to the cluster.
key0 : (gen:2),(exp:0),(bins:(name:Sandra),(age:34),(id:fe045d879a524427b0640a869ce9e7b0),(event:xxx))
key1 : (gen:2),(exp:0),(bins:(name:Jack),(age:26),(id:fe045d879a524427b0640a869ce9e7b1),(event:xxx))
key2 : (gen:2),(exp:0),(bins:(name:Jill),(age:20),(id:fe045d879a524427b0640a869ce9e7b2),(event:xxx))
key3 : (gen:2),(exp:0),(bins:(name:James),(age:38),(id:fe045d879a524427b0640a869ce9e7b3),(event:xxx))
key4 : (gen:2),(exp:0),(bins:(name:Jim),(age:46),(id:fe045d879a524427b0640a869ce9e7b4),(event:xxx))
key5 : (gen:2),(exp:0),(bins:(name:Julia),(age:62),(id:fe045d879a524427b0640a869ce9e7b5),(event:xxx))
key6 : (gen:2),(exp:0),(bins:(name:Sally),(age:32),(id:fe045d879a524427b0640a869ce9e7b6),(event:xxx))
key7 : (gen:2),(exp:0),(bins:(name:Sean),(age:24),(id:fe045d879a524427b0640a869ce9e7b7),(event:xxx))
key8 : (gen:2),(exp:0),(bins:(name:Sam),(age:12),(id:fe045d879a524427b0640a869ce9e7b8),(event:xxx))
key9 : (gen:2),(exp:0),(bins:(name:Susan),(age:42),(id:fe045d879a524427b0640a869ce9e7b9),(event:xxx))

2 - Modified the filter expression to use the id bin; the code is:

//Run SI query
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("testset");
//stmt.setFilter(Filter.range("age", 20,30));

List<String> keyValues = new ArrayList<String>();
keyValues.add("fe045d879a524427b0640a869ce9e7b0");
keyValues.add("fe045d879a524427b0640a869ce9e7b1");
keyValues.add("fe045d879a524427b0640a869ce9e7b9");

Exp[] eqPredicates = new Exp[keyValues.size()]; 
int i = 0;
for (String key : keyValues) {
    eqPredicates[i++] = Exp.eq(Exp.stringBin("id"), Exp.val(key));
}
//Expression nameExp = Exp.build( Exp.or(eqPredicates) );

QueryPolicy qp = new QueryPolicy();
int timeoutMs = 120000;
qp.totalTimeout = timeoutMs;
qp.socketTimeout = timeoutMs;
qp.recordQueueSize = 5000;
//qp.filterExp = nameExp;
qp.filterExp = Exp.build( Exp.or(eqPredicates) );

try( RecordSet rs = client.query(qp, stmt)){
  while(rs.next()){
    Record r = rs.getRecord();
    //Key thisKey = rs.getKey();  
    System.out.println(r);
  }
}

Output is:

(gen:2),(exp:0),(bins:(name:Susan),(age:42),(id:fe045d879a524427b0640a869ce9e7b9),(event:xxx))
(gen:2),(exp:0),(bins:(name:Jack),(age:26),(id:fe045d879a524427b0640a869ce9e7b1),(event:xxx))
(gen:2),(exp:0),(bins:(name:Sandra),(age:34),(id:fe045d879a524427b0640a869ce9e7b0),(event:xxx))

BTW, if id is also your record key, you could use batch reads. But this should work.

Thanks for your help. I’m not sure what could be the issue with my env, but it seems like the code shouldn’t be the issue since it works for you. I’ll keep trying to figure it out on my end.

id is the record key. Would batch reads be faster?

Is doing batch reads different from what I was doing before, which was essentially this:

List<String> allIds = getAllIds(); // Up to 2 million.

BatchPolicy policy = new BatchPolicy(aerospikeClient.batchPolicyDefault); // copy of the client's default batch policy
policy.totalTimeout = 120000;
policy.socketTimeout = 120000;
policy.readModeAP = ReadModeAP.ALL;
policy.replica = Replica.MASTER_PROLES;

int index = 0;
while (index < allIds.size()) {
    // Get next page of IDs (max 5000).
    int nextIndex = Math.min(index + 5000, allIds.size());
    List<String> pageOfIds = allIds.subList(index, nextIndex);
    index = nextIndex;
    
    Key[] pageOfKeys = pageOfIds.stream().map(id -> new Key("my_db", "my_table", id)).toArray(Key[]::new);
    Record[] pageOfRecords = aerospikeClient.get(policy, pageOfKeys);
    
    for (Record record : pageOfRecords) {
        // Process record.
    }
}

This works, but as I said above, it’s a lot slower than a single query to the DB we’re trying to supplement with AS. I was thinking the single AerospikeClient.query() would be faster because it’s also a single query, rather than the up to 400 calls this code can make when there are 2 million IDs.

Yes, from a quick glance at your API usage, you are doing batch reads, and this is the fastest way to do it, since you are spreading the gets across all nodes using the client library instead of multithreading individual gets in the application. With the query technique, you are essentially reading every single record in the namespace and checking whether it qualifies; with a batch get, you are directly accessing the records of interest. Querying is not the better approach here - plus, for whatever reason, it’s not working in your env. (I am also stumped as to why. :) ) This policy

policy.readModeAP = ReadModeAP.ALL;

– implies: consult the replica on reads if different versions of the same partition exist on the cluster. This can happen when the cluster state is changing (a node removed and added back in - the rolling upgrade scenario) and migrations are going on. In a stable cluster, it should not cost you any extra latency. (The default is ONE.)

Next, this one:

policy.replica = Replica.MASTER_PROLES;

is this what you want to do? The default is SEQUENCE, which means reads go to the master partition first. I would suggest staying with SEQUENCE, and only going to the replica if a read from the master fails for some reason.
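
i.e., something like this (SEQUENCE and ONE are the client defaults, so you can also simply omit both lines):

policy.readModeAP = ReadModeAP.ONE; //default - read one copy
policy.replica = Replica.SEQUENCE;  //default - try the master first, then a replica on retry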

However, you are currently waiting for all results to come back to the client and reading them in the order of the keys. If you want to consume results as they come and don’t care about the order, you can use:

Async batch read - with the same read policy for all reads - i.e., implement this API:

get(EventLoop eventLoop, RecordSequenceListener listener, BatchPolicy policy, Key[] keys)

This will allow you to consume results as they come.
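
Here is a minimal sketch of that approach (it assumes event loops are supplied at client construction time, which the async APIs require; the single NIO event loop and the key page are just illustrative):

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.listener.RecordSequenceListener;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.ClientPolicy;

//The client must be constructed with event loops to use the async APIs.
EventLoops eventLoops = new NioEventLoops(1);
ClientPolicy cp = new ClientPolicy();
cp.eventLoops = eventLoops;
AerospikeClient client = new AerospikeClient(cp, "localhost", 3000);

//Build one page of keys, as in your batch loop above.
Key[] pageOfKeys = pageOfIds.stream()
        .map(id -> new Key("my_db", "my_table", id))
        .toArray(Key[]::new);

client.get(eventLoops.next(), new RecordSequenceListener() {
    @Override
    public void onRecord(Key key, Record record) throws AerospikeException {
        //Called once per record as it arrives - write it out to a file here.
        System.out.println(record);
    }

    @Override
    public void onSuccess() {
        //All keys in this batch have been processed.
    }

    @Override
    public void onFailure(AerospikeException ae) {
        ae.printStackTrace();
    }
}, new BatchPolicy(), pageOfKeys);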