I have an application that’s performing a pile of upserts. So many that an asynchronous approach is necessary to keep on a reasonable business schedule. The easy way to do this was to take advantage of the job handling built into the AsyncClient.
Upsert starts with AsyncClient.get using a RecordListener that potentially modifies the values in a key’s bins and then calls AsyncClient.put if something has changed. The WriteListener just provides some information and exits (mostly keeping a count of running threads for the parent thread). Both listeners provide retry callbacks from onFailure, but only a limited number of retries.
What I’ve found is that when AsyncClient.put is called, onSuccess is fired but nothing is written. Even worse, setting the RecordExistsAction to REPLACE successfully deletes the bin from the key.
Using the synchronous version of AsyncClient.put works (meaning records are written).
What am I doing wrong? Is this a bug?
Thanks,
Tom Shulruff
Senior Software Engineer
4INFO, Inc.
I really can’t provide the exact source code, so here’s some pseudo-code:
Start with the com.aerospike.examples.AsyncPutGet example and reverse the calls to make it AsyncGetPut. If get pulls a null, make something for put. Otherwise adjust what you got and call put. Then query and discover that nothing has changed.
Our system stores arrays and appends to the arrays.
private class ReadHandler implements RecordListener {
    public void onSuccess(Key key, Record record) {
        // ... Do some stuff with the data that showed up (maybe add new data) ...
        client.put(null, new WriteHandler(), key, bin);
        // client.put(null, key, bin); // ... Synchronous put works, but the run will stop randomly ...
    }

    public void onFailure(AerospikeException e) {
        // ... Same retry loop as the example; only log a complete failure ...
    }
}
private class WriteHandler implements WriteListener {
    public void onSuccess(Key key) {
        // ... All done! Expect new data in the record ...
    }

    public void onFailure(AerospikeException e) {
        // ... Same retry loop as the example; only log a complete failure ...
    }
}
public void doUpserts() {
    // ... Do 10M to get a feel for the real load ...
    for (long i = 1; i <= 10000000; ++i) {
        Key newKey = new Key(namespace, set, i);
        client.get(null, new ReadHandler(), newKey);
    }
}
You are performing nested async commands, which usually requires offloading async callbacks to a separate thread pool. Otherwise a callback that issues another async command can block the selector thread it is running on, leading to deadlock. The ClientPolicy configuration should be something like this:
AsyncClientPolicy clientPolicy = new AsyncClientPolicy();
clientPolicy.asyncTaskThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
    public final Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    }
});
AsyncClient client = new AsyncClient(clientPolicy, "host", 3000);
Here’s the initialization code. asyncMaxCommands is set to the number of cores times a multiplier pulled from configuration (usually 5). I’ve been testing on a two-core system, so it should have been set to 10.
this.threadPool = Executors.newCachedThreadPool(new ThreadFactory() {
    public final Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    }
});
this.clientPolicy = new AsyncClientPolicy();
this.clientPolicy.maxThreads = Runtime.getRuntime().availableProcessors() * this.coreMultiplier.intValue();
this.clientPolicy.asyncTaskThreadPool = this.threadPool;
this.clientPolicy.asyncMaxCommands = this.clientPolicy.maxThreads;
this.clientPolicy.asyncMaxCommandAction = MaxCommandAction.BLOCK;
maxThreads is used to size the synchronous connection pools; it has nothing to do with asynchronous commands. To scale the async side, use asyncSelectorThreads instead.
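Putting both replies together, the corrected initialization might look like the sketch below. The selector-thread count and command cap are illustrative values chosen for this sketch, not recommendations from this thread; tune them for your own hardware and load:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import com.aerospike.client.async.AsyncClient;
import com.aerospike.client.async.AsyncClientPolicy;
import com.aerospike.client.async.MaxCommandAction;

AsyncClientPolicy clientPolicy = new AsyncClientPolicy();

// Offload callbacks so a put issued inside a get callback cannot
// block the selector thread that delivered the callback.
clientPolicy.asyncTaskThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
    public final Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    }
});

// asyncSelectorThreads (not maxThreads) sizes the async event loops.
clientPolicy.asyncSelectorThreads = 1;   // illustrative value

// Cap in-flight async commands; BLOCK throttles callers at the cap
// instead of rejecting or queueing without bound.
clientPolicy.asyncMaxCommands = 200;     // illustrative value
clientPolicy.asyncMaxCommandAction = MaxCommandAction.BLOCK;

AsyncClient client = new AsyncClient(clientPolicy, "host", 3000);
```

With this setup the 10M-key loop is throttled by asyncMaxCommands, so the driver never has more than the configured number of get/put pairs in flight at once.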