I need to query up to 2 million records using a set of keys. Memory restrictions prevent the application from reading them all in at once, so I need to write the record data out to files as it arrives rather than holding the full query result in memory.
I originally tried making multiple AerospikeClient.get() calls, each with 5000 keys. This works, but it is pretty slow compared to our existing app, which calls another DB through the JDBC API. That API returns a java.sql.ResultSet that can be iterated over, fetching results as it goes, so not all results are in memory at once.
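For reference, the batching approach described above has roughly the following shape. This is a minimal sketch, not the actual application code: BatchedExport, fetchBatch, and sink are hypothetical names introduced for illustration. In the real app, fetchBatch would presumably wrap a batch AerospikeClient.get() call on the keys built from each chunk of ids, and sink would write one record to the output file.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class BatchedExport {

    /**
     * Fetches records in fixed-size chunks and passes each record to the sink
     * as soon as its chunk returns, so only one chunk is held in memory at a time.
     *
     * fetchBatch is a hypothetical stand-in for the real batch read call.
     * Returns the number of records handed to the sink.
     */
    public static <R> int export(Collection<String> ids,
                                 int batchSize,
                                 Function<List<String>, List<R>> fetchBatch,
                                 Consumer<R> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int written = 0;
        List<String> chunk = new ArrayList<>(batchSize);
        for (String id : ids) {
            chunk.add(id);
            if (chunk.size() == batchSize) {
                written += flush(chunk, fetchBatch, sink);
            }
        }
        // Handle the final, possibly smaller, chunk.
        if (!chunk.isEmpty()) {
            written += flush(chunk, fetchBatch, sink);
        }
        return written;
    }

    private static <R> int flush(List<String> chunk,
                                 Function<List<String>, List<R>> fetchBatch,
                                 Consumer<R> sink) {
        int count = 0;
        // Pass an immutable copy so the fetcher cannot observe the later clear().
        for (R record : fetchBatch.apply(List.copyOf(chunk))) {
            // Skip null entries (assumed here to mean "key not found").
            if (record != null) {
                sink.accept(record);
                count++;
            }
        }
        chunk.clear();
        return count;
    }
}
```

With batchSize set to 5000, memory use stays bounded by one chunk of results, but each chunk is a separate blocking round trip, which may be part of why this is slower than streaming a ResultSet.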
It seems like Aerospike has the same functionality with AerospikeClient.query(), but I haven’t gotten it to work. It’s hard to find any examples online that fit my use case. My code:
QueryPolicy policy = new QueryPolicy();
policy.totalTimeout = timeoutMs;
policy.socketTimeout = timeoutMs;
policy.recordQueueSize = 5000;
Exp[] eqPredicates = new Exp[keyValues.size()]; // keyValues is Set<String>
int i = 0;
for (String key : keyValues) {
    eqPredicates[i++] = Exp.eq(Exp.stringBin("id"), Exp.val(key));
}
policy.filterExp = Exp.build(Exp.or(eqPredicates));
Statement statement = new Statement();
statement.setNamespace("my_db");
statement.setSetName("my_table");
try (RecordSet records = client.query(policy, statement)) {
    while (records.next()) {
        // process record.
    }
}
I was trying to figure out a way to use statement.setFilter() in place of policy.filterExp = Exp.build(...), but I can’t figure out how to create a Filter object from what I’ve got.
I’m getting this exception from the records.next() call:
com.aerospike.client.AerospikeException: Error 4,1,0,120000,120000,5,BBADF67A0D695A <AS_NODE_IP_ADDRESS> 3000: Parameter error
at com.aerospike.client.query.QueryPartitionCommand.parseRow(QueryPartitionCommand.java:92)
at com.aerospike.client.command.MultiCommand.parseGroup(MultiCommand.java:254)
at com.aerospike.client.command.MultiCommand.parseResult(MultiCommand.java:219)
at com.aerospike.client.command.SyncCommand.executeCommand(SyncCommand.java:111)
at com.aerospike.client.query.QueryPartitionCommand.execute(QueryPartitionCommand.java:57)
at com.aerospike.client.query.QueryPartitionExecutor$QueryThread.run(QueryPartitionExecutor.java:235)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Does anyone know what’s causing the error? Am I on the right track in how I’m going about this, or is there an obvious alternative that I’m missing? Or is making separate AerospikeClient.get() calls for 5000 keys at a time the optimal solution, performance-wise?
Thanks! -Brent