Async Upsert Issue (silent but deadly failures)

Hello,

I have an application that’s performing a pile of upserts. So many that an asynchronous approach is necessary to keep on a reasonable business schedule. The easy way to do this was to take advantage of the job handling built into the AsyncClient.

Each upsert starts with AsyncClient.get using a RecordListener that potentially modifies the values in a key’s bin and then calls AsyncClient.put if something has changed. The WriteListener just records some information and exits (mostly keeping a count of running threads for the parent thread). Both listeners retry from onFailure, but only a limited number of times.

What I’ve found is that when AsyncClient.put is called, onSuccess fires but nothing is written. Even worse, setting the RecordExistsAction to REPLACE successfully deletes the bin from the key.

Using the synchronous version of AsyncClient.put works (meaning records are written).

What am I doing wrong? Is this a bug?

Thanks,
Tom Shulruff
Senior Software Engineer
4INFO, Inc.

Can you provide source code for your async read and writes?

I really can’t provide the exact source code, so here’s some pseudo-code:

Start with the com.aerospike.examples.AsyncPutGet example and reverse the calls to make it AsyncGetPut. If get pulls a null, make something for put. Otherwise adjust what you got and call put. Then query and discover that nothing has changed.

Our system stores arrays and appends to the arrays.

private class ReadHandler implements RecordListener {
    public void onSuccess(Key key, Record record) {
        // ... Do some stuff with data that showed up (maybe add new data) ...
        client.put(null, new WriteHandler(), key, bin);

        // client.put(null, key, bin); // ... Works, but run will stop randomly...
    }

    public void onFailure(AerospikeException e) {
        // ... Same retry loop as the example, only log a complete failure. ...
    }
}

private class WriteHandler implements WriteListener {
    public void onSuccess(Key key) {
        // ... All done! Expect new data in the record...
    }

    public void onFailure(AerospikeException e) {
        // ... Same retry loop as the example, only log a complete failure ...
    }
}

public void doUpserts() {

    // ... Do 10M to get a feel for the real load...
    for (long i = 1; i <= 10000000; ++i) {
        Key key = new Key(namespace, set, i);
        client.get(null, new ReadHandler(), key);
    }
}

The pseudo-code looks reasonable. I recommend checking your bin value for null. A null bin value will cause the bin to be deleted.

Before the put, I make a new Bin(stuff) to put. It really, really shouldn’t be null.

The synchronous put works fine (because the Bin isn’t null).

Deletion was happening when I set the RecordExistsAction to REPLACE. As though the async half never got the bin from the call.

You are performing nested async commands which usually require offloading async callbacks to a separate thread. This is done to prevent deadlock situations. The ClientPolicy configuration should be something like this:

AsyncClientPolicy clientPolicy = new AsyncClientPolicy();

clientPolicy.asyncTaskThreadPool =
    Executors.newCachedThreadPool(new ThreadFactory() {
        public final Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        }
    });

AsyncClient client = new AsyncClient(clientPolicy, "host", 3000);

That’s exactly what it looks like.

When using Executors.newCachedThreadPool, it generates a bajillion threads (not enough vertical space in “top” to see them all).

When using Executors.newFixedThreadPool, it only generates the configured number of threads.

Both exhibit the same behavior.
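For anyone comparing the two pool types, here is a minimal sketch of why the thread counts differ. It uses only java.util.concurrent, not the Aerospike client, and the class and method names (PoolGrowthSketch, CountingDaemonFactory, threadsCreated) are made up for illustration. Every task blocks until released, which roughly models callbacks that are all busy at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not part of the Aerospike client.
final class PoolGrowthSketch {
    /** Daemon thread factory (as in the thread above) that also counts created threads. */
    static final class CountingDaemonFactory implements ThreadFactory {
        final AtomicInteger created = new AtomicInteger();

        @Override
        public Thread newThread(Runnable runnable) {
            created.incrementAndGet();
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        }
    }

    /** Submits tasks that all block until released, then reports threads created. */
    static int threadsCreated(ExecutorService pool, CountingDaemonFactory factory, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep this worker busy until every task is submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int created = factory.created.get();
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();     // queued tasks still run; no new ones are accepted
        return created;
    }

    static int cachedPoolThreads(int tasks) {
        CountingDaemonFactory factory = new CountingDaemonFactory();
        return threadsCreated(Executors.newCachedThreadPool(factory), factory, tasks);
    }

    static int fixedPoolThreads(int size, int tasks) {
        CountingDaemonFactory factory = new CountingDaemonFactory();
        return threadsCreated(Executors.newFixedThreadPool(size, factory), factory, tasks);
    }

    public static void main(String[] args) {
        System.out.println("cached pool threads: " + cachedPoolThreads(50));
        System.out.println("fixed pool threads:  " + fixedPoolThreads(4, 50));
    }
}
```

The cached pool starts a new thread whenever no idle worker is available, so 50 simultaneously busy tasks mean 50 threads. The fixed pool queues the extra tasks behind its 4 workers. That only explains the thread count seen in “top”; it says nothing about the missing writes.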

What is your AsyncClientPolicy.asyncMaxCommands setting?

It shouldn’t be greater than 200. The number of task threads is limited by asyncMaxCommands.

Here’s the initialization code. asyncMaxCommands is set to the number of cores times a multiplier pulled from configuration (usually 5). I’ve been testing on a two-core system, so it should have been set to 10.

this.threadPool = Executors.newCachedThreadPool(new ThreadFactory() {
    public final Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    }
});

this.clientPolicy = new AsyncClientPolicy();

this.clientPolicy.maxThreads = Runtime.getRuntime().availableProcessors() * this.coreMultiplier.intValue();

this.clientPolicy.asyncTaskThreadPool = this.threadPool;
this.clientPolicy.asyncMaxCommands = this.clientPolicy.maxThreads;
this.clientPolicy.asyncMaxCommandAction = MaxCommandAction.BLOCK;

maxThreads is used to size the synchronous connection pools. maxThreads has nothing to do with asynchronous commands. Instead, use asyncSelectorThreads.

this.clientPolicy.asyncSelectorThreads = Runtime.getRuntime().availableProcessors() * this.coreMultiplier.intValue();
this.clientPolicy.asyncMaxCommands = 100;
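To make the interaction between a command limit, a BLOCK action, and a separate callback pool a bit more concrete, here is a rough stand-in model in plain java.util.concurrent. It is an assumption-laden sketch, not how the Aerospike client is implemented: the class BoundedAsyncSketch, its ioPool and taskPool fields, and the Semaphore standing in for asyncMaxCommands are all hypothetical. The "get" and "put" are empty placeholders.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in model only; none of this is Aerospike client code.
final class BoundedAsyncSketch {
    private final Semaphore slots;          // plays the role of asyncMaxCommands
    private final ExecutorService ioPool;   // hypothetical stand-in for selector threads
    private final ExecutorService taskPool; // plays the role of asyncTaskThreadPool
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peakInFlight = new AtomicInteger();

    BoundedAsyncSketch(int maxCommands, int ioThreads, int taskThreads) {
        slots = new Semaphore(maxCommands);
        ioPool = Executors.newFixedThreadPool(ioThreads, BoundedAsyncSketch::daemon);
        taskPool = Executors.newFixedThreadPool(taskThreads, BoundedAsyncSketch::daemon);
    }

    private static Thread daemon(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    }

    /** Blocks the caller while the limit is reached, similar in spirit to BLOCK. */
    void submit(Runnable command, Runnable onSuccess) {
        slots.acquireUninterruptibly();
        peakInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        ioPool.execute(() -> {
            try {
                command.run();
            } finally {
                inFlight.decrementAndGet();
                slots.release(); // free the slot before the callback runs
            }
            taskPool.execute(onSuccess); // callback runs off the I/O thread
        });
    }

    /** Issues get-then-put chains and returns how many "puts" completed. */
    int runUpserts(int keys) {
        CountDownLatch done = new CountDownLatch(keys);
        AtomicInteger writes = new AtomicInteger();
        for (int i = 0; i < keys; i++) {
            submit(() -> { },                                   // placeholder "get"
                   () -> submit(() -> writes.incrementAndGet(), // nested placeholder "put"
                                done::countDown));
        }
        try {
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for puts");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return writes.get();
    }

    int peakInFlight() {
        return peakInFlight.get();
    }

    public static void main(String[] args) {
        BoundedAsyncSketch sketch = new BoundedAsyncSketch(4, 2, 2);
        System.out.println("puts completed: " + sketch.runUpserts(1000));
        System.out.println("peak in flight: " + sketch.peakInFlight());
    }
}
```

In this model the nested submit can block on a task thread without stalling the I/O threads, so slots keep getting released. If the callback ran on the I/O thread instead and blocked waiting for a slot, the threads responsible for freeing slots could all end up waiting on each other, which is one way the deadlock mentioned above could arise. The sketch also waits on a latch before returning, so the parent thread knows every put has finished.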

Thanks, Brian. I’ll give that a try.

Any thoughts about the silent write failures from the nested put?

Some troubleshooting ideas:

  • Isolate the record and update command in question, with equivalent policy flags, and see how it behaves in sync mode.
  • Check the generation of the record after the write to validate that no other updates have happened.
  • Check the server-side statistics to see whether any failure has been recorded.

Good luck!