Hi,
we are trying to do map-reduce aggregation using Aerospike, but it is really slow. When we run aggregation and watch htop, only 1 core is at 100% while the other 23 remain idle at ~0.1%. Is there any configuration option that makes query/aggregation use all cores? (Neither increasing query-threads nor query-worker-threads affects this, nor do the many other options we tried.)
We are using aerospike 3.5.15 on 2 nodes of bare metals (32GB RAM, Intel Xeon X5560) running debian wheezy. We have a test example where we put 5 million records, each having 11 integer bins, into aerospike and create index over one of them. All data are stored in RAM with replication factor 2. Finally we run our aggregation on 1 of the 2 nodes using aql tool like this:
aql> AGGREGATE stats.top_products() ON test.test-set WHERE retargetingId = 2
this is to select precisely 1/10 of the dataset = 5e5 records (~2.5e5 per node). But it takes ~2.6s and uses only 1 core!!!
the udf function follows:
--- Return the 1-based index of the element of `l` with the smallest first item.
-- Each element of `l` is a pair-like list {value, id}; comparison is on [1].
-- Assumes `l` is non-empty.
-- @param l list of {value, id} pairs
-- @return integer index of the minimum element
local function find_index_of_min(l)
  local min = l[1][1]
  local index = 1
  -- start at 2: element 1 is already the current minimum
  for i = 2, list.size(l) do
    -- 'local' added: the original assigned to a global 'value'
    local value = l[i]
    if value[1] < min then
      min = value[1]
      index = i
    end
  end
  return index
