Mongo upsert-like capability?

mongodb
counter

#1

I’m curious what the best way is to do something functionally similar to what MongoDB’s upsert gives you.

We need to take in values (integers) at a very high rate and slot them into the appropriate key value pairs. This means setting them to the first number if they don’t exist already, or adding them to the existing number if they already exist.

MongoDB’s upsert function does exactly this and I’m curious how we can do this in Aerospike. I tried doing an increment on a value that didn’t exist yet but it errored out saying the key didn’t exist.

I see that there’s a concept of chaining operators together… would it be possible to do this with something like:

if foo doesn’t exist: set foo to 2
if foo already exists: add 2 to current value of foo

Thanks in advance


#2

hi Joshua,

Yes, you can do as you said. A better approach is to try an increment first and, if it fails with error 2 (AEROSPIKE_ERR_RECORD_NOT_FOUND), do an insert.

In PHP, it would be something like:

$i_status = $asd->increment($key, 'bin1', 1);
if ($i_status == Aerospike::OK) {
    echo "increment done.\n";
} elseif ($i_status == 2) { // 2 = AEROSPIKE_ERR_RECORD_NOT_FOUND
    echo "increment failed ".$asd->error()."\n";
    echo "adding record\n";
    $w_status = $asd->put($key, $bins, $ttl);
    if ($w_status == Aerospike::OK) {
        echo "record added\n";
    } else {
        echo "[{$asd->errorno()}] ".$asd->error()."\n";
    }
} else {
    echo "[{$asd->errorno()}] ".$asd->error()."\n";
}

#3

Thanks, Anshu, but this would appear to break the atomicity guarantee that operator chaining or Mongo upserts give you. That is, if multiple clients run similar code at the same time, there’s a chance another client will set a new number at the same moment this one is setting it for the first time, effectively wiping out the other value.

Is there a way to do this safely in a simple atomic transaction?


#4

You would use a key generating function that ensures each web node produces a unique ID. The topic was previously discussed in this thread.

In the context of Anshu’s example:

function gen_id() {
  return uniqid(getmypid().getmyinode(), true);
}

$status = $asd->increment($key, 'bin1', 1);
if ($status !== Aerospike::OK) {
    $status = $asd->put(gen_id(), $bins, $ttl);
}

Aerospike does not provide multi-key transactions, but it supports multi-ops for a single row (the operate() method). That logic is implemented on the application side, as I’ve shown above. With the server speed and reliability being what it is, you can afford the logic for handling the failed state and still get very high performance. I hope that helps.


#5

I must be missing something, because what you’re suggesting doesn’t seem to be addressing what I’m asking… it just creates yet another bucket, right?

Although I’m not totally familiar with the php functions - I’m trying to do this all in python.

I just want to either set key “foo”'s ‘value’ to 3 if ‘value’ or the key “foo” doesn’t exist yet, or add three to it if it’s already there.


#6

Let’s talk Python then.

If key ‘foo’ exists you want to either initialize ‘counter’ to 3, or increment its value by 3:

client = aerospike.client(config).connect()
key = ('test', 'demo', 'foo')
client.increment(key, 'counter', 3, 3)

If the key ‘foo’ does not exist you want to create the record with a bin called ‘counter’ which is set to 3:

client = aerospike.client(config).connect()
key = ('test', 'demo', 'foo')
try:
  client.increment(key, 'counter', 3, 3)
except:
  try:
    client.put(key, {'counter': 3}, policy={'exists': aerospike.POLICY_EXISTS_CREATE})
  except:
    client.increment(key, 'counter', 3, 3)

Within the initial exception for the increment() operation we handle two cases - the most likely is that we put a new record, with a bin ‘counter’ and set it to 3. If somehow another process is trying to do that at the exact same time, we fallback to simply incrementing the counter that the other process raced ahead and created.
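
The fallback logic described above can be sketched without a running server. Everything below is an assumption made for illustration: `FakeClient`, `RecordNotFound`, `RecordExists` and `put_create_only()` are hypothetical in-memory stand-ins, not part of the Aerospike Python client. They only mimic the behaviour discussed in this thread (increment fails on a missing record, a create-only put fails on an existing one).

```python
class RecordNotFound(Exception):
    """Hypothetical stand-in for the client's 'record not found' error."""


class RecordExists(Exception):
    """Hypothetical stand-in for the error raised by a create-only put."""


class FakeClient:
    """In-memory stand-in for a key-value client; for illustration only."""

    def __init__(self):
        self.records = {}

    def increment(self, key, bin_name, offset):
        # Mimics an increment that refuses to create a missing record.
        if key not in self.records:
            raise RecordNotFound(key)
        bins = self.records[key]
        bins[bin_name] = bins.get(bin_name, 0) + offset

    def put_create_only(self, key, bins):
        # Mimics a put with a "create only" exists policy.
        if key in self.records:
            raise RecordExists(key)
        self.records[key] = dict(bins)


def add_to_counter(client, key, bin_name, amount):
    """Increment a counter, creating the record if it does not exist yet."""
    try:
        client.increment(key, bin_name, amount)
    except RecordNotFound:
        try:
            # Create-only write: fails if another writer created the record first.
            client.put_create_only(key, {bin_name: amount})
        except RecordExists:
            # Lost the race; the record now exists, so a plain increment is safe.
            client.increment(key, bin_name, amount)


client = FakeClient()
key = ('test', 'demo', 'foo')
add_to_counter(client, key, 'counter', 3)  # first call creates the record
add_to_counter(client, key, 'counter', 3)  # second call increments it
print(client.records[key])  # {'counter': 6}
```

Catching specific exception types rather than using a bare `except:` keeps unrelated failures (timeouts, connection errors) from being mistaken for a missing record.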

This is more efficient computationally, but a bit ugly. Instead, you can check ahead using the exists() method, which is a fast operation (faster than a get()), and create the record if it does not exist, at which point you can increment its value:

client = aerospike.client(config).connect()
key = ('test', 'demo', 'goo')
(key, metadata) = client.exists(key)
if metadata is None:
  try:
    client.put(key, {'counter': 0}, policy={'exists': aerospike.POLICY_EXISTS_CREATE})
  except Exception as e:
    pass # someone already created this record

client.increment(key, 'counter', 3)

#7

Thanks, with the exceptions it becomes clear how this should in theory reduce the chance of problematic multiple-client operations on the same key.

BUT… we were really hoping to see an atomic operator from the db itself since we are handling thousands of transactions per second, so even ‘next line of code check-then-update’ isn’t fast enough. I was hoping the operator syntax or UDF would allow us to have the database decide whether to make the key or increment it.


#8

Hey Joshua.

You can definitely write a UDF that wraps around it and just call it, but as UDFs have the performance penalty of invoking the Lua interpreter, it’ll probably be faster to do the logic in the app, especially since there are going to be more increments than record creations happening. The top code sample is more efficient and follows the Python maxim that it’s easier to ask for forgiveness than permission.

Aerospike is faster than MongoDB, and scales easier, but MongoDB does have a lot of convenient features (and it’s been around for a long time). We keep adding features, but the functionality needs to outweigh any performance penalty a new feature may impose. Just a note - thousands of transactions per-second is nothing for the server.

The counters use case you brought up is one we’ve encountered before from other developers, so we’ll take it into account. It may make sense to provide a client-side helper for counters.


#10

I’d like to work through that with you. Let me know if you want to pursue it. Again, we’re working with transaction volumes outside of Mongo’s capability, though yes it has more features.


#11

Absolutely, let me explain our potential use case more - I think it would help more than trying to pick apart Aerospike feature by feature.

We want to store time series data with lots of “tags” - arbitrary key-value pairs to help us find the values we’re interested in. We are currently using KairosDB with Cassandra, which we know we are eventually going to outgrow, because performance starts to degrade once the row-key index gets large, and ours is growing steadily. It’s growing fast for us because we have a high number of tags and their associated values.

These time series metrics can be modeled as a dictionary nicely:

doc_to_insert = {
  'timestamp': 123456000,
  'value': 3,
  'client': 'abcd',
  'metric': 'widget_viewed',
  'tags': {
     'widget_id': 'xyz',
     'status_code': 204,
     'diagnostic_mode': 'False'
  }
}

What we need is a way to increment the value field when the timestamp, client, metric, and all tags match an existing record in the database, OR make a new one if it doesn’t exist yet.

Looking through Aerospike’s modeling, I’m not sure yet how we’d model this. I guess we’d need globally unique keys which are hashes of the set of things that make each record unique, but hopefully there’s an easier/better way to do this?


#12

I’ll get back to you with a more detailed answer soon. Right now I’m checking in the C client whether we can have the increment also create the key atomically if it doesn’t exist. It may be an artificial limitation in the clients wrapping around it, such as Python.


#13

Alright, the Python and PHP clients do not behave correctly for increment(). It should create the record if none exists, then set the bin, similar to how append() behaves.

Therefore: https://github.com/aerospike/aerospike-client-python/issues/41

We’ll get that fixed soon.


#14

That’s awesome, thank you for filing the bug.

Any ideas on how best to use aerospike for our “counters with tags” problem? Or maybe aerospike isn’t a good fit for such a ‘search heavy’ and unbounded problem domain?


#15

Hey Joshua, it’s implemented in the new Python client, release 1.0.38.

Regarding the counters scenario you mentioned, as we don’t yet support compound predicates (AND), I suggest you simply synthesize the data into the primary key for the counter record.

Let’s assume this is a namespace ‘test’, set ‘tagged_counters’ and referring to the dictionary you gave as an example, the primary key would be the string

timestamp-client-metric-tagval-…

For example:

pk = "123456000-abcd-widget_viewed-widget_id|xyz-status_code|204-diagnostic_mode|False"
client.increment(('test', 'tagged_counters', pk), 'counter', 3)

You can literally make a tuple with that, since the size of the key is not an issue. In Aerospike the record is actually located by the RIPEMD-160 digest of its key. Similarly you can write your own hashing function that given the key-value ingredients gives a consistently formed string, then hashes it. The hash can then be the ‘primary key’ component of the key tuple for the counter.
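
A minimal sketch of such a key-building function, using only the standard library. The function names are made up for this example, sorting the tags by name is one way to get a consistently formed string (so the tag order differs from the hand-written key above), and SHA-256 is an arbitrary choice for the optional digest. It also assumes tag names and values never contain the `-` or `|` separators.

```python
import hashlib


def counter_key(doc):
    """Build a deterministic primary-key string from a metric document."""
    # Sort tags by name so the same set of tags always yields the same string.
    tag_part = "-".join(
        f"{name}|{value}" for name, value in sorted(doc["tags"].items())
    )
    return f"{doc['timestamp']}-{doc['client']}-{doc['metric']}-{tag_part}"


def counter_key_digest(doc):
    """Optionally shorten the key string to a fixed-length hex digest."""
    return hashlib.sha256(counter_key(doc).encode("utf-8")).hexdigest()


doc = {
    'timestamp': 123456000,
    'value': 3,
    'client': 'abcd',
    'metric': 'widget_viewed',
    'tags': {
        'widget_id': 'xyz',
        'status_code': 204,
        'diagnostic_mode': 'False',
    },
}

pk = counter_key(doc)
print(pk)
# 123456000-abcd-widget_viewed-diagnostic_mode|False-status_code|204-widget_id|xyz
```

Either `pk` or the digest could then serve as the third element of the key tuple. The readable string is easier to debug, while the digest keeps the key length fixed as the number of tags grows.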

Let me know if the modified increment() works as expected.


#16

Awesome about the python client, I’ll try it right away!

As for the schema, that’s about the same route I was going to take, but I wasn’t sure about the size of the key getting too big. We have 10 tags right now but that could grow quite a bit.

I was wondering if we could actually put everything but the timestamp in the primary key, but then store a large data type with the timestamp as a sub-key to the final value… but if you’re confident we can have trillions of keys then it should be fine to include the timestamp in the primary key and just store a counter.


#17

Please let me know if you find any problems with the Python client (hopefully not!).

A record’s key tuple can be any size, it gets hashed by the digest to 20 bytes. In the primary index the record takes a set 64 bytes (the above-mentioned 20 byte digest and space for metadata). This lets you calculate how much DRAM you’ll need for the index (50M records take 3.2GB, for example). Is it really ‘trillions’? So DRAM capacity for the index is one per-node limitation to consider. The other potential problem is that this approach wastes a lot of space. You will have one integer bin (as far as I can see) and the record has a minimal size allocated of 128K on the SSD. This may not be an issue for you, though.
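
The index sizing above is plain multiplication. As a rough sketch (the constant and function name are made up for this example, and it ignores replication and any other overhead):

```python
INDEX_ENTRY_BYTES = 64  # per-record primary index size quoted above


def index_dram_bytes(record_count):
    """Estimate DRAM needed for the primary index of `record_count` records."""
    return record_count * INDEX_ENTRY_BYTES


print(index_dram_bytes(50_000_000) / 1e9)  # 3.2 (GB, using 1 GB = 10**9 bytes)
```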

The alternative you’ve mentioned, where timestamp isn’t actually part of the key, can have two or more setups. In the first case an integer bin counter is created per-timestamp. This seems more efficient, but two things may happen. One is that too many counter bins spill out of the record size limitation (which is configurable up to 1MB, but you should evaluate your use case). The other is that it hits the limit of 32K unique bin names per namespace.

The second setup was the one you’ve mentioned where you use an LList, with each timestamp as an element. This is slower to update as a counter, but can grow very large without problems.

Sorry that I can’t give a more conclusive answer. I want you to be aware of the data model, the storage layer, and the limitations of the system. You can do the math and see which approach makes more sense for your application, or come up with a more fitting one. Please share your insights if you do!


#18

Yeah, it’s a huge space. Consider we store these metrics at 30 second resolution.

We have 3 metric types, each with up to 100 tags each with up to 1000 values.

We want to support those tagged metrics for N ‘brands’, where N is currently 10 but we expect it to grow to tens of thousands, if not hundreds of thousands.

That conservatively gives us 10,000 * 3 * 100 * 1000 (3,000,000,000) for each time bucket, of which there are 1,051,200 each year. So… yeah… modeling this problem is proving difficult… :)
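
For reference, the arithmetic can be checked in a few lines. This is a worst-case sketch that assumes every brand/metric/tag/value combination actually occurs in every bucket. The variable names are made up here, and the 64-byte figure is the per-record index size mentioned earlier in the thread.

```python
brands = 10_000
metric_types = 3
tags_per_metric = 100
values_per_tag = 1_000
bucket_seconds = 30

# Upper bound on distinct counters in a single 30-second bucket.
series_per_bucket = brands * metric_types * tags_per_metric * values_per_tag

# Number of 30-second buckets in a 365-day year.
buckets_per_year = 365 * 24 * 60 * 60 // bucket_seconds

# Index DRAM for one fully populated bucket at 64 bytes per record.
index_gb_per_bucket = series_per_bucket * 64 / 1e9

print(series_per_bucket)    # 3000000000
print(buckets_per_year)     # 1051200
print(index_gb_per_bucket)  # 192.0
```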


#19

How long do you need to keep them around? Can you rely on TTL and eviction to keep your active data reasonable? I assume after a certain amount of time a counter is no longer going to be updated, if it’s time based.


#20

Rollups are a big part of the plan. We actually don’t “update” documents for much longer than the 30s window that they’re “live”, but sometimes we have backups which require us to update metrics up to a few days old. Once it’s been a week or so, we rarely if ever update them further.


#21

Alrighty. Then I would suggest you also repeat the timestamp as an extra bin and build a secondary index on it. This will allow you to query ranges with a BETWEEN predicate for the roll-ups. Anyway, good luck, and let me know if you run into other problems.