I have implemented map/reduce functionality in Java. First I insert some records into Aerospike, then I count the number of records through Map/Reduce.
The code that runs the Map/Reduce is:
Statement stmt = new Statement();
stmt.setNamespace(this.namespace);
stmt.setSetName(this.set);
stmt.setFilters(Filter.range(this.binName2, dataRangeLowerLimit, dataRangeUpperLimit));
stmt.setBinNames(this.binName2);
stmt.setIndexName(this.indexName);
ResultSet rs = this.client.queryAggregate(null, stmt, "CountFile", "count_function"); // call to lua file
if (rs.next()) {
Object result = rs.getObject();
sum = ((Long)result).doubleValue();
System.out.println("Count = " + result);
}
I am calling the above piece of code from 100 different threads. I have tried 100,000, 200,000 and so on up to 600,000 records and it works, but it's not working for 1M records.
dataRangeLowerLimit and dataRangeUpperLimit get their values from the threads: the number of records is divided by noOfThreads, which is 100 in my case.
So for 100,000 records each thread covers 1,000 records, and Filter.range will be 0,999 for the 1st thread,
1000,1999 for the 2nd thread, and so on.
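To make the per-thread ranges concrete, here is a minimal sketch of the arithmetic described above. The class name RangePartition and the method boundsFor are hypothetical (they are not from my actual code), and the sketch assumes both bounds are inclusive, as the 0,999 example suggests, and that any remainder goes to the last thread.

```java
final class RangePartition {
    // Hypothetical helper: returns {lower, upper}, both inclusive,
    // for a zero-based thread index.
    static long[] boundsFor(long totalRecords, int threadCount, int threadIndex) {
        long chunk = totalRecords / threadCount;   // records per thread
        long lower = threadIndex * chunk;
        // Assumption: the last thread also takes any leftover records.
        long upper = (threadIndex == threadCount - 1)
                ? totalRecords - 1
                : lower + chunk - 1;
        return new long[] {lower, upper};
    }

    public static void main(String[] args) {
        long[] first = boundsFor(100_000L, 100, 0);
        long[] second = boundsFor(100_000L, 100, 1);
        System.out.println(first[0] + "," + first[1]);
        System.out.println(second[0] + "," + second[1]);
    }
}
```

With 1M records and 100 threads the same arithmetic gives 10,000 records per thread, so that is the count each thread should get back if every value in its range exists exactly once.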
But as soon as I use 1M records, or even 700,000 records, queryAggregate returns correct results for a few threads and then starts returning random numbers, most often 100.
The Lua file is:
local function one(rec)
return 1
end
local function add(a,b)
return a+b
end
function count_function(stream)
return stream:map(one):reduce(add)
end
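For reference, this is what I expect the Lua stream to compute, written as a plain-Java analogue. It is only a local model and does not use the Aerospike API: the class CountModel and its count method are hypothetical names, and the sketch assumes exactly one record exists for every value in the inclusive range.

```java
import java.util.stream.LongStream;

final class CountModel {
    // Mirrors map(one) followed by reduce(add): every element is
    // replaced by 1, then the ones are summed.
    static long count(long lower, long upper) {
        return LongStream.rangeClosed(lower, upper)
                .map(value -> 1L)
                .reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(count(0L, 999L));
    }
}
```

Under that assumption the result for any thread is simply upper - lower + 1, which is the value I compare against what queryAggregate returns.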
Can anybody please help me find the problem here?