Hi all, We are working with python and we want to start using aerospike. With the help of Aerospike we came up with a python code and go code that basically does the same but the python code is much much slower. Is there something we’re missing?
A little explanation about the code: Our code is used for scoring users. So we save a mapping between userId to score. The code takes 2000 timestamps and for each timestamp it creates 10 buckets and in each one it puts X user scores in a map.
I’m adding both the python and go code below. Could anyone please help me understand if
- The python code is doing something wrong
- The python library just isn’t meant for fast pace?
Thank you very much
python:
import time
import aerospike
import random
from aerospike_helpers.operations import map_operations
import asyncio
from itertools import islice
def timeit(func):
async def process(func, *args, **params):
if asyncio.iscoroutinefunction(func):
print('this function is a coroutine: {}'.format(func.__name__))
return await func(*args, **params)
else:
print('this is not a coroutine')
return func(*args, **params)
async def helper(*args, **params):
print('{}.time'.format(func.__name__))
start = time.time()
result = await process(func, *args, **params)
# Test normal function route...
# result = await process(lambda *a, **p: print(*a, **p), *args, **params)
print('>>>', time.time() - start)
return result
return helper
def chunkList(elements, chunkSize):
chunks = []
if elements is not None:
elementsIter = iter(elements)
for i in range(0, len(elements), chunkSize):
chunks.append(list(islice(elementsIter, chunkSize)))
return chunks
async def foo(client, bucketKey, binName, values):
client.map_put_items(bucketKey, binName, values)
@timeit
async def performActions(client, bucketKeys, binName, bucketToValues):
await asyncio.gather(*[foo(client, bucketKey, binName, bucketToValues[bucketKey[2]]) for bucketKey in bucketKeys])
async def main():
config = {
"hosts": [
( "localhost", 3000 )
],
'policies': {
'total_timeout': 600000
}
}
client = aerospike.client(config)
client.connect()
namespace = "app"
setName = "bago_scores"
amountOfBuckets = 100
amountOfLeadsPerBucket = 10
amountOfTimestamps = 2000
getChunkSize = 1000
timestamps = [i for i in range(amountOfTimestamps)]
binName = "externalIdScore"
useIntScores = False
bucketKeys = []
bucketToValues = {}
addTime = 0.0
for timestamp in timestamps:
for bucket in range(amountOfBuckets):
bucketKey = "{}_{}".format(bucket, timestamp)
bucketToValues[bucketKey] = {}
bucketKeys.append((namespace, setName, bucketKey))
if useIntScores:
for index in range(amountOfLeadsPerBucket):
bucketToValues[bucketKey]["lead{}".format(index)] = int(random.uniform(0,1) * 100000)
else:
for index in range(amountOfLeadsPerBucket):
bucketToValues[bucketKey]["lead{}".format(index)] = random.uniform(0,1)
await performActions(client, bucketKeys, binName, bucketToValues)
#await asyncio.gather(*[foo(client, bucketKey, binName, bucketToValues[bucketKey[2]]) for bucketKey in bucketKeys])
#for bucketKey in bucketKeys:
#client.map_put_items(bucketKey, bin, bucketToValues[bucketKey[2]])
#(key, meta, bins) = client.operate(bucketKey, [map_operations.map_put_items(binName, bucketToValues[bucketKey[2]])])
#timestampAddTime = time.time() - start
#print("timestamp {} add took {}".format(timestamp, timestampAddTime))
#addTime += timestampAddTime
chunks = chunkList(bucketKeys, getChunkSize)
print("Getting {} keys in {} chunks of {}".format(len(bucketKeys), len(chunks), getChunkSize))
start = time.time()
records = []
for chunk in chunks:
records.extend(client.get_many(chunk))
externalIdScores = []
for record in records:
externalIdScores.extend(record[2]["externalIdScore"].items())
print("get took: {}".format(time.time() - start))
start = time.time()
res = sorted(externalIdScores, key=lambda x: x[1], reverse=True)
print("sort took: {}".format(time.time() - start))
print(res[:50])
asyncio.run(main())
Go:
package main
import (
"encoding/binary"
"fmt"
"log"
"math/rand"
"strconv"
"time"
aero "github.com/aerospike/aerospike-client-go"
"github.com/zenthangplus/goccm"
"golang.org/x/crypto/ripemd160"
)
const (
// csv-realted
namespace = "app"
setName = "bucketScores"
bucketNumber = 10
// BinMap field names
userIDBin = "UserIDBin"
setNameBin = "set_name"
ccIDBin = "ID"
amountBin = "AmountBin"
classBin = "ClassBin"
timeBin = "TimeBin"
leaderBoardBin = "externalIdScore"
//asSeedNode = "172.17.0.3"
asSeedNode = "127.0.0.1"
)
var aeroClient *aero.Client
var c goccm.ConcurrencyManager
func main() {
var err error
aeroClient1, err := aero.NewClient(asSeedNode, 3000)
if err != nil {
log.Fatal(err)
return
}
aeroClient = aeroClient1
c = goccm.New(10)
start := time.Now()
genLeaderBoard(aeroClient, "aa")
elapsed := time.Since(start)
log.Printf("init took %s", elapsed)
start = time.Now()
handleEvent(aeroClient, "aa", "123", 1)
elapsed = time.Since(start)
log.Printf("update score to 1 lead took %s", elapsed)
/*
start = time.Now()
handleEvent(aeroClient, "aa", "125", 1)
elapsed = time.Since(start)
log.Printf("Operate2 took %s", elapsed)
start = time.Now()
handleEvent(aeroClient, "aa", "126", 1)
elapsed = time.Since(start)
log.Printf("Operate3 took %s", elapsed)
start = time.Now()
handleEvent(aeroClient, "aa", "128", 1)
elapsed = time.Since(start)
log.Printf("Operate4 took %s", elapsed)
*/
c.WaitAllDone()
// getTopLeads(aeroClient, "aa", "20200809", 128, 100)
}
func getBucketNumber(leadID string) (leadBucket uint16) {
x := ripemd160.New()
x.Write([]byte(leadID))
l := x.Sum(nil)
leadBucket = binary.BigEndian.Uint16(l) % bucketNumber
return leadBucket
}
func handleEvent(client *aero.Client, custID string, leadID string, newScore int) (err error) {
amap := make(map[interface{}]interface{})
amap[leadID] = newScore
// fmt.Println(leadID)
leadBucket := getBucketNumber(leadID)
for i := 0; i < 2000; i++ {
c.Wait()
go handleTimeSlice(client, custID, "20200809", i, leadBucket, amap)
}
return err
}
func handleTimeSlice(client *aero.Client, custID string, date string, slice int, bucket uint16, amap map[interface{}]interface{}) (err error) {
//fmt.Println("worker " + strconv.Itoa(slice))
leaderBoardKey := custID + ":" + date + ":" + strconv.Itoa(slice) + ":" + strconv.Itoa(int(bucket))
key, err := aero.NewKey(namespace, setName, leaderBoardKey)
if err != nil {
log.Println(err)
return err
}
_, err = client.Operate(nil, key,
aero.MapPutItemsOp(aero.NewMapPolicy(aero.MapOrder.KEY_ORDERED, aero.MapWriteMode.UPDATE), leaderBoardBin, amap),
)
if err != nil {
log.Println(err)
return err
}
c.Done()
return nil
}
func genLeaderBoard(client *aero.Client, custID string) (err error) {
for i := 0; i < 2000; i++ {
c.Wait()
go genLeaderBoardSlice(client, custID, "20200809", i)
}
return err
}
func genLeaderBoardSlice(client *aero.Client, custID string, date string, slice int) (err error) {
//fmt.Println("worker " + strconv.Itoa(slice))
fullamap := make(map[uint16]map[interface{}]interface{})
for i := 0; i < 1000; i++ {
k := strconv.Itoa(i)
j := getBucketNumber(k)
amap, ok := fullamap[j]
if !ok {
amap = make(map[interface{}]interface{})
}
amap[k] = rand.Intn(1000)
fullamap[j] = amap
}
pol := client.DefaultWritePolicy
pol.TotalTimeout = 30 * time.Second
pol.SendKey = true
for bucket, parMap := range fullamap {
leaderBoardKey := custID + ":" + date + ":" + strconv.Itoa(slice) + ":" + strconv.Itoa(int(bucket))
key, err := aero.NewKey(namespace, setName, leaderBoardKey)
if err != nil {
log.Println(err)
return err
}
start := time.Now()
_, err = client.Operate(pol, key,
aero.MapPutItemsOp(aero.NewMapPolicy(aero.MapOrder.KEY_ORDERED, aero.MapWriteMode.UPDATE), leaderBoardBin, parMap),
)
if err != nil {
log.Println("wtf")
log.Println(err)
elapsed := time.Since(start)
log.Printf("Operate %d took %s and failed", slice, elapsed)
return err
}
//elapsed := time.Since(start)
//log.Printf("Operate %d took %s", slice, elapsed)
}
defer c.Done()
return err
}
func getTopLeads(client *aero.Client, custID string, date string, slice int, topN int) (err error) {
keys := make([]*aero.Key, bucketNumber)
for i := 0; i < bucketNumber; i++ {
keys[i], _ = aero.NewKey(namespace, setName, custID+":"+date+":"+strconv.Itoa(slice)+":"+strconv.Itoa(i))
}
records, err := client.BatchGet(nil, keys, leaderBoardBin)
for i := 0; i < len(records); i++ {
key := keys[i]
record := records[i]
var value interface{}
//amap := make(map[interface{}]interface{})
if record != nil {
value = record.Bins[leaderBoardBin]
//amap := record.Bins[leaderBoardBin].(map[interface{}]interface{})
}
fmt.Println("Key:", key.Value())
fmt.Println("Value: ", value)
fmt.Println(topN)
}
return err
}