Aerospike learning - writing many keys slow performance

Hi,

We are evaluating Aerospike and trying to migrate, but performance looks very problematic for us at the moment. Since Aerospike is considered such a strong database, I believe it's something we are doing wrong.

We use the Python client. Our workload is an event-driven app that receives events; for each event we generate 2175 values that need to be inserted into a map. Because maps have a limit on how many values they can hold, we divide our values into buckets (currently 10 buckets), so we end up with 21750 keys to submit to Aerospike.

The Aerospike client is limited to 1000 keys per batch request, so we have to send 22 requests.
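To illustrate the batching arithmetic, here is a minimal sketch (plain Python, no Aerospike calls; the integers stand in for our (namespace, set, key) tuples):

```python
from itertools import islice

def batched(items, size):
    """Yield consecutive chunks of at most `size` elements."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

keys = list(range(21750))              # stand-ins for (namespace, set, key) tuples
batches = list(batched(keys, 1000))
print(len(batches), len(batches[-1]))  # 22 batches, the last one with 750 keys
```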

Eventually, running the code with async/await, it takes about 5 seconds(!) to push the data to an Aerospike cluster of 3 nodes (running this test script from within one of the node servers).

We are currently using the in-memory storage engine, to avoid any deployment issues and just test this case.

What are we doing wrong? :slight_smile:

Are you trying to update the same record concurrently? Sounds like you might need to split that record up.

Also, are you using the add items API, or are you doing 2k add_item calls? Add items will be much more efficient.
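For example, a rough sketch of the two styles (hedged: `client`, `key`, and `binName` would come from your own setup; only the dict construction below runs as-is):

```python
import random

# Build all map entries for one record first (cheap, in memory)...
values = {"lead{}".format(i): random.uniform(0, 1) for i in range(2175)}

# ...then write them in ONE server round-trip instead of 2175:
#
#   from aerospike_helpers.operations import map_operations
#   client.operate(key, [map_operations.map_put_items(binName, values)])
#
# rather than one round-trip per entry:
#
#   for k, v in values.items():
#       client.operate(key, [map_operations.map_put(binName, k, v)])

print(len(values))  # 2175 entries submitted in a single operation
```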

Could you provide a snippet to demonstrate your update procedure?

This is the code we use to test. We ended up reaching for asyncio to try to execute multiple queries without waiting for each response.

import time
import aerospike
import random
from aerospike_helpers.operations import map_operations
import asyncio
from itertools import islice

def timeit(func):
    async def process(func, *args, **params):
        if asyncio.iscoroutinefunction(func):
            print('this function is a coroutine: {}'.format(func.__name__))
            return await func(*args, **params)
        else:
            print('this is not a coroutine')
            return func(*args, **params)

    async def helper(*args, **params):
        print('{}.time'.format(func.__name__))
        start = time.time()
        result = await process(func, *args, **params)

        # Test normal function route...
        # result = await process(lambda *a, **p: print(*a, **p), *args, **params)

        print('>>>', time.time() - start)
        return result

    return helper


def chunkList(elements, chunkSize):
    """Split a list into consecutive chunks of at most chunkSize elements."""
    chunks = []

    if elements is not None:
        elementsIter = iter(elements)
        for _ in range(0, len(elements), chunkSize):
            chunks.append(list(islice(elementsIter, chunkSize)))

    return chunks



async def foo(client, bucketKey, binName, values):
    # map_put_items is a synchronous client call, so this coroutine blocks here
    client.map_put_items(bucketKey, binName, values)

@timeit
async def performActions(client, bucketKeys, binName, bucketToValues):
    await asyncio.gather(*[foo(client, bucketKey, binName, bucketToValues[bucketKey[2]]) for bucketKey in bucketKeys])

async def main():
    config = {
        "hosts": [
            ("localhost", 3000)
        ],
        "policies": {
            "total_timeout": 600000
        }
    }
    client = aerospike.client(config)
    client.connect()

    namespace = "app"
    setName = "bago_scores"
    amountOfBuckets = 10
    amountOfLeadsPerBucket = 10
    amountOfTimestamps = 2175
    getChunkSize = 1000
    timestamps = [i for i in range(amountOfTimestamps)]
    binName = "externalIdScore"
    useIntScores = False

    bucketKeys = []

    bucketToValues = {}

    for timestamp in timestamps:
        for bucket in range(amountOfBuckets):
            bucketKey = "{}_{}".format(bucket, timestamp)
            bucketToValues[bucketKey] = {}
            bucketKeys.append((namespace, setName, bucketKey))
            if useIntScores:
                for index in range(amountOfLeadsPerBucket):
                    bucketToValues[bucketKey]["lead{}".format(index)] = int(random.uniform(0,1) * 100000)
            else:
                for index in range(amountOfLeadsPerBucket):
                    bucketToValues[bucketKey]["lead{}".format(index)] = random.uniform(0,1)

        
        
    await performActions(client, bucketKeys, binName, bucketToValues)

    # Other variants we tried:
    # await asyncio.gather(*[foo(client, bucketKey, binName, bucketToValues[bucketKey[2]]) for bucketKey in bucketKeys])
    # for bucketKey in bucketKeys:
    #     client.map_put_items(bucketKey, binName, bucketToValues[bucketKey[2]])
    #     (key, meta, bins) = client.operate(bucketKey, [map_operations.map_put_items(binName, bucketToValues[bucketKey[2]])])

    chunks = chunkList(bucketKeys, getChunkSize)
    print("Getting {} keys in {} chunks of {}".format(len(bucketKeys), len(chunks), getChunkSize))

    start = time.time()

    records = []
    for chunk in chunks:
        records.extend(client.get_many(chunk))
    
    externalIdScores = []

    for record in records:
        externalIdScores.extend(record[2]["externalIdScore"].items())
    print("get took: {}".format(time.time() - start))

    start = time.time()
    res = sorted(externalIdScores, key=lambda x: x[1], reverse=True)
    print("sort took: {}".format(time.time() - start))

    print(res[:50])

asyncio.run(main())
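One thing worth checking: the synchronous Python client blocks the calling thread inside `map_put_items`, so wrapping it in `async def` and `asyncio.gather` still executes the calls one after another — the coroutines never yield. A minimal sketch of the difference (using `time.sleep` as a stand-in for the blocking client call; one possible fix is offloading each call with `run_in_executor`):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_call(key):
    time.sleep(0.02)      # stand-in for a synchronous client.map_put_items()

async def fake_async(key):
    blocking_call(key)    # no await inside: this blocks the event loop

async def main():
    loop = asyncio.get_running_loop()
    keys = list(range(20))

    # Pattern from the script above: gather() over blocking calls runs serially.
    start = time.perf_counter()
    await asyncio.gather(*[fake_async(k) for k in keys])
    serial = time.perf_counter() - start

    # Offloading each blocking call to a thread pool lets them overlap.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=20) as pool:
        await asyncio.gather(*[loop.run_in_executor(pool, blocking_call, k)
                               for k in keys])
    threaded = time.perf_counter() - start
    return serial, threaded

serial, threaded = asyncio.run(main())
print("serial {:.2f}s vs threaded {:.2f}s".format(serial, threaded))
```

With 20 simulated calls of 20 ms each, the gather-over-blocking-calls version takes roughly the sum of all calls, while the thread-pool version takes roughly the longest single call.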