Python code slow (10 seconds) vs go code fast (1 second) - help?

Hi all, we are working with Python and want to start using Aerospike. With the help of Aerospike we came up with Python code and Go code that basically do the same thing, but the Python code is much, much slower. Is there something we’re missing?

A little explanation about the code: it is used for scoring users, so we save a mapping from userId to score. The code takes 2000 timestamps; for each timestamp it creates 10 buckets, and in each bucket it puts X user scores in a map.

I’m adding both the Python and Go code below. Could anyone please help me understand whether:

  1. The Python code is doing something wrong, or
  2. The Python library just isn’t meant for high-throughput use?

Thank you very much

python:

import time
import aerospike
import random
from aerospike_helpers.operations import map_operations
import asyncio
from itertools import islice

def timeit(func):
    """Wrap ``func`` so that every call prints how long it took.

    Both coroutine functions and plain callables are accepted. The returned
    wrapper is always a coroutine function, so callers must ``await`` it.

    Args:
        func: The coroutine function or regular callable to time.

    Returns:
        An ``async`` wrapper that prints the elapsed seconds and returns
        whatever ``func`` returned. The wrapper keeps ``func``'s name and
        docstring.
    """
    # Imported locally so this block does not depend on the file-level
    # import list being changed.
    import functools

    async def process(func, *args, **params):
        # Await coroutine functions; call everything else synchronously.
        if asyncio.iscoroutinefunction(func):
            print('this function is a coroutine: {}'.format(func.__name__))
            return await func(*args, **params)
        else:
            print('this is not a coroutine')
            return func(*args, **params)

    @functools.wraps(func)  # keep __name__/__doc__ of the timed function
    async def helper(*args, **params):
        print('{}.time'.format(func.__name__))
        # perf_counter is monotonic, so the interval cannot be skewed by
        # wall-clock adjustments while the call is running.
        start = time.perf_counter()
        result = await process(func, *args, **params)
        print('>>>', time.perf_counter() - start)
        return result

    return helper

def chunkList(elements, chunkSize):
    """Split ``elements`` into consecutive lists of at most ``chunkSize`` items.

    Args:
        elements: A sized iterable (anything supporting ``len`` and ``iter``),
            or ``None``.
        chunkSize: Maximum number of items per chunk.

    Returns:
        A list of lists in the original iteration order; the last chunk may
        be shorter. ``None`` or an empty input yields an empty list.
    """
    if elements is None:
        return []

    # A single shared iterator lets each islice call continue where the
    # previous chunk stopped.
    remaining = iter(elements)
    return [
        list(islice(remaining, chunkSize))
        for _ in range(0, len(elements), chunkSize)
    ]



async def foo(client, bucketKey, binName, values):
    """Write ``values`` into the map bin ``binName`` of record ``bucketKey``.

    ``client.map_put_items`` is a blocking call. Calling it directly inside a
    coroutine holds the event loop for the whole round trip, so a surrounding
    ``asyncio.gather`` would execute the writes one after another. Handing the
    call to the loop's default thread pool lets the writes overlap.

    Args:
        client: Connected client object exposing ``map_put_items``.
        bucketKey: Record key passed through to ``map_put_items``.
        binName: Name of the map bin to update.
        values: Mapping of entries to put into the bin.
    """
    # NOTE(review): assumes one client instance may be used from several
    # threads at once -- confirm for the client version in use.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None, client.map_put_items, bucketKey, binName, values
    )

@timeit
async def performActions(client, bucketKeys, binName, bucketToValues):
    """Issue one ``foo`` write per bucket key and wait for all of them.

    Args:
        client: Client object forwarded to ``foo``.
        bucketKeys: Iterable of key tuples; element ``[2]`` of each tuple is
            used to look up that bucket's values.
        binName: Name of the bin to write.
        bucketToValues: Mapping from the key's third element to the values
            to store.
    """
    pending = []
    for bucketKey in bucketKeys:
        bucketValues = bucketToValues[bucketKey[2]]
        pending.append(foo(client, bucketKey, binName, bucketValues))
    await asyncio.gather(*pending)

