# Test script: we reached for asyncio to execute multiple queries concurrently instead of waiting for each response serially.
import asyncio
import functools
import random
import time
from itertools import islice

import aerospike
from aerospike_helpers.operations import map_operations
def timeit(func):
    """Decorator that prints the wall-clock duration of each call to *func*.

    Works for both coroutine functions and plain callables.  The returned
    wrapper is always a coroutine, so decorated functions must be awaited
    (or driven via asyncio.run).
    """
    async def process(target, *args, **params):
        # Await coroutine functions; invoke plain callables directly.
        if asyncio.iscoroutinefunction(target):
            print('this function is a coroutine: {}'.format(target.__name__))
            return await target(*args, **params)
        print('this is not a coroutine')
        return target(*args, **params)

    @functools.wraps(func)  # preserve __name__/__doc__ of the wrapped function
    async def helper(*args, **params):
        print('{}.time'.format(func.__name__))
        start = time.time()
        result = await process(func, *args, **params)
        print('>>>', time.time() - start)
        return result

    return helper
def chunkList(elements, chunkSize):
    """Split *elements* into consecutive lists of at most *chunkSize* items.

    Generalized to accept any iterable (not only sized sequences, which the
    previous version required via len()).  Returns [] when *elements* is
    None.  The final chunk may be shorter than *chunkSize*.
    """
    chunks = []
    if elements is not None:
        elementsIter = iter(elements)
        while True:
            # islice consumes up to chunkSize items per pass; empty means done.
            chunk = list(islice(elementsIter, chunkSize))
            if not chunk:
                break
            chunks.append(chunk)
    return chunks
async def foo(client, bucketKey, binName, values):
    """Write *values* into the map bin *binName* of the record at *bucketKey*.

    The aerospike client call is synchronous and blocking; calling it
    directly inside a coroutine serializes every write and defeats
    asyncio.gather.  Dispatch it to the default thread-pool executor so
    concurrent calls genuinely overlap while each thread blocks on I/O.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, client.map_put_items, bucketKey, binName, values)
@timeit
async def performActions(client, bucketKeys, binName, bucketToValues):
    """Fan out one map-write task per bucket key and wait for all of them."""
    tasks = []
    for key in bucketKeys:
        # key is (namespace, set, userKey); the user key indexes the values map.
        tasks.append(foo(client, key, binName, bucketToValues[key[2]]))
    await asyncio.gather(*tasks)
async def main():
    """Benchmark bulk map writes and batch reads against a local Aerospike node.

    Populates amountOfBuckets * amountOfTimestamps records, each holding a map
    of per-lead scores, writes them concurrently, then reads everything back in
    chunks, flattens the score maps, sorts by score and prints the top 50.
    """
    config = {
        "hosts": [
            ("localhost", 3000)
        ],
        'policies': {
            'total_timeout': 600000  # ms; generous for the bulk benchmark
        }
    }
    client = aerospike.client(config)
    client.connect()
    try:
        namespace = "app"
        setName = "bago_scores"
        amountOfBuckets = 10
        amountOfLeadsPerBucket = 10
        amountOfTimestamps = 2175
        getChunkSize = 1000
        timestamps = list(range(amountOfTimestamps))
        binName = "externalIdScore"
        useIntScores = False
        bucketKeys = []
        bucketToValues = {}
        for timestamp in timestamps:
            for bucket in range(amountOfBuckets):
                bucketKey = "{}_{}".format(bucket, timestamp)
                values = {}
                for index in range(amountOfLeadsPerBucket):
                    score = random.uniform(0, 1)
                    if useIntScores:
                        # scale to an integer score in [0, 100000)
                        score = int(score * 100000)
                    values["lead{}".format(index)] = score
                bucketToValues[bucketKey] = values
                bucketKeys.append((namespace, setName, bucketKey))
        await performActions(client, bucketKeys, binName, bucketToValues)
        chunks = chunkList(bucketKeys, getChunkSize)
        print("Getting {} keys in {} chunks of {}".format(len(bucketKeys), len(chunks), getChunkSize))
        start = time.time()
        records = []
        for chunk in chunks:
            # get_many performs a single batch read per chunk of keys.
            records.extend(client.get_many(chunk))
        externalIdScores = []
        for record in records:
            # record is (key, meta, bins); flatten each map bin into (lead, score) pairs.
            externalIdScores.extend(record[2][binName].items())
        print("get took: {}".format(time.time() - start))
        start = time.time()
        res = sorted(externalIdScores, key=lambda x: x[1], reverse=True)
        print("sort took: {}".format(time.time() - start))
        print(res[:50])
    finally:
        # always release the cluster connection, even if the benchmark fails
        client.close()
if __name__ == "__main__":
    # Guard the entry point so importing this module has no side effects.
    asyncio.run(main())