I am inserting over 400 million timeseries “ticks” into Aerospike in a single batch using the Python client. Each record is keyed by series identifier and hour, and contains a single bin: an ordered map of epoch times to values for every tick in that ticker:hour combination. There are about 1,200 unique series identifiers, and anywhere between 1,000 and 600k ticks per series (grouped by hour into ordered maps, as mentioned above).
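To make the layout concrete, here is a rough sketch of what one such record is meant to look like. The namespace ("mem") and set ("bbfast") are the ones used in my code further down; the ticker, timestamps and values are just illustrative, taken from the sample CSV at the end:

# Rough sketch of the intended layout for a single record (illustrative values).
# Key tuple: (namespace, set, "ticker:YYYYMMDD:HH")
key = ("mem", "bbfast", "SX7P Index:20161111:15")

# One bin, "ticks": a KEY_VALUE_ORDERED map of epoch-second -> tick value
record = {
    "ticks": {
        1478879295: 156.89,   # 2016-11-11T15:48:15Z
        1478879310: 156.88,   # 2016-11-11T15:48:30Z
        1478879325: 156.89,   # 2016-11-11T15:48:45Z
    }
}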
I am finding that my Python insertion code is using a steadily growing amount of memory, as can be seen here in htop:
So far I have inserted only about 8% of the records (roughly 34 million), and the Python upload routine is already using more than 9% of 32 GB, i.e. about 3 GB. I am tracking the hours already inserted in a Python set, but that only accounts for about 1 MB of RAM, so I can only assume the Aerospike Python driver is responsible for the rest. When I ran this overnight, my 32 GB machine killed the Python process for overusing RAM (35 GB for the Python process alone).
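For what it's worth, this is a minimal sketch (standard library only) of the kind of check I'm doing: comparing the whole process's peak RSS against the footprint of my own bookkeeping set, which is how I concluded the rest must belong to the client. The set contents here are just illustrative:

import resource
from sys import getsizeof

def process_peak_rss_mb():
    # ru_maxrss is reported in kilobytes on Linux
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0

trackerset = {"SX7P Index:20161111:15", "USDXAU CMPL Curncy:20161111:15"}  # illustrative
tracker_mb = (getsizeof(trackerset) + sum(getsizeof(k) for k in trackerset)) / 1e6
print("process peak RSS: %.1f MB, tracker set: %.3f MB"
      % (process_peak_rss_mb(), tracker_mb))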
As you can see on AMC, the data itself is only using about 280 MB of RAM so far, versus roughly 3 GB for the insertion code (bbaeromap.py).
Here is my code:
import aerospike
import csv
import pandas as pd
from dateutil import parser
from datetime import datetime
import pdb
import sys
sys.path.append("../../pytools")
from sys import getsizeof, stderr
from ansi import ansi
import IPython
from itertools import chain
from collections import deque
try:
    from reprlib import repr
except ImportError:
    pass
def total_size(o, handlers={}, verbose=False):
    """Return the approximate memory footprint of an object and all of its contents.

    Automatically finds the contents of the following builtin containers and
    their subclasses: tuple, list, deque, dict, set and frozenset.
    To search other containers, add handlers to iterate over their contents:
        handlers = {SomeContainerClass: iter,
                    OtherContainerClass: OtherContainerClass.get_elements}
    """
    dict_handler = lambda d: chain.from_iterable(d.items())
    all_handlers = {tuple: iter,
                    list: iter,
                    deque: iter,
                    dict: dict_handler,
                    set: iter,
                    frozenset: iter,
                   }
    all_handlers.update(handlers)    # user handlers take precedence
    seen = set()                     # track which object ids have already been seen
    default_size = getsizeof(0)      # estimate sizeof object without __sizeof__

    def sizeof(o):
        if id(o) in seen:            # do not double count the same object
            return 0
        seen.add(id(o))
        s = getsizeof(o, default_size)
        if verbose:
            print(s, type(o), repr(o), file=stderr)
        for typ, handler in all_handlers.items():
            if isinstance(o, typ):
                s += sum(map(sizeof, handler(o)))
                break
        return s

    return sizeof(o)
if __name__ == "__main__":
config = {
'hosts': [ ('192.168.1.122', 3000) ]
}
client = aerospike.client(config).connect()
map_policy={'map_order':aerospike.MAP_KEY_VALUE_ORDERED}
with open("bbfast.csv") as csvfile:
reader = csv.reader(csvfile, delimiter = ",")
next(reader, None)
counter = 0
timer = datetime.utcnow()
trackerset = set()
for row in reader:
ticker = row[0]
btime = parser.parse(row[1])
value = float(row[2])
unixtime = int(pd.Timestamp(btime).value/1e9)
datehourstring = btime.strftime("%Y%m%d:%H")
td = ticker+":"+datehourstring
key = ("mem", "bbfast", td)
if td not in trackerset:
client.map_set_policy(key, "ticks", map_policy)
trackerset.add(td)
client.map_put(key, "ticks", unixtime, value)
counter += 1
if (datetime.utcnow() - timer).seconds > 0:
print()
ansi("yellow")
print(counter, datetime.utcnow(), len(trackerset), total_size(trackerset), end = " ")
ansi()
timer = datetime.utcnow()
And the CSV file looks something like this (this is the 50-line head; there are some duplicates):
ticker,time,value
USDXAU CMPL Curncy,2016-11-11T15:47:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:47:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:47:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:48:13.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:48:13.000Z,0.00081
SX7P Index,2016-11-11T15:48:15.000Z,156.89
SX7P Index,2016-11-11T15:48:15.000Z,156.89
SX7P Index,2016-11-11T15:48:30.000Z,156.88
SX7P Index,2016-11-11T15:48:30.000Z,156.88
USDXAU CMPL Curncy,2016-11-11T15:48:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:48:43.000Z,0.00081
SX7P Index,2016-11-11T15:48:45.000Z,156.89
SX7P Index,2016-11-11T15:48:45.000Z,156.89
SX7P Index,2016-11-11T15:49:00.000Z,156.88
USDXAU CMPL Curncy,2016-11-11T15:49:13.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:49:13.000Z,0.00081
SX7P Index,2016-11-11T15:49:15.000Z,156.85
SX7P Index,2016-11-11T15:49:15.000Z,156.85
SX7P Index,2016-11-11T15:49:30.000Z,156.82
SX7P Index,2016-11-11T15:49:30.000Z,156.82
USDXAU CMPL Curncy,2016-11-11T15:49:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:49:43.000Z,0.00081
SX7P Index,2016-11-11T15:49:45.000Z,156.75
SX7P Index,2016-11-11T15:49:45.000Z,156.75
SX7P Index,2016-11-11T15:50:00.000Z,156.68
USDXAU CMPL Curncy,2016-11-11T15:50:13.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:50:13.000Z,0.00081
SX7P Index,2016-11-11T15:50:15.000Z,156.66
SX7P Index,2016-11-11T15:50:15.000Z,156.66
SX7P Index,2016-11-11T15:50:30.000Z,156.6
SX7P Index,2016-11-11T15:50:30.000Z,156.6
USDXAU CMPL Curncy,2016-11-11T15:50:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:50:43.000Z,0.00081
SX7P Index,2016-11-11T15:50:45.000Z,156.55
SX7P Index,2016-11-11T15:50:45.000Z,156.55
SX7P Index,2016-11-11T15:51:00.000Z,156.52
USDXAU CMPL Curncy,2016-11-11T15:51:12.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:51:12.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:51:13.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:51:13.000Z,0.00081
SX7P Index,2016-11-11T15:51:15.000Z,156.52
SX7P Index,2016-11-11T15:51:15.000Z,156.52
SX7P Index,2016-11-11T15:51:30.000Z,156.48
SX7P Index,2016-11-11T15:51:30.000Z,156.48
USDXAU CMPL Curncy,2016-11-11T15:51:43.000Z,0.00081
USDXAU CMPL Curncy,2016-11-11T15:51:43.000Z,0.00081
SX7P Index,2016-11-11T15:51:45.000Z,156.51
SX7P Index,2016-11-11T15:51:45.000Z,156.51
SX7P Index,2016-11-11T15:52:00.000Z,156.55
Is there any reason why the Aerospike driver should be using progressively more RAM like this? Is my code to blame? Is there a way to “flush” it?