async def main():
    """Run the write / batch-read / sort benchmark against a local node.

    Steps:
      1. Build one map of random scores per (bucket, timestamp) key.
      2. Write every map through ``performActions`` (timed by its decorator).
      3. Read all records back with ``get_many`` in chunks and flatten the
         (lead, score) pairs.
      4. Sort the pairs by score, highest first, and print the top 50.

    The client connection is closed on exit, including when a step raises.
    """
    config = {
        "hosts": [
            ("localhost", 3000)
        ],
        'policies': {
            'total_timeout': 600000
        }
    }
    client = aerospike.client(config)
    client.connect()

    # Always release the connection, even if a write or read below fails.
    try:
        namespace = "app"
        setName = "bago_scores"
        # NOTE(review): this is 100 buckets x 10 leads, while the description
        # accompanying this script and the Go version use 10 buckets per
        # timestamp. That makes 200,000 record writes here versus 20,000
        # there -- confirm this is intended before comparing timings.
        amountOfBuckets = 100
        amountOfLeadsPerBucket = 10
        amountOfTimestamps = 2000
        getChunkSize = 1000
        binName = "externalIdScore"
        useIntScores = False

        bucketKeys = []
        bucketToValues = {}

        for timestamp in range(amountOfTimestamps):
            for bucket in range(amountOfBuckets):
                bucketKey = "{}_{}".format(bucket, timestamp)
                scores = {}
                for index in range(amountOfLeadsPerBucket):
                    score = random.uniform(0, 1)
                    if useIntScores:
                        score = int(score * 100000)
                    scores["lead{}".format(index)] = score
                bucketToValues[bucketKey] = scores
                bucketKeys.append((namespace, setName, bucketKey))

        await performActions(client, bucketKeys, binName, bucketToValues)

        chunks = chunkList(bucketKeys, getChunkSize)
        print("Getting {} keys in {} chunks of {}".format(len(bucketKeys), len(chunks), getChunkSize))

        start = time.time()

        records = []
        for chunk in chunks:
            records.extend(client.get_many(chunk))

        externalIdScores = []
        for record in records:
            # record[2] holds the bins of the returned record.
            externalIdScores.extend(record[2][binName].items())
        print("get took: {}".format(time.time() - start))

        start = time.time()
        res = sorted(externalIdScores, key=lambda x: x[1], reverse=True)
        print("sort took: {}".format(time.time() - start))

        print(res[:50])
    finally:
        client.close()

# Only start the benchmark when executed as a script, so importing this
# module (e.g. to reuse chunkList or timeit) has no side effects.
if __name__ == "__main__":
    asyncio.run(main())

Go:

package main

import (
        "encoding/binary"
        "fmt"
        "log"
        "math/rand"
        "strconv"
        "time"

        aero "github.com/aerospike/aerospike-client-go"
        "github.com/zenthangplus/goccm"
        "golang.org/x/crypto/ripemd160"
)

// Package-level configuration: target namespace/set, bucket count, bin names
// and the seed node address.
const (
        // csv-related
        // Namespace and set that every record key in this program is created in.
        namespace = "app"
        setName   = "bucketScores"

        // Modulus applied by getBucketNumber; also the number of keys that
        // getTopLeads fetches per time slice.
        bucketNumber = 10
        // BinMap field names
        // Only leaderBoardBin is referenced by the functions in this file.
        userIDBin      = "UserIDBin"
        setNameBin     = "set_name"
        ccIDBin        = "ID"
        amountBin      = "AmountBin"
        classBin       = "ClassBin"
        timeBin        = "TimeBin"
        leaderBoardBin = "externalIdScore"

        // Seed node passed to aero.NewClient in main (alternative address kept
        // below for reference).
        //asSeedNode = "172.17.0.3"
        asSeedNode = "127.0.0.1"
)

