Large memory usage by Python driver on big batch insert

I am inserting over 400 million timeseries “ticks” into Aerospike in a single batch using the Python client. Each record is keyed by series identifier and hour and contains a single bin, which is an ordered map of epoch times to values for every tick in that ticker:hour combination. There are about 1,200 unique series identifiers and anywhere between 1,000 and 600k ticks per series (grouped by hour into ordered maps, as described above).
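
To make the layout concrete, here is roughly what one record ends up looking like. This is only an illustration: the key format matches the code below, and the map entries are taken from the sample data further down.

    # One record per ticker:hour -- the key is (namespace, set, "TICKER:YYYYMMDD:HH"),
    # and the single bin "ticks" holds a key-ordered map of epoch seconds -> tick value.
    key = ("mem", "bbfast", "SX7P Index:20161111:15")
    ticks = {
        1478879295: 156.89,  # 2016-11-11T15:48:15Z
        1478879310: 156.88,  # 2016-11-11T15:48:30Z
        1478879325: 156.89,  # 2016-11-11T15:48:45Z
    }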

I am finding that my Python insertion code takes up a steadily growing amount of memory, as can be seen in htop.

So far I have inserted only about 8% of the records, roughly 34 million, and the Python upload routine is already using more than 9% of 32 GB, i.e. about 3 GB. I am tracking the hours inserted in a Python set, but that only accounts for about one megabyte of RAM, so I can only assume the Aerospike Python driver is responsible for the rest. I already tried running this overnight and my 32 GB machine killed the Python process for using too much RAM (35 GB for the Python process alone).

As you can see in AMC, the data itself is only using 280 MB of RAM so far, versus roughly 3 GB for the insertion code (bbaeromap.py).
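
Note that the one-megabyte figure above only covers the Python set I use for tracking; it says nothing about memory held inside the client's C extension. A rough sketch of how the overall process footprint could be logged alongside the insert counter, using psutil (psutil is not part of the script below, this is just an illustrative sketch):

    import os
    import psutil  # not used in the script below; shown only as a measuring aid

    def log_rss(counter):
        """Print this process's resident set size (RSS) next to the insert counter."""
        rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
        print("inserted=%d  rss=%.1f MB" % (counter, rss_mb))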

Here is my code:

    import aerospike
    import csv
    import pandas as pd
    from dateutil import parser
    from datetime import datetime
    import pdb
    import sys
    sys.path.append("../../pytools")
    from sys import getsizeof, stderr
    from ansi import ansi
    import IPython
    from itertools import chain
    from collections import deque

    try:
        from reprlib import repr
    except ImportError:
        pass

    def total_size(o, handlers={}, verbose=False):
        """ Returns the approximate memory footprint an object and all of its contents.

        Automatically finds the contents of the following builtin containers and
        their subclasses:  tuple, list, deque, dict, set and frozenset.
        To search other containers, add handlers to iterate over their contents:

            handlers = {SomeContainerClass: iter,
                        OtherContainerClass: OtherContainerClass.get_elements}

        """
        dict_handler = lambda d: chain.from_iterable(d.items())
        all_handlers = {tuple: iter,
                        list: iter,
                        deque: iter,
                        dict: dict_handler,
                        set: iter,
                        frozenset: iter,
                       }
        all_handlers.update(handlers)     # user handlers take precedence
        seen = set()                      # track which object id's have already been seen
        default_size = getsizeof(0)       # estimate sizeof object without __sizeof__

        def sizeof(o):
            if id(o) in seen:       # do not double count the same object
                return 0
            seen.add(id(o))
            s = getsizeof(o, default_size)

            if verbose:
                print(s, type(o), repr(o), file=stderr)

            for typ, handler in all_handlers.items():
                if isinstance(o, typ):
                    s += sum(map(sizeof, handler(o)))
                    break
            return s

        return sizeof(o)



    if __name__ == "__main__":
        # Quick self-test of total_size() on a small dict.
        d = dict(a=1, b=2, c=3, d=[4, 5, 6, 7], e='a string of chars')
        print(total_size(d, verbose=True))

        # Connect to the Aerospike cluster and stream the CSV into ordered-map bins.
        config = {
            'hosts': [ ('192.168.1.122', 3000) ]
        }
        client = aerospike.client(config).connect()
        map_policy={'map_order':aerospike.MAP_KEY_VALUE_ORDERED}
        with open("bbfast.csv") as csvfile:
            reader = csv.reader(csvfile, delimiter = ",")
            next(reader, None)
            counter = 0
            timer = datetime.utcnow()
            trackerset = set()
            for row in reader:
                ticker = row[0]
                btime = parser.parse(row[1])
                value = float(row[2])
                unixtime = int(pd.Timestamp(btime).value/1e9)   # epoch seconds
                datehourstring = btime.strftime("%Y%m%d:%H")
                td = ticker + ":" + datehourstring               # one record per ticker:hour
                key = ("mem", "bbfast", td)
                if td not in trackerset:
                    # first tick seen for this ticker:hour -- set the map order policy once
                    client.map_set_policy(key, "ticks", map_policy)
                    trackerset.add(td)
                client.map_put(key, "ticks", unixtime, value)    # one client call per tick
                counter += 1
                if (datetime.utcnow() - timer).seconds > 0:     # report progress about once per second
                    print()
                    ansi("yellow")
                    print(counter, datetime.utcnow(), len(trackerset), total_size(trackerset), end = " ")
                    ansi()
                    timer = datetime.utcnow()

And the CSV file looks something like this (the first 50 lines; there are some duplicates):

ticker,time,value
USDXAU CMPL Curncy,2016-11-11T15:47:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:47:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:47:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:48:13.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:48:13.000Z,0.00081
SX7P Index,2016-11-11T15:48:15.000Z,156.89
SX7P Index,2016-11-11T15:48:15.000Z,156.89
SX7P Index,2016-11-11T15:48:30.000Z,156.88
SX7P Index,2016-11-11T15:48:30.000Z,156.88
USDXAU CMPL Curncy,2016-11-11T15:48:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:48:43.000Z,0.00081
SX7P Index,2016-11-11T15:48:45.000Z,156.89
SX7P Index,2016-11-11T15:48:45.000Z,156.89
SX7P Index,2016-11-11T15:49:00.000Z,156.88
USDXAU CMPL Curncy,2016-11-11T15:49:13.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:49:13.000Z,0.00081
SX7P Index,2016-11-11T15:49:15.000Z,156.85
SX7P Index,2016-11-11T15:49:15.000Z,156.85
SX7P Index,2016-11-11T15:49:30.000Z,156.82
SX7P Index,2016-11-11T15:49:30.000Z,156.82
USDXAU CMPL Curncy,2016-11-11T15:49:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:49:43.000Z,0.00081
SX7P Index,2016-11-11T15:49:45.000Z,156.75
SX7P Index,2016-11-11T15:49:45.000Z,156.75
SX7P Index,2016-11-11T15:50:00.000Z,156.68
USDXAU CMPL Curncy,2016-11-11T15:50:13.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:50:13.000Z,0.00081
SX7P Index,2016-11-11T15:50:15.000Z,156.66
SX7P Index,2016-11-11T15:50:15.000Z,156.66
SX7P Index,2016-11-11T15:50:30.000Z,156.6
SX7P Index,2016-11-11T15:50:30.000Z,156.6
USDXAU CMPL Curncy,2016-11-11T15:50:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:50:43.000Z,0.00081
SX7P Index,2016-11-11T15:50:45.000Z,156.55
SX7P Index,2016-11-11T15:50:45.000Z,156.55
SX7P Index,2016-11-11T15:51:00.000Z,156.52
USDXAU CMPL Curncy,2016-11-11T15:51:12.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:51:12.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:51:13.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:51:13.000Z,0.00081
SX7P Index,2016-11-11T15:51:15.000Z,156.52
SX7P Index,2016-11-11T15:51:15.000Z,156.52
SX7P Index,2016-11-11T15:51:30.000Z,156.48
SX7P Index,2016-11-11T15:51:30.000Z,156.48
USDXAU CMPL Curncy,2016-11-11T15:51:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:51:43.000Z,0.00081
SX7P Index,2016-11-11T15:51:45.000Z,156.51
SX7P Index,2016-11-11T15:51:45.000Z,156.51
SX7P Index,2016-11-11T15:52:00.000Z,156.55

Is there any reason why the Aerospike driver would progressively use so much RAM? Is my code to blame? Is there a way to “flush” it?

On the face of it that looks like a memory leak. This is a well-written bug report, but it belongs in the issues of the aerospike/aerospike-client-python repo.

Can you open a new issue there and link it back to this post?

I will code up a simpler version of the Python routine that reproduces the problem without needing the CSV file, post it on my GitHub, and then open the issue in the Aerospike repo as you suggest. Please give me 24 hours.

Okay, I have raised an issue, which also includes some simplified code.

This was an issue with Python 3 only. Version 2.0.8 has been released and fixes the memory leak in Python 3. I tested the above code with Python 2 up to 25 million inserts and did not see the 10x increase in total memory. The memory consumed by the application code stays stable, while the memory consumed by the server grows with the data, almost one to one.

Summary of a test on an Ubuntu 16.04 VM running CE 3.11.0.1 (total memory / server memory):

1. Boot the VM from shutdown, open a terminal, start htop: 663M total
2. Start a second terminal, launch a single-node Aerospike server: 811M
3. Start AMC: 838M
4. Start Firefox, open the AMC dashboard: 1.08G total / 0 MB server
5. Start the above code (memleaktest.py): 1.09G / 0 MB
6. All 720K keys inserted (first code loop): 1.16G / 62.9M
7. 5 million insert updates to maps: 1.36G / 142M
8. 10 million insert updates to maps: 1.45G / 224M (steps 1-8 were repeated twice: shutdown, reboot VM, similar data)
9. 20 million insert updates to maps: 1.63G / 385M
10. 25 million insert updates to maps: 1.71G / 466M

memleaktest.py memory usage remains at 1.2% of total memory throughout.
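
A load generator along the lines described above looks roughly like the following sketch. This is not the actual memleaktest.py from the issue; the host, namespace, and set names are placeholders.

    import aerospike

    # Sketch of a synthetic test along the lines described above -- NOT the actual
    # memleaktest.py from the GitHub issue. Host, namespace and set are placeholders.
    config = {'hosts': [('127.0.0.1', 3000)]}
    client = aerospike.client(config).connect()
    map_policy = {'map_order': aerospike.MAP_KEY_VALUE_ORDERED}

    NUM_KEYS = 720000        # first loop: create every key once
    NUM_UPDATES = 25000000   # then keep appending map entries across those keys

    # First code loop: create 720K records and set the map order policy on each.
    for i in range(NUM_KEYS):
        key = ('test', 'memleak', 'k%d' % i)
        client.map_set_policy(key, 'ticks', map_policy)

    # Second loop: millions of map_put updates spread over the existing keys,
    # while watching the Python process in htop and the namespace memory in AMC.
    for i in range(NUM_UPDATES):
        key = ('test', 'memleak', 'k%d' % (i % NUM_KEYS))
        client.map_put(key, 'ticks', i, float(i))

    client.close()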

See the screenshots on the GitHub issue page linked in the previous post for both Python 2 and Python 3 performance.

Fixed in version 2.0.8.


Got it, thanks Piyush.