I am running an aggregation function on an Aerospike cluster and I’ve noticed that, when the aggregation runs, it’s only using one processor core on each of the nodes. I’ve tried various query settings to force UDF execution to use multiple cores, but I feel like I’m probably doing something fundamentally wrong if it’s only executing on a single CPU core. Is this normal, or is there some configuration setting that I’m missing that allows UDFs to execute on more than one CPU core?
To add to that, this passage is somewhat vague to me – it seems to imply that Lua UDFs only run on a single core, but I want to be sure:
Aerospike creates a queue of Lua contexts - no more than one per thread, per registered UDF - greatly reducing latency and increasing performance.
…
Multicore
Many other interpreted languages can only run one thread of execution per process. CPython, for example, uses globals in its code base, which greatly reduces the ability to run multiple Python contexts per process. Google’s implementation of JavaScript - V8 - touts its ability to run on multiple cores simultaneously, but by our measurements, its performance was very poor compared to Lua.
Source: https://www.aerospike.com/docs/architecture/udf.html
Lua runs on multiple cores in parallel, and Lua states are cached on the server. What configuration are you running with?
Can you share the output of
asinfo -v 'get-config:'
What query are you running?
aql> select * from test where bin1 between 1 and 10000
or is it
aql> select * from test
– R
no, it’s:
aggregate library.function() where testid = 1234
Queries themselves appear to run in parallel just fine, it’s the aggregations that seem to only run on a single core. And by that I mean that when I run the statement above, it only executes on a single core per node in my cluster. I can run multiple aggregations in parallel, but I’d like a single aggregation request to use multiple cores to execute.
Edit: here is the asinfo output:
transaction-queues=4; transaction-threads-per-queue=2; transaction-duplicate-threads=4; transaction-pending-limit=20; migrate-threads=4; migrate-xmit-priority=1000; migrate-xmit-sleep=10; migrate-read-priority=1000; migrate-read-sleep=10; migrate-xmit-hwm=1000; migrate-xmit-lwm=5; migrate-max-num-incoming=256; migrate-rx-lifetime-ms=60000; proto-fd-max=15000; proto-fd-idle-ms=60000; transaction-retry-ms=1000; transaction-max-ms=1000; transaction-repeatable-read=false; dump-message-above-size=134217728; ticker-interval=10; microbenchmarks=false; storage-benchmarks=false; ldt-benchmarks=false; scan-priority=200; scan-sleep=1; batch-threads=4; batch-max-requests=5000; batch-priority=200; nsup-delete-sleep=0; nsup-period=120; nsup-startup-evict=true; paxos-retransmit-period=5; paxos-single-replica-limit=1; paxos-max-cluster-size=32; paxos-protocol=v3; paxos-recovery-policy=manual; write-duplicate-resolution-disable=false; respond-client-on-master-completion=false; replication-fire-and-forget=false; info-threads=16; allow-inline-transactions=true; use-queue-per-device=false; snub-nodes=false; fb-health-msg-per-burst=0; fb-health-msg-timeout=200; fb-health-good-pct=50; fb-health-bad-pct=0; auto-dun=false; auto-undun=false; prole-extra-ttl=0; max-msgs-per-type=-1; service-threads=4; fabric-workers=16; pidfile=/var/run/aerospike/asd.pid; memory-accounting=false; udf-runtime-gmax-memory=18446744073709551615; udf-runtime-max-memory=18446744073709551615; sindex-populator-scan-priority=3; sindex-data-max-memory=18446744073709551615; query-threads=4; query-worker-threads=15; query-priority=10; query-in-transaction-thread=0; query-req-in-query-thread=0; query-req-max-inflight=100; query-bufpool-size=256; query-batch-size=100; query-sleep=1; query-job-tracking=false; query-short-q-max-size=500; query-long-q-max-size=500; query-rec-count-bound=4294967295; query-threshold=10; query-untracked-time=1000000; service-address=0.0.0.0; service-port=3000; reuse-address=true; fabric-port=3001; network-info-port=3003; enable-fastpath=true; heartbeat-mode=multicast; heartbeat-protocol=v2; heartbeat-address=239.1.99.222; heartbeat-port=9918; heartbeat-interval=150; heartbeat-timeout=20; enable-security=false; privilege-refresh-period=300; report-authentication-sinks=0; report-data-op-sinks=0; report-sys-admin-sinks=0; report-user-admin-sinks=0; report-violation-sinks=0; syslog-local=-1; xdr-delete-shipping-enabled=true; xdr-nsup-deletes-enabled=false; enable-xdr=false; stop-writes-noxdr=false; reads-hist-track-back=1800; reads-hist-track-slice=10; reads-hist-track-thresholds=1,8,64; writes_master-hist-track-back=1800; writes_master-hist-track-slice=10; writes_master-hist-track-thresholds=1,8,64; proxy-hist-track-back=1800; proxy-hist-track-slice=10; proxy-hist-track-thresholds=1,8,64; writes_reply-hist-track-back=1800; writes_reply-hist-track-slice=10; writes_reply-hist-track-thresholds=1,8,64; udf-hist-track-back=1800; udf-hist-track-slice=10; udf-hist-track-thresholds=1,8,64; query-hist-track-back=1800; query-hist-track-slice=10; query-hist-track-thresholds=1,8,64; query_rec_count-hist-track-back=1800; query_rec_count-hist-track-slice=10; query_rec_count-hist-track-thresholds=1,8,64
The way it works is: when the query results are returned, they are chunked up into batches of query-batch-size records. Each batch is then processed in parallel. If it is a vanilla secondary index query, it is simply IO; otherwise the Lua aggregation is run on each batch. If your query is selecting fewer than 100 rows, then you would see it running on only a single core. Is that the case?
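To illustrate with the numbers from the config dump above (purely an illustration): with query-batch-size=100, a query that selects only 50 records produces a single batch, so only one core has any aggregation work to do, while a query selecting 10,000 records produces about 100 batches that can be spread across the query worker threads.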
You may check the sindex stats to find out the average selectivity of your query, if that is not known:
aql > stat index <ns>.<set> <indexname>
– R
My aggregations are processing 20 million rows or so per node, but still only execute on a single core.
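(For a rough sense of scale, using the config posted above: with query-batch-size=100, 20 million rows per node works out to roughly 200,000 batches per node, so low selectivity shouldn’t be the limiting factor here.)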
The aggregation steps I am running are:
filter() : map() : aggregate()
I currently don’t run a reduce() step.
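The actual module wasn’t posted, so purely for illustration, here is a minimal sketch of what a stream UDF with that filter() : map() : aggregate() shape typically looks like (the module name, function name, bin names, and fold logic below are all made up):

-- library.lua (all names below are placeholders)
local function keep(rec)
  -- filter(): drop records that don't match some bin-level condition
  return rec['status'] == 1
end

local function project(rec)
  -- map(): project only the bins needed downstream
  return map { key = rec['id'], A = rec['A'], B = rec['B'] }
end

local function fold(agg, row)
  -- aggregate(): fold each projected row into a running map
  agg[row['key']] = row
  return agg
end

function run(stream)
  return stream : filter(keep) : map(project) : aggregate(map(), fold)
end

As I understand the stream UDF model, everything up to and including aggregate() runs on the server nodes; without a reduce() step, the per-node results are returned to the client as-is.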
Andrew,
What build are you running?
What does the client do with the result? Can you try making the client totally lightweight, say, receive the result and just drop it without doing anything?
Here is my theory: although the server-side processing is multi-threaded, the network IO is a single-threaded operation. If the client is not consuming the result fast enough, then the server will just show a single core as busy trying to perform IO.
I will try this out. How big are your records?
– R
I am using the latest build (3.5.4), and I see this result with any of the clients I’ve tested (aql on Ubuntu and the .NET client). My records are somewhat weighty: about 35 bins per row, with several of those being strings. The client simply shoves the results into a .NET List, with one row = one class instance, and then exits, so no real processing happens until after the client connection to Aerospike is closed.
I don’t think it’s an IO issue, mostly because the result sent back to the client after running the query is somewhere in the neighborhood of 500k rows (or smaller, sometimes even fewer than 10k). That number is also the total received from all nodes; since this is happening post-aggregation but before any reduction, I’m getting a different result set from each node in the cluster. That’s some data, but it shouldn’t be a big issue w.r.t. IO. The returned rows are also much smaller (maybe 10 bins or so each). Also, this is all running in-house on the local network.
Edit: Also worth noting, the execution time for the aggregation function in question is roughly the same regardless of how many rows are returned. I’ve done this a number of times while monitoring CPU usage via “top” on the nodes, and every time the CPU usage profile is pretty much the same – “top” reports that asd is consuming 100% CPU, but the overall idle percentage is 88%, with 12% being consumed by all user processes (this is on an 8-core node), which I believe is consistent with a process using only one of the cores.
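The arithmetic backs that up: on an 8-core box, one fully busy core is 1/8 = 12.5% of total CPU, so roughly 12% user and 88% idle in top’s summary line, together with asd pegged at 100% in the per-process view (which is reported per core), is exactly what a single saturated core looks like.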
This does not sound normal. Can you share the client code (only the few lines encapsulating the call to Aerospike)?
With the numbers you have, the queries and aggregations should run across multiple cores. Can you also quickly try running just a plain vanilla query and see if it uses multiple cores?
– R
If it helps, migrations work great across multiple cores and I can do at least a million rows/sec when I bring a new node onto the cluster (this is using the config params I posted above). top reports CPU usage in the 200%-300% range (with my current settings), which is consistent with what I’d expect from how I have things set up.
Again, I see this behavior with AQL running in Ubuntu – the performance is consistent regardless of how many cores and/or how much memory the AQL box has. Nevertheless, here is a snippet of the C# client code:
List<POCO> records = new List<POCO>();
// Run the stream UDF aggregation; each node returns its own (pre-reduce) result
using (AsyncClient client = new AsyncClient(null, _aerospikeHosts.ToArray()))
{
    using (ResultSet resultSet = client.QueryAggregate(null, statement, luaPackageName, luaFunction, queryValues))
    {
        while (resultSet.Next())
        {
            // Each result object is a map of key -> map of bin values
            Dictionary<object, object> val = resultSet.Object as Dictionary<object, object>;
            foreach (object key in val.Keys)
            {
                Dictionary<object, object> value = val[key] as Dictionary<object, object>;
                POCO record = new POCO();
                record.Key = key.ToString();
                record.A = Convert.ToInt32(value["A"]);
                record.B = Convert.ToBoolean(value["B"]);
                record.C = value["C"].ToString();
                // ...
                records.Add(record);
            }
        }
    }
}
The query itself basically does a “between” on a numeric range, so the UDF is scanning all records within that range. The filter function reduces the amount of data fed down the chain, but every execution scans the same number of rows (which is why performance is the same regardless of how many rows are actually returned from the UDF aggregation).
Andrew,
Nothing suspicious there … Let me try it out …
– R
Andrew,
I cannot reproduce it. Did you get a chance to run the query (without aggregation)? Does it use multiple cores, as you observed?
– R
I haven’t had a chance; unfortunately, I need to move on to a different project. However, it may be worth noting that the machines where this was observed were virtual, running on Hyper-V and/or VirtualBox. I haven’t tried bare metal, so I don’t know if this is an issue with virtualization.