Slow Aggregations via UDF

We’re experiencing very slow aggregations and can’t see any reason for it. To explain the situation, here is our setup:

  • everything is deployed to AWS
  • the Aerospike cluster consists of 2 nodes (i3.xlarge)
  • the instance store has one 950GB NVMe SSD, and there is an additional 1024GB EBS volume (see the lsblk sketch below this list)
  • the namespace used for testing is set up with a shadow device configuration
  • the set ssd-store.trackings has 24 million records and one numeric secondary index on a bin called campaign
  • an example campaign has 480k records
  • index creation works as expected: the number of entries in the secondary index equals the total number of records
  • the client technology is Node.js (aerospike@3.16.1)
  • the server version is 5.0.0.8

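For reference, one way to double-check which block device is the instance-store NVMe and which is the EBS volume (a sketch only; device names vary by instance type and AMI):

# list block devices; the 950GB NVMe instance store and the 1024GB EBS volume
# should be distinguishable by size and model
lsblk -o NAME,SIZE,TYPE,MODEL
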
When the aggregation is started from the Node.js process, it takes around 1 minute 15 seconds to finish. However, once the query is done, it doesn’t show up under completed jobs (see the attached AMC screenshots).
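
The job list that AMC shows can also be checked directly on each node with asinfo; this is only a sketch, and the exact output fields depend on the server version:

# list query jobs known to this node (run against each node)
asinfo -v "jobs:module=query"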

To me, the indexes seem to be in a state in which they’re usable for queries (state: RW). However, in AMC under Definitions they’re listed as “Synced on all nodes?: NO”.

[ec2-user@seed1 ~]$ aql
Seed:         127.0.0.1
User:         None
Config File:  /etc/aerospike/astools.conf /home/ec2-user/.aerospike/astools.conf 
Aerospike Query Client
Version 3.28.0
C Client Version 4.6.15
Copyright 2012-2020 Aerospike. All rights reserved.
aql> show indexes
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
| ns          | bin        | indextype | set         | state | indexname  | path       | type      |
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
| "ssd-store" | "campaign" | "NONE"    | "trackings" | "RW"  | "campaign" | "campaign" | "NUMERIC" |
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
[127.0.0.1:3000] 1 row in set (0.001 secs)

+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
| ns          | bin        | indextype | set         | state | indexname  | path       | type      |
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
| "ssd-store" | "campaign" | "NONE"    | "trackings" | "RW"  | "campaign" | "campaign" | "NUMERIC" |
+-------------+------------+-----------+-------------+-------+------------+------------+-----------+
[<REDACTED IP SEED 2>:3000] 1 row in set (0.002 secs)

OK

aql> 
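
To rule out a sync problem, the per-node statistics for the secondary index can also be pulled with asinfo; again a sketch, and the exact stat names differ between server versions:

# detailed per-node statistics for the index (entries, load state, query counters)
asinfo -v "sindex/ssd-store/campaign"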

/etc/aerospike/aerospike.conf

service {
	user root
	group root
	paxos-single-replica-limit 1
	pidfile /var/run/aerospike/asd.pid
	proto-fd-max 15000
}

logging {
	file /var/log/aerospike/aerospike.log {
		context any info
	}
}

network {
	service {
		address	any
		port 3000
	}

	heartbeat {
		mode mesh
		port 3002
		mesh-seed-address-port <REDACTED IP SEED 1> 3002
		mesh-seed-address-port <REDACTED IP SEED 2> 3002
		interval 150
		timeout 10
	}

	fabric {
		port 3001
	}
}

namespace ssd-store {
	replication-factor 2
	memory-size 20G
	default-ttl 366d
	allow-ttl-without-nsup true

	storage-engine device {
		device /dev/xvdb /dev/sdb
		write-block-size 128K
	}
}

mod-lua {
	cache-enabled true
}

The Node.js query for the aggregation:

const query = client.query(namespace, set)
const mandatoryFields = [BINS.STAGE]

query.select([
	BINS.CREATED_AT,
	BINS.EVENT,
	BINS.PAYOUT,
	BINS.STAGE,
	BINS.TRIGGER
])

query.where(Aerospike.filter.equal('campaign', campaign))

const events = {
	withPayout: [EVENTS.CLICK, EVENTS.HOVER],
	withoutPayout: [EVENTS.VIEW]
}

const result = await query.apply(
	'top-stage',
	'run',
	[
		dateFrom,
		dateTo,
		mandatoryFields,
		events.withPayout,
		events.withoutPayout,
		'trigger'
	],
	{ totalTimeout: 0 }
)

The Lua UDF for the aggregation:

local filters = require("filters")

-- stream: contains all needed input data
-- date_from: start date for records
-- date_to: end date for records
-- mandatory_fields: required bins (fields) to be present
-- payout_events: for these events payout must be true
-- events_without_payout: for these events payout must be false
-- exclude_by_property: exclude if property exists
function run(stream, date_from, date_to, mandatory_fields, payout_events, events_without_payout, exclude_by_property)
	local function filter_local(rec)
		if filters.filter_mandatory_fields(rec, mandatory_fields) == false then
			return false
		end

		if filters.filter_date_range(rec, date_from, date_to) == false then
			return false
		end

		for payout_event in list.iterator(payout_events) do
			if rec.event and rec.event == payout_event then
				if filters.filter_payout(rec, 0) == true then
					return false
				end
			end
		end

		for event in list.iterator(events_without_payout) do
			if rec.event and rec.event == event then
				if filters.filter_payout(rec, 1) == true then
					return false
				end
			end
		end

		if rec[exclude_by_property] then
			return false
		end

		return true
	end

	local function mapper(out, rec)
		local stage = rec.stage
		local event = rec.event

		out[stage] = (out[stage] or map())
		out[stage][event] = (out[stage][event] or 0) + 1

		return out
	end

	local function reducer(a, b)
		return map.merge(
			a or map(),
			b or map(),
			function(stage_a, stage_b)
				return map.merge(
					stage_a,
					stage_b,
					function(event_a, event_b)
						return (event_a or 0) + (event_b or 0)
					end
				)
			end
		)
	end

	return stream:filter(filter_local):aggregate(map({}), mapper):reduce(reducer)
end

So the big question now is: what are we doing wrong here? An aggregation over 24 million records, filtered down to 480k records, that takes 75 seconds is amazingly slow (about 6,400 records per second).

While the aggregation is running, both cluster nodes show a CPU load of around 10% on a single core, and RAM usage seems unaffected. The cluster nodes are AWS i3.xlarge instances, which means 4 vCPUs, 16 ECU and 30.5 GB RAM each.

How can we make use of all the available CPUs for such queries?

Did you try tuning query-threads? There are a few other query-related config params. Not sure whether they would help in this particular situation, though.

The query-threads default value is 6 and the maximum is 32. As far as I remember, the number of threads should match the number of CPUs in the node. Since the i3.xlarge has 4 CPUs, the default of 6 should already be fine.

For testing, I set query-priority=40, query-threads=32, query-worker-threads=480 and query-threshold=300. However, even with those maxed-out query settings, I don’t see any difference.
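
For reference, these parameters can also be changed at runtime with asinfo instead of editing the config and restarting (a sketch, assuming server 5.0 where they are service-context parameters):

asinfo -v "set-config:context=service;query-threads=32"
asinfo -v "set-config:context=service;query-worker-threads=480"
asinfo -v "set-config:context=service;query-priority=40"
asinfo -v "set-config:context=service;query-threshold=300"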

OK, I’m just so amazingly stupid. I had the slow EBS volume as the primary storage device and the NVMe as the shadow device :see_no_evil: Since reads are served from the primary device (the shadow only receives writes), the whole aggregation was reading from EBS. So the issue is in aerospike.conf: the line device /dev/xvdb /dev/sdb needs to be changed to device /dev/nvme0n1 /dev/xvdb.
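
For anyone running into the same thing, the corrected stanza (NVMe instance store first as the primary device, EBS volume second as the shadow) looks like this:

storage-engine device {
	# primary: fast NVMe instance store; shadow: slower EBS volume
	device /dev/nvme0n1 /dev/xvdb
	write-block-size 128K
}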

The aggregation now finishes in 6.5 seconds, which is around 74k records per second.

Good catch!