end
--- Map phase: project a record onto a {placeholder, weekly_sum} pair.
-- Sums the seven daily bins "2015_01_1" .. "2015_01_7" of the record.
-- NOTE(review): the leading 0 is read by the aggregate phase as the
-- product id, so the id is always 0 -- confirm this is intended (it
-- looks like it should be the record's product-id bin instead).
local function map_stats(rec)
  local week_total =
      rec["2015_01_1"] + rec["2015_01_2"] + rec["2015_01_3"]
    + rec["2015_01_4"] + rec["2015_01_5"] + rec["2015_01_6"]
    + rec["2015_01_7"]
  return list{0, week_total}
end
--- Aggregate phase: fold one mapped record into the running top-N buffer.
-- accumulator = {index_of_current_min, buffer}, where buffer is a list of
-- {sum, id} pairs. If the incoming sum beats the current minimum entry,
-- that entry is overwritten in place and the min index is recomputed.
-- (Function name keeps the original triple-g spelling: it is referenced
-- by name in the stream pipeline.)
local function agggregate_stats(accumulator, rec)
  local candidate_id = rec[1]
  local candidate_sum = rec[2]
  local top_buffer = accumulator[2]
  local weakest = top_buffer[accumulator[1]]
  if candidate_sum > weakest[1] then
    weakest[1] = candidate_sum
    weakest[2] = candidate_id
    accumulator[1] = find_index_of_min(top_buffer)
  end
  return accumulator
end
--- Reduce phase: merge two top-N accumulators into one top-N accumulator.
-- Both buffers are concatenated, sorted ascending by sum, and the largest
-- `size` entries are kept (appended largest-first, i.e. descending).
-- @param accumulator1 {min_index, buffer} from one partition
-- @param accumulator2 {min_index, buffer} from another partition
-- @return merged {0, buffer} accumulator, or nil on size mismatch
local function reduce_stats(accumulator1, accumulator2)
  local accumulator1_data = accumulator1[2]
  local accumulator2_data = accumulator2[2]
  local size = list.size(accumulator1_data)
  if size ~= list.size(accumulator2_data) then
    warn("reduce_stats: sizes of accumulators_data do not match!!!")
    warn("reduce_stats: accumulator1_data: %s", tostring(accumulator1_data))
    warn("reduce_stats: accumulator2_data: %s", tostring(accumulator2_data))
    -- NOTE(review): returning nil propagates nil downstream; consider
    -- returning one of the accumulators instead -- confirm desired behavior
    return
  end
  -- Copy both buffers into a plain Lua table so table.sort can be used.
  -- (The original also built an Aerospike list 'tl' in this loop that was
  -- never read afterwards; that dead allocation has been removed.)
  local t = {}
  for i = 1, size do
    t[i] = accumulator1_data[i]
    t[i + size] = accumulator2_data[i]
  end
  -- ascending by sum: the top `size` entries end up at t[size+1 .. size*2]
  table.sort(t, function (lhs, rhs)
    return lhs[1] < rhs[1]
  end)
  -- NOTE(review): the leading 0 is not a valid min-index for the merged
  -- buffer; harmless here because reduce only reads element [2] -- confirm
  local accumulator = list{0, list.create(size)}
  local accumulator_data = accumulator[2]
  -- walk downward so the result is ordered largest sum first
  for i = size * 2, size + 1, -1 do
    list.append(accumulator_data, t[i])
  end
  return accumulator
end
--- Stream UDF entry point: return the top-`count` records by weekly sum.
-- Invoked server-side against a secondary-index query result stream.
-- @param stream the record stream supplied by Aerospike
-- @param count  optional number of top entries to keep; defaults to 3,
--               preserving the original hard-coded behavior
function top_products(stream, count)
  count = count or 3
  -- accumulator = {index_of_current_min, buffer of {sum, id} pairs}
  local accumulator = list{1, list.create(count)}
  for i = 1, count do
    -- seed with sum 0 so any real record displaces an empty slot
    list.append(accumulator[2], list{0, nil})
  end
  return stream : map(map_stats) : aggregate(accumulator, agggregate_stats) : reduce(reduce_stats)
end
$ asinfo -h skhd10.dev -v 'get-config:';
transaction-queues=20;transaction-threads-per-queue=100;transaction-duplicate-threads=0;transaction-pending-limit=20;migrate-threads=100;migrate-xmit-priority=40;migrate-xmit-sleep=500;migrate-read-priority=10;migrate-read-sleep=500;migrate-xmit-hwm=10;migrate-xmit-lwm=5;migrate-max-num-incoming=256;migrate-rx-lifetime-ms=60000;proto-fd-max=15000;proto-fd-idle-ms=60000;proto-slow-netio-sleep-ms=1;transaction-retry-ms=1000;transaction-max-ms=1000;transaction-repeatable-read=false;dump-message-above-size=134217728;ticker-interval=10;microbenchmarks=false;storage-benchmarks=false;ldt-benchmarks=false;scan-priority=200;scan-sleep=1;batch-threads=4;batch-max-requests=5000;batch-priority=200;nsup-delete-sleep=100;nsup-period=120;nsup-startup-evict=true;paxos-retransmit-period=5;paxos-single-replica-limit=1;paxos-max-cluster-size=32;paxos-protocol=v3;paxos-recovery-policy=manual;write-duplicate-resolution-disable=false;respond-client-on-master-completion=false;replication-fire-and-forget=false;info-threads=16;allow-inline-transactions=true;use-queue-per-device=false;snub-nodes=false;fb-health-msg-per-burst=0;fb-health-msg-timeout=200;fb-health-good-pct=50;fb-health-bad-pct=0;auto-dun=false;auto-undun=false;prole-extra-ttl=0;max-msgs-per-type=-1;service-threads=20;fabric-workers=36;pidfile=/var/run/aerospike/asd.pid;memory-accounting=false;udf-runtime-gmax-memory=18446744073709551615;udf-runtime-max-memory=18446744073709551615;sindex-populator-scan-priority=3;sindex-data-max-memory=18446744073709551615;query-threads=32;query-worker-threads=480;query-priority=10;query-in-transaction-thread=0;query-req-in-query-thread=0;query-req-max-inflight=100;query-bufpool-size=256;query-batch-size=100;query-sleep=1;query-job-tracking=false;query-short-q-max-size=500;query-long-q-max-size=500;query-rec-count-bound=4294967295;query-threshold=50;query-untracked-time=1000000;service-address=0.0.0.0;service-port=3000;mesh-seed-address-port=10.0.22.139:3002;reuse-address=true;fabric-port=3001;
fabric-keepalive-enabled=true;fabric-keepalive-time=1;fabric-keepalive-intvl=1;fabric-keepalive-probes=10;network-info-port=3003;enable-fastpath=true;heartbeat-mode=mesh;heartbeat-protocol=v2;heartbeat-address=10.0.22.139;heartbeat-port=3002;heartbeat-interval=250;heartbeat-timeout=10;enable-security=false;privilege-refresh-period=300;report-authentication-sinks=0;report-data-op-sinks=0;report-sys-admin-sinks=0;report-user-admin-sinks=0;report-violation-sinks=0;syslog-local=-1;enable-xdr=false;forward-xdr-writes=false;xdr-delete-shipping-enabled=true;xdr-nsup-deletes-enabled=false;stop-writes-noxdr=false;reads-hist-track-back=1800;reads-hist-track-slice=10;reads-hist-track-thresholds=1,8,64;writes_master-hist-track-back=1800;writes_master-hist-track-slice=10;writes_master-hist-track-thresholds=1,8,64;proxy-hist-track-back=1800;proxy-hist-track-slice=10;proxy-hist-track-thresholds=1,8,64;writes_reply-hist-track-back=1800;writes_reply-hist-track-slice=10;writes_reply-hist-track-thresholds=1,8,64;udf-hist-track-back=1800;udf-hist-track-slice=10;udf-hist-track-thresholds=1,8,64;query-hist-track-back=1800;query-hist-track-slice=10;query-hist-track-thresholds=1,8,64;query_rec_count-hist-track-back=1800;query_rec_count-hist-track-slice=10;query_rec_count-hist-track-thresholds=1,8,64