We’re experiencing very slow aggregations and can’t find any reason for it. To explain the situation, here is our setup:
- everything deployed to AWS
- Aerospike cluster consists of 2 nodes (i3.xlarge)
- the instance store has one 950GB NVMe SSD and the EBS storage has 1024GB
- the namespace used for testing is set up with a shadow device configuration
- the set ssd-store.trackings has 24 million records and one secondary numeric index on a bin called campaign
- an example campaign has 480k records
- index creation works as expected: the number of entries in the secondary index equals the total number of records
- the client is written in Node.js (aerospike@3.16.1)
- the server version is 5.0.0.8
When the aggregation is started from the Node.js process, it takes around 1 minute 15 seconds to finish. However, once the query is done, it doesn’t show up under completed jobs; see the AMC screenshots.
To me, the indexes seem to be in a state in which they’re usable for queries (state: RW). However, in AMC under Definitions they’re listed as Synced on all nodes?: NO.
[ec2-user@seed1 ~]$ aql
Seed: 127.0.0.1
User: None
Config File: /etc/aerospike/astools.conf /home/ec2-user/.aerospike/astools.conf
Aerospike Query Client
Version 3.28.0
C Client Version 4.6.15
Copyright 2012-2020 Aerospike. All rights reserved.
aql> show indexes
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
| ns | bin | indextype | set | state | indexname | path | type |
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
| "ssd-store" | "campaign" | "NONE" | "trackings" | "RW" | "campaign" | "campaign" | "NUMERIC" |
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
[127.0.0.1:3000] 1 row in set (0.001 secs)
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
| ns | bin | indextype | set | state | indexname | path | type |
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
| "ssd-store" | "campaign" | "NONE" | "trackings" | "RW" | "campaign" | "campaign" | "NUMERIC" |
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
[<REDACTED IP SEED 2>:3000] 1 row in set (0.002 secs)
OK
aql>
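To cross-check what AMC shows, we also pulled the job list and index stats directly from the info endpoints via the Node.js client. A minimal sketch, assuming the jobs:module=query, sindex/ssd-store/campaign and sets/ssd-store/trackings info commands are available on server 5.0 (command names vary between server versions):

const Aerospike = require('aerospike')

async function inspectCluster () {
  const client = await Aerospike.connect({ hosts: '127.0.0.1:3000' })

  // Query jobs (running and completed) as seen by each node.
  for (const { host, info } of await client.infoAll('jobs:module=query')) {
    console.log(host, info)
  }

  // Per-node secondary index stats (entries, load state, etc.).
  for (const { host, info } of await client.infoAll('sindex/ssd-store/campaign')) {
    console.log(host, info)
  }

  // Object count of the set, to compare against the index entries.
  for (const { host, info } of await client.infoAll('sets/ssd-store/trackings')) {
    console.log(host, info)
  }

  client.close()
}

inspectCluster().catch(console.error)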
/etc/aerospike/aerospike.conf
service {
    user root
    group root
    paxos-single-replica-limit 1
    pidfile /var/run/aerospike/asd.pid
    proto-fd-max 15000
}

logging {
    file /var/log/aerospike/aerospike.log {
        context any info
    }
}

network {
    service {
        address any
        port 3000
    }

    heartbeat {
        mode mesh
        port 3002
        mesh-seed-address-port <REDACTED IP SEED 1> 3002
        mesh-seed-address-port <REDACTED IP SEED 2> 3002
        interval 150
        timeout 10
    }

    fabric {
        port 3001
    }
}

namespace ssd-store {
    replication-factor 2
    memory-size 20G
    default-ttl 366d
    allow-ttl-without-nsup true

    storage-engine device {
        # shadow device pair: NVMe instance store backed by the EBS volume
        device /dev/xvdb /dev/sdb
        write-block-size 128K
    }
}

mod-lua {
    cache-enabled true
}
The Node.js query for the aggregation:
const Aerospike = require('aerospike')

// `client`, `namespace`, `set`, `campaign`, `dateFrom`, `dateTo`,
// `BINS` and `EVENTS` are defined elsewhere in our codebase.
const query = client.query(namespace, set)
const mandatoryFields = [BINS.STAGE]

query.select([
  BINS.CREATED_AT,
  BINS.EVENT,
  BINS.PAYOUT,
  BINS.STAGE,
  BINS.TRIGGER
])
query.where(Aerospike.filter.equal('campaign', campaign))

const events = {
  withPayout: [EVENTS.CLICK, EVENTS.HOVER],
  withoutPayout: [EVENTS.VIEW]
}

// Runs the `run` function of the registered `top-stage` UDF module
// as a stream aggregation over the query results.
const result = await query.apply(
  'top-stage',
  'run',
  [
    dateFrom,
    dateTo,
    mandatoryFields,
    events.withPayout,
    events.withoutPayout,
    'trigger'
  ],
  { totalTimeout: 0 }
)
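For completeness, the top-stage module is of course registered before the query runs. A minimal sketch of how that is done, assuming the Lua file sits next to the script (file name and path are illustrative):

const Aerospike = require('aerospike')
const path = require('path')

// Register the stream UDF module and wait until it has been
// distributed to all nodes.
async function registerUdf (client) {
  const job = await client.udfRegister(path.join(__dirname, 'top-stage.lua'))
  await job.waitUntilDone()
}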
The Lua UDF for the aggregation:
local filters = require("filters")

-- stream: contains all needed input data
-- date_from: start date for records
-- date_to: end date for records
-- mandatory_fields: required bins (fields) to be present
-- payout_events: for these events payout must be true
-- events_without_payout: for these events payout must be false
-- exclude_by_property: exclude if property exists
function run(stream, date_from, date_to, mandatory_fields, payout_events, events_without_payout, exclude_by_property)

  -- Drops each record that fails any of the filters below.
  local function filter_local(rec)
    if filters.filter_mandatory_fields(rec, mandatory_fields) == false then
      return false
    end
    if filters.filter_date_range(rec, date_from, date_to) == false then
      return false
    end
    for payout_event in list.iterator(payout_events) do
      if rec.event and rec.event == payout_event then
        if filters.filter_payout(rec, 0) == true then
          return false
        end
      end
    end
    for event in list.iterator(events_without_payout) do
      if rec.event and rec.event == event then
        if filters.filter_payout(rec, 1) == true then
          return false
        end
      end
    end
    if rec[exclude_by_property] then
      return false
    end
    return true
  end

  -- Counts events per stage: out[stage][event] += 1
  local function mapper(out, rec)
    local stage = rec.stage
    local event = rec.event
    out[stage] = (out[stage] or map())
    out[stage][event] = (out[stage][event] or 0) + 1
    return out
  end

  -- Merges the partial per-stage counts produced on each node.
  local function reducer(a, b)
    return map.merge(
      a or map(),
      b or map(),
      function(stage_a, stage_b)
        return map.merge(
          stage_a,
          stage_b,
          function(event_a, event_b)
            return (event_a or 0) + (event_b or 0)
          end
        )
      end
    )
  end

  return stream:filter(filter_local):aggregate(map(), mapper):reduce(reducer)
end
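For reference, the aggregation returns a nested map of event counts per stage, roughly in this shape (stage names and numbers are made up):

// Example shape of `result` (illustrative values):
const exampleResult = {
  stage1: { click: 310000, hover: 120000 },
  stage2: { view: 50000 }
}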
So the big question now is: what are we doing wrong here? An aggregation over 24 million records, filtered down to 480k records, that takes 75 seconds is amazingly slow (~6,400 records per second).