// aeroClient is the shared client, assigned once in main.
var aeroClient *aero.Client

// c limits how many worker goroutines run at once; workers are admitted with
// c.Wait() and are expected to release their slot with c.Done().
var c goccm.ConcurrencyManager

// main connects to the seed node, populates the leaderboard buckets, applies a
// single score update across all time slices, and waits for every worker
// goroutine to finish before exiting.
func main() {
        client, err := aero.NewClient(asSeedNode, 3000)
        if err != nil {
                // log.Fatal terminates the process, so no return is needed.
                log.Fatal(err)
        }
        aeroClient = client
        // Release the connection once all workers have been waited for below.
        defer aeroClient.Close()

        c = goccm.New(10)

        // NOTE(review): genLeaderBoard and handleEvent only launch goroutines
        // and return without waiting for them, so the durations logged here
        // can exclude workers that are still in flight. The only wait is
        // c.WaitAllDone() at the end.
        start := time.Now()
        genLeaderBoard(aeroClient, "aa")
        elapsed := time.Since(start)
        log.Printf("init took %s", elapsed)

        start = time.Now()
        handleEvent(aeroClient, "aa", "123", 1)
        elapsed = time.Since(start)
        log.Printf("update score to 1 lead took %s", elapsed)

        c.WaitAllDone()

        // Example read-back, disabled in the benchmark:
        //      getTopLeads(aeroClient, "aa", "20200809", 128, 100)
}

// getBucketNumber maps a lead ID to a bucket index in [0, bucketNumber).
// It hashes the ID with RIPEMD-160, reads the first two digest bytes as a
// big-endian uint16 and reduces that modulo bucketNumber.
func getBucketNumber(leadID string) (leadBucket uint16) {
        hasher := ripemd160.New()
        hasher.Write([]byte(leadID))
        digest := hasher.Sum(nil)
        return binary.BigEndian.Uint16(digest) % bucketNumber
}

// handleEvent schedules a score update for leadID in each of the 2000 time
// slices of date "20200809". Every slice is handled by its own goroutine,
// admitted through the concurrency manager c. The function returns as soon
// as the last goroutine has been started; the returned error is always nil.
func handleEvent(client *aero.Client, custID string, leadID string, newScore int) (err error) {
        bucket := getBucketNumber(leadID)

        // Single-entry map shared (read-only) by all workers.
        update := map[interface{}]interface{}{leadID: newScore}

        for slice := 0; slice < 2000; slice++ {
                c.Wait()
                go handleTimeSlice(client, custID, "20200809", slice, bucket, update)
        }
        return nil
}

// handleTimeSlice writes the entries of amap into the leaderboard map bin of
// the record for (custID, date, slice, bucket).
//
// It is meant to run as a goroutine admitted via c.Wait(); it releases that
// slot on every exit path. Errors are logged and returned.
func handleTimeSlice(client *aero.Client, custID string, date string, slice int, bucket uint16, amap map[interface{}]interface{}) (err error) {
        // Deferred so the slot is released on the error returns too; otherwise
        // each failure permanently consumes a slot and c.Wait()/c.WaitAllDone()
        // can block forever.
        defer c.Done()

        leaderBoardKey := custID + ":" + date + ":" + strconv.Itoa(slice) + ":" + strconv.Itoa(int(bucket))
        key, err := aero.NewKey(namespace, setName, leaderBoardKey)
        if err != nil {
                log.Println(err)
                return err
        }

        // A nil policy is passed, leaving the choice of write policy to the client.
        _, err = client.Operate(nil, key,
                aero.MapPutItemsOp(aero.NewMapPolicy(aero.MapOrder.KEY_ORDERED, aero.MapWriteMode.UPDATE), leaderBoardBin, amap),
        )
        if err != nil {
                log.Println(err)
                return err
        }

        return nil
}

