Aggregations (stream UDF) too slow (leaves 23/24 cores idle)


#1

Hi,

We are trying to run a map-reduce aggregation on Aerospike, but it is really slow. While the aggregation runs, htop shows only 1 core at 100%; the other 23 sit idle at ~0.1%. Is there any configuration option that lets a query/aggregation use all cores? (Increasing query-threads and query-worker-threads had no effect, nor did many other options we tried.)

We are running Aerospike 3.5.15 on 2 bare-metal nodes (32GB RAM, Intel Xeon X5560) under Debian Wheezy. As a test case we load 5 million records, each with 11 integer bins, and create a secondary index over one of them. All data is stored in RAM with replication factor 2. We then run our aggregation against 1 of the 2 nodes using the aql tool:
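For reference, the UDF module and the index were set up roughly like this beforehand (the index name is illustrative; the module name matches the `stats.top_products()` call below):

```
aql> REGISTER MODULE 'stats.lua'
aql> CREATE INDEX retargetingId_idx ON test.test-set (retargetingId) NUMERIC
```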

aql> AGGREGATE stats.top_products() ON test.test-set WHERE retargetingId = 2

This selects exactly 1/10 of the dataset = 5e5 records (~2.5e5 per node), yet it takes ~2.6s and uses only 1 core!!!

The UDF module follows:

local function find_index_of_min(l)
    -- Linear scan for the index of the smallest sum in the buffer.
    local min = l[1][1]
    local index = 1
    for i = 2, list.size(l) do
        local value = l[i]
        if value[1] < min then
            min = value[1]
            index = i
        end
    end

    return index
end


local function map_stats(rec)
    -- Emit a list{id, sum} pair: [1] is the id slot (0 here), [2] the sum
    -- of the seven daily bins.
    return list{0, rec["2015_01_1"] + rec["2015_01_2"] + rec["2015_01_3"] + rec["2015_01_4"] + rec["2015_01_5"] + rec["2015_01_6"] + rec["2015_01_7"]}
end


local function agggregate_stats(accumulator, rec)
    -- accumulator is list{index_of_current_min, top_k_buffer};
    -- rec is the list{id, sum} pair produced by map_stats.
    local id = rec[1]
    local sum = rec[2]

    local buffer = accumulator[2]
    local min_value = buffer[accumulator[1]]

    local min = min_value[1]
    if sum > min then
        min_value[1] = sum
        min_value[2] = id
        
        accumulator[1] = find_index_of_min(buffer)
    end

    return accumulator
end


local function reduce_stats(accumulator1, accumulator2)
    local accumulator1_data = accumulator1[2]
    local accumulator2_data = accumulator2[2]
    
    local size = list.size(accumulator1_data)
    if size ~= list.size(accumulator2_data) then
        warn("reduce_stats: sizes of accumulators_data do not match!!!")
        warn("reduce_stats: accumulator1_data: %s", tostring(accumulator1_data))
        warn("reduce_stats: accumulator2_data: %s", tostring(accumulator2_data))
        return accumulator1 -- returning nil here would break the stream
    end
    
    -- Merge both buffers into a plain Lua table so table.sort can be used.
    local t = {}
    for i = 1, size do
        t[i] = accumulator1_data[i]
        t[i+size] = accumulator2_data[i]
    end
    
    table.sort(t, function (lhs, rhs)
        return lhs[1] < rhs[1]
    end)
    
    -- Entries are appended largest-first, so after the loop the minimum sits
    -- at index `size`; record that so the accumulator stays consistent.
    local accumulator = list{size, list.create(size)}
    local accumulator_data = accumulator[2]
    for i = size*2, size+1, -1 do
        list.append(accumulator_data, t[i])
    end

    return accumulator
end


function top_products(stream)
    -- Track the top `count` sums; each buffer entry is a list{sum, id} pair.
    local count = 3
    local accumulator = list{1, list.create(count)}
    for i = 1, count do
        list.append(accumulator[2], list{0, nil})
    end
    
    return stream : map(map_stats) : aggregate(accumulator, agggregate_stats) : reduce(reduce_stats)
end
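Outside Aerospike, the top-k bookkeeping above can be exercised with plain Lua tables (a standalone sketch: the Aerospike list type is replaced by ordinary tables, and the mapped records by simple (id, sum) pairs; all names here are illustrative):

```lua
-- acc = {min_index, buffer}; buffer holds k {sum, id} pairs and min_index
-- always points at the smallest sum currently kept.
local function find_index_of_min(buf)
    local index = 1
    for i = 2, #buf do
        if buf[i][1] < buf[index][1] then index = i end
    end
    return index
end

-- Offer one (id, sum) pair to the accumulator, evicting the current
-- minimum when the new sum is larger.
local function push(acc, id, sum)
    local buf = acc[2]
    local slot = buf[acc[1]]
    if sum > slot[1] then
        slot[1], slot[2] = sum, id
        acc[1] = find_index_of_min(buf)
    end
    return acc
end

-- Track the top 3 sums out of a small sample.
local acc = {1, {{0}, {0}, {0}}}
for id, sum in ipairs({10, 50, 20, 40, 30}) do
    push(acc, id, sum)
end

-- Collect and sort the surviving sums for display.
local top = {}
for _, pair in ipairs(acc[2]) do top[#top + 1] = pair[1] end
table.sort(top, function(a, b) return a > b end)
print(top[1], top[2], top[3])  -- 50  40  30
```

The linear scan for the new minimum keeps the per-record cost at O(k), which is cheap for a small k such as the count = 3 used above.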

$ asinfo -h skhd10.dev -v 'get-config:';

transaction-queues=20;transaction-threads-per-queue=100;transaction-duplicate-threads=0;transaction-pending-limit=20;migrate-threads=100;migrate-xmit-priority=40;migrate-xmit-sleep=500;migrate-read-priority=10;migrate-read-sleep=500;migrate-xmit-hwm=10;migrate-xmit-lwm=5;migrate-max-num-incoming=256;migrate-rx-lifetime-ms=60000;proto-fd-max=15000;proto-fd-idle-ms=60000;proto-slow-netio-sleep-ms=1;transaction-retry-ms=1000;transaction-max-ms=1000;transaction-repeatable-read=false;dump-message-above-size=134217728;ticker-interval=10;microbenchmarks=false;storage-benchmarks=false;ldt-benchmarks=false;scan-priority=200;scan-sleep=1;batch-threads=4;batch-max-requests=5000;batch-priority=200;nsup-delete-sleep=100;nsup-period=120;nsup-startup-evict=true;paxos-retransmit-period=5;paxos-single-replica-limit=1;paxos-max-cluster-size=32;paxos-protocol=v3;paxos-recovery-policy=manual;write-duplicate-resolution-disable=false;respond-client-on-master-completion=false;replication-fire-and-forget=false;info-threads=16;allow-inline-transactions=true;use-queue-per-device=false;snub-nodes=false;fb-health-msg-per-burst=0;fb-health-msg-timeout=200;fb-health-good-pct=50;fb-health-bad-pct=0;auto-dun=false;auto-undun=false;prole-extra-ttl=0;max-msgs-per-type=-1;service-threads=20;fabric-workers=36;pidfile=/var/run/aerospike/asd.pid;memory-accounting=false;udf-runtime-gmax-memory=18446744073709551615;udf-runtime-max-memory=18446744073709551615;sindex-populator-scan-priority=3;sindex-data-max-memory=18446744073709551615;query-threads=32;query-worker-threads=480;query-priority=10;query-in-transaction-thread=0;query-req-in-query-thread=0;query-req-max-inflight=100;query-bufpool-size=256;query-batch-size=100;query-sleep=1;query-job-tracking=false;query-short-q-max-size=500;query-long-q-max-size=500;query-rec-count-bound=4294967295;query-threshold=50;query-untracked-time=1000000;service-address=0.0.0.0;service-port=3000;mesh-seed-address-port=10.0.22.139:3002;reuse-address=true;fabric-port=3001;
fabric-keepalive-enabled=true;fabric-keepalive-time=1;fabric-keepalive-intvl=1;fabric-keepalive-probes=10;network-info-port=3003;enable-fastpath=true;heartbeat-mode=mesh;heartbeat-protocol=v2;heartbeat-address=10.0.22.139;heartbeat-port=3002;heartbeat-interval=250;heartbeat-timeout=10;enable-security=false;privilege-refresh-period=300;report-authentication-sinks=0;report-data-op-sinks=0;report-sys-admin-sinks=0;report-user-admin-sinks=0;report-violation-sinks=0;syslog-local=-1;enable-xdr=false;forward-xdr-writes=false;xdr-delete-shipping-enabled=true;xdr-nsup-deletes-enabled=false;stop-writes-noxdr=false;reads-hist-track-back=1800;reads-hist-track-slice=10;reads-hist-track-thresholds=1,8,64;writes_master-hist-track-back=1800;writes_master-hist-track-slice=10;writes_master-hist-track-thresholds=1,8,64;proxy-hist-track-back=1800;proxy-hist-track-slice=10;proxy-hist-track-thresholds=1,8,64;writes_reply-hist-track-back=1800;writes_reply-hist-track-slice=10;writes_reply-hist-track-thresholds=1,8,64;udf-hist-track-back=1800;udf-hist-track-slice=10;udf-hist-track-thresholds=1,8,64;query-hist-track-back=1800;query-hist-track-slice=10;query-hist-track-thresholds=1,8,64;query_rec_count-hist-track-back=1800;query_rec_count-hist-track-slice=10;query_rec_count-hist-track-thresholds=1,8,64
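As an aside, the query thread settings we experimented with can be changed at runtime via asinfo's set-config command, without restarting the node (a sketch with illustrative values, shown against the same host as above; it needs a reachable server):

```
asinfo -h skhd10.dev -v 'set-config:context=service;query-threads=32'
asinfo -h skhd10.dev -v 'set-config:context=service;query-worker-threads=480'
```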

#2

Tomas,

I know what may be happening. Let me give you context.

Query engine has two big pieces:

  • Generator (query-threads) - threads that look up the secondary index and build the list of records to read
  • Worker (query-worker-threads) - threads that perform the record I/O

To avoid context switches when multiple aggregations run in parallel on a data-in-memory namespace, the I/O is performed in the query thread's own context. That is why a single aggregation runs on a single thread.

Did you try running multiple such aggregations in parallel to see whether the system uses multiple cores?

I see this as a shortcoming of our query scheduler (unfortunately with no workaround for parallelizing a single-threaded workload). Thanks for reporting; we will work to get this sorted out.

– R


#3

Hi Raj,

I tried running multiple aggregations in parallel.

With a secondary-index condition:

$ for i in {0..50}; do aql -h skas1.dev -h skas2.dev -T100000 -c "AGGREGATE stats.top_products() ON test.test-set WHERE retargetingId = 2" & done

It used all available cores (here query-threads and query-worker-threads do seem to have an effect).

So I tried a full scan (no condition):

$ for i in {0..50}; do aql -h skas1.dev -h skas2.dev -T100000 -c "AGGREGATE stats.top_products() ON test.test-set" & done

It used at most 3 cores…

I hope you can sort this out soon, so that even a single aggregation runs as fast as possible.

Tomas


#4

Tomas,

Thanks for the run results!!! It will be fixed.

– R