We encountered a problem when running a map-reduce aggregation on a large dataset with the Python client. The intention is to query a secondary index and then filter the records with a UDF to get an aggregate count for each status. When the query returns fewer than 2 million records, everything works. When the number of records returned by the secondary index exceeds 2 million, we get Error Code 215 (AS_PROTO_RESULT_FAIL_QUERY_DUPLICATE). We are unsure what this means in the context of the client.
The code we used to run the aggregation is below.
Python
import aerospike


def aggregate_rec(client, namespace, setname, as_ref_key):
    """Aggregates statistics for a given gallery
    :param client: aerospike client
    :param namespace: name of namespace holding records
    :param setname: gallery of records
    :param as_ref_key: gallery name
    :return: dict of counts per status plus a 'total' entry
    """
    if not client.is_connected():
        client.connect()
    # operation is idempotent, safe to retry
    query_policy = {
        'max_retries': 3,
        'sleep_between_retries': 2000,
        'total_timeout': 60000  # 60 seconds for query, default is 1s
    }
    # operation is idempotent, safe to retry
    apply_policy = {
        'max_retries': 3,
        'sleep_between_retries': 2000,
        'total_timeout': 60000  # 60 seconds for query, default is 1s
    }
    query = client.query(namespace, setname)
    query.select('a', 'b', 'rec_status', policy=query_policy)
    query.where(aerospike.predicates.equals('as_ref_key', as_ref_key))
    query.apply('v2_galleryQuery', 'aggregate_records', policy=apply_policy)
    subtotals = []

    def collect_subtotals(subtotal):
        # called once for each partial result streamed back to the client
        subtotals.append(subtotal)

    def _aggregate_sumtotals(subtotals):
        # merge the partial counts and keep a running grand total
        totals = {'s1': 0, 's2': 0, 's3': 0, 'total': 0}
        if subtotals:
            for subtotal in subtotals:
                for k in subtotal.keys():
                    totals[k] += subtotal[k]
                    totals['total'] += subtotal[k]
        return totals

    query.foreach(collect_subtotals, policy=query_policy)
    return _aggregate_sumtotals(subtotals)
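The client-side merge at the end of this function can be checked without a cluster. Below is a minimal sketch, assuming each subtotal passed to the callback behaves like a dict mapping a status name to a count. The name `merge_subtotals` is hypothetical and not part of the aerospike client; unlike the function above, it also tolerates statuses other than s1, s2 and s3.

```python
from collections import Counter


def merge_subtotals(subtotals):
    """Sum per-status counts from several partial results and add a grand total."""
    # Start from zeroed known statuses so they always appear in the result.
    totals = Counter({'s1': 0, 's2': 0, 's3': 0})
    for subtotal in subtotals:
        # Assumption: each subtotal maps status name -> count.
        for status, count in subtotal.items():
            totals[status] += count
    merged = dict(totals)
    merged['total'] = sum(totals.values())
    return merged


if __name__ == '__main__':
    parts = [{'s1': 2, 's2': 1, 's3': 0}, {'s1': 1, 's2': 0, 's3': 4}]
    print(merge_subtotals(parts))
```

Running the merge on a few hand-built partial results like this makes it easier to tell a client-side summing problem apart from a server-side query failure.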
Lua script
function aggregate_records(stream)
    local function count(totalCount, rec)
        totalCount[rec['rec_status']] = totalCount[rec['rec_status']] + 1
        return totalCount
    end
    local agg_count = map();
    agg_count['s1'] = 0;
    agg_count['s2'] = 0;
    agg_count['s3'] = 0;
    return stream : aggregate(agg_count, count)
end
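To sanity-check expected counts on a small sample, the fold performed by this stream UDF can be modelled in plain Python. This is only a rough sketch: it assumes records are dicts with a `rec_status` entry, it runs entirely on the client, and it says nothing about how the server executes the aggregation. The function names mirror the Lua ones for readability and are otherwise hypothetical.

```python
from functools import reduce


def count_status(total_count, rec):
    """Mirror of the Lua `count` step: bump the counter for the record's status."""
    status = rec['rec_status']
    # Unlike the Lua version, default unseen statuses to 0 before incrementing.
    total_count[status] = total_count.get(status, 0) + 1
    return total_count


def aggregate_records(records):
    """Fold records into per-status counts, starting from zeroed s1/s2/s3."""
    return reduce(count_status, records, {'s1': 0, 's2': 0, 's3': 0})


if __name__ == '__main__':
    sample = [{'rec_status': 's1'}, {'rec_status': 's3'}, {'rec_status': 's1'}]
    print(aggregate_records(sample))
```

One difference worth noting: the Lua `count` function adds 1 to whatever is stored under the record's status, so a status outside s1, s2 and s3 would presumably fail there, whereas this model simply starts a new counter.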
Client Version
Python Aerospike version 3.1.1
Server Version
edition: Aerospike Enterprise Edition
version: Aerospike Enterprise Edition build 4.2.0.5
build: 4.2.0.5
services:
build_os: ubuntu16.04