// genLeaderBoard starts one genLeaderBoardSlice goroutine for each of the
// 2000 time slices of date "20200809", admitting each through the concurrency
// manager c. It does not wait for the goroutines; the returned error is
// always nil.
func genLeaderBoard(client *aero.Client, custID string) (err error) {
        const sliceCount = 2000

        for slice := 0; slice < sliceCount; slice++ {
                c.Wait()
                go genLeaderBoardSlice(client, custID, "20200809", slice)
        }
        return nil
}

// genLeaderBoardSlice generates random scores for lead IDs "0".."999",
// groups them by getBucketNumber, and writes each group into the leaderboard
// map bin of the record for (custID, date, slice, bucket).
//
// It is meant to run as a goroutine admitted via c.Wait(); it releases that
// slot on every exit path. The first failing write is logged and returned.
func genLeaderBoardSlice(client *aero.Client, custID string, date string, slice int) (err error) {
        // Deferred up front so the early error returns below also release the
        // slot; registering it after the loop leaked a slot on every failure.
        defer c.Done()

        fullamap := make(map[uint16]map[interface{}]interface{})

        for i := 0; i < 1000; i++ {
                k := strconv.Itoa(i)
                j := getBucketNumber(k)

                amap, ok := fullamap[j]
                if !ok {
                        amap = make(map[interface{}]interface{})
                }
                amap[k] = rand.Intn(1000)
                fullamap[j] = amap
        }

        // NOTE(review): pol is the client's DefaultWritePolicy itself, not a
        // copy, so the assignments below change the shared default for every
        // caller and are repeated by each concurrent worker. Consider using a
        // private policy instead.
        pol := client.DefaultWritePolicy
        pol.TotalTimeout = 30 * time.Second
        pol.SendKey = true

        // The map policy is identical for every bucket, so build it once.
        mapPolicy := aero.NewMapPolicy(aero.MapOrder.KEY_ORDERED, aero.MapWriteMode.UPDATE)

        for bucket, parMap := range fullamap {

                leaderBoardKey := custID + ":" + date + ":" + strconv.Itoa(slice) + ":" + strconv.Itoa(int(bucket))
                key, err := aero.NewKey(namespace, setName, leaderBoardKey)
                if err != nil {
                        log.Println(err)
                        return err
                }

                start := time.Now()
                _, err = client.Operate(pol, key,
                        aero.MapPutItemsOp(mapPolicy, leaderBoardBin, parMap),
                )
                if err != nil {
                        log.Println("wtf")
                        log.Println(err)
                        elapsed := time.Since(start)
                        log.Printf("Operate %d took %s and failed", slice, elapsed)
                        return err
                }
        }
        return nil
}

// getTopLeads batch-reads the leaderboard bin of every bucket record for
// (custID, date, slice) and prints each key, its bin value (nil when the
// record is missing) and topN. It returns the error from the batch read.
//
// topN is only printed; no ranking or truncation is performed here.
func getTopLeads(client *aero.Client, custID string, date string, slice int, topN int) (err error) {

        keyPrefix := custID + ":" + date + ":" + strconv.Itoa(slice) + ":"

        keys := make([]*aero.Key, bucketNumber)
        for i := range keys {
                // Key-construction errors are ignored, as before.
                keys[i], _ = aero.NewKey(namespace, setName, keyPrefix+strconv.Itoa(i))
        }

        records, err := client.BatchGet(nil, keys, leaderBoardBin)
        for i, record := range records {
                var value interface{}
                if record != nil {
                        value = record.Bins[leaderBoardBin]
                }

                fmt.Println("Key:", keys[i].Value())
                fmt.Println("Value: ", value)
                fmt.Println(topN)
        }

        return err
}

Please check a similar thread about using the asyncio package with the Aerospike Python client: Bucketing / splitting data. As pointed out there, the recommended solution today is to use multiple threads for optimal performance. While we don’t have guidance for using asyncio with the existing Python client library, we have it on our radar to support async I/O in the future. In the meantime, we would like to learn any insights you may gain from your experimentation in this area.

© 2015 Copyright Aerospike, Inc. | All rights reserved. Creators of the Aerospike Database.