Yes, this is quite straightforward: you can run a stream UDF for the aggregation. The Aerospike website has a Java client example; look at the aggregation section of that example: http://www.aerospike.com/docs/client/java/examples/application
You will need a stream UDF similar to aggregationByRegion.lua, and you can invoke it like this:
/**
 * Counts users per region among those whose tweet count lies in a
 * user-entered [min, max] range, using the stream UDF
 * {@code aggregationByRegion.sum}, and prints the per-region totals.
 *
 * <p>For convenience (and to demonstrate the syntax) this method also creates
 * the secondary index on the {@code tweetcount} bin and registers
 * {@code udf/aggregationByRegion.lua}. In production, create indexes via AQL
 * (or once from a standalone application) and register the UDF once.
 *
 * @throws AerospikeException    if index creation, UDF registration or the
 *                               query fails
 * @throws InterruptedException  declared in the signature; kept for
 *                               compatibility with existing callers
 * @throws NumberFormatException if an entered tweet count is not an integer
 * @throws IllegalArgumentException if the console reaches end of input
 */
public void aggregateUsersByTweetCountByRegion() throws AerospikeException,
        InterruptedException {
    // NOTE: Index creation is included here for convenience only; see the
    // method documentation for the recommended production approach.
    IndexTask task = client.createIndex(null, "test", "users",
            "tweetcount_index", "tweetcount", IndexType.NUMERIC);
    // Argument is the polling interval (ms). Not needed if the index was
    // created beforehand.
    task.waitTillComplete(100);

    ResultSet rs = null;
    try {
        int min = readIntFromConsole("\nEnter Min Tweet Count:");
        int max = readIntFromConsole("Enter Max Tweet Count:");

        // Register the UDF. Ideally a separate application does this once
        // (and again whenever the module changes).
        // The client-side source directory is set because the final reduce
        // phase of an aggregation runs on the client.
        LuaConfig.SourceDirectory = "udf";
        File udfFile = new File("udf/aggregationByRegion.lua");
        RegisterTask rt = client.register(null, udfFile.getPath(),
                udfFile.getName(), Language.LUA);
        rt.waitTillComplete(100); // Not needed if pre-registered

        // We only need the bins used to report how many users each region has.
        String[] bins = { "tweetcount", "region" };

        Statement stmt = new Statement();
        stmt.setNamespace("test");
        stmt.setSetName("users");
        stmt.setIndexName("tweetcount_index"); // optional
        stmt.setBinNames(bins);
        // Restrict the query to min <= tweetcount <= max via the secondary index.
        stmt.setFilters(Filter.range("tweetcount", min, max));

        // Default (null) policy; Lua module "aggregationByRegion", function "sum".
        rs = client.queryAggregate(null, stmt, "aggregationByRegion", "sum");

        // Values go through format specifiers rather than being concatenated
        // into the format string, so a '%' in a value can't corrupt the output.
        console.printf("\nAggregating users with %d-%d tweets by region. Hang on...\n",
                min, max);

        // The UDF's reduce step collapses the stream into a single map, so
        // only the first result needs to be read.
        if (rs.next()) {
            @SuppressWarnings("unchecked")
            Map<Object, Object> result = (Map<Object, Object>) rs.getObject();
            console.printf("\nTotal Users in North: %s\n", result.get("n"));
            console.printf("Total Users in South: %s\n", result.get("s"));
            console.printf("Total Users in East: %s\n", result.get("e"));
            console.printf("Total Users in West: %s\n", result.get("w"));
        }
    } finally {
        // Always release the result set, even if the query or printing failed.
        if (rs != null) {
            rs.close();
        }
    }
} //aggregateUsersByTweetCountByRegion

/**
 * Prints {@code prompt} on the console and parses the reply as an int.
 * Leading and trailing whitespace in the reply is ignored.
 *
 * @param prompt text printed verbatim before reading
 * @return the parsed integer
 * @throws IllegalArgumentException if no line could be read (end of input)
 * @throws NumberFormatException    if the reply is not a valid integer
 */
private int readIntFromConsole(String prompt) {
    // NOTE(review): assumes `console` is a java.io.Console-like field whose
    // readLine() returns null at end of input — confirm against the class.
    console.printf("%s", prompt);
    String line = console.readLine();
    if (line == null) {
        throw new IllegalArgumentException("No input for prompt: " + prompt.trim());
    }
    return Integer.parseInt(line.trim());
}
Here is what the Lua file used in the example above looks like. You can use a similar construct to compute the results you want.
--- Aggregation step: count one record toward its region's counter.
-- Only the region codes 'n', 's', 'e' and 'w' are counted. A record whose
-- 'region' bin holds any other value (or is missing) leaves the map unchanged.
-- (The parameter is named `stats` rather than `map` so it does not shadow the
-- `map` constructor.)
-- @param stats map of counters keyed 'n', 's', 'e', 'w'; each counter is
--   expected to be initialised to a number (0) by the caller
-- @param rec record flowing through the stream
-- @return the updated counter map
local function aggregate_stats(stats, rec)
  local region = rec.region
  if region == 'n' or region == 's' or region == 'e' or region == 'w' then
    stats[region] = stats[region] + 1
  end
  return stats
end
--- Reduce step: merge the per-region counters of map `b` into map `a`.
-- @param a accumulated counter map with keys n, s, e, w (modified in place)
-- @param b counter map with keys n, s, e, w to add into `a`
-- @return `a`, holding the summed counters
local function reduce_stats(a, b)
  a.n = a.n + b.n
  a.s = a.s + b.s
  a.e = a.e + b.e
  a.w = a.w + b.w
  return a
end
--- Stream UDF entry point: count the records in `stream` per region.
-- Every record is folded by aggregate_stats into a map of four counters
-- (n = north, s = south, e = east, w = west), each starting at 0. The partial
-- maps are then merged by reduce_stats into the single map that is returned.
-- Kept global (not local) so it can be invoked by name as a UDF function.
-- @param stream record stream supplied by the query
-- @return stream producing one map of per-region record counts
function sum(stream)
  local initial = map{n = 0, s = 0, e = 0, w = 0}
  return stream:aggregate(initial, aggregate_stats):reduce(reduce_stats)
end