How to do Sum distinct


I am trying to use Aerospike for capping (limiting some operations). Each application server sends its counter to Aerospike, so I have records like this:

id=1 | counter = 10
id=1 | counter = 13
id=2 | counter = 10

I would like to sum the counters per id and get this result:

id=1 | sum = 23
id=2 | sum = 10

Is there any way to do this on the Aerospike cluster?
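In SQL terms this is a sum grouped by id (`SELECT id, SUM(counter) ... GROUP BY id`) rather than a SUM DISTINCT. A minimal single-machine sketch of the wanted result in plain Java, assuming the three records are already in memory; `Row` is a hypothetical stand-in for an Aerospike record, not a client class:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class SumById {
    // Hypothetical stand-in for one record: the id and the counter a server sent.
    static final class Row {
        final long id;
        final long counter;

        Row(long id, long counter) {
            this.id = id;
            this.counter = counter;
        }
    }

    // Group rows by id and add up their counters; TreeMap keeps ids sorted.
    static Map<Long, Long> sumById(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                r -> r.id, TreeMap::new, Collectors.summingLong(r -> r.counter)));
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1, 10), new Row(1, 13), new Row(2, 10));
        // prints "id=1 | sum = 23" then "id=2 | sum = 10"
        sumById(rows).forEach((id, sum) ->
                System.out.println("id=" + id + " | sum = " + sum));
    }
}
```

The open question is how to get the same grouping computed on the cluster instead of pulling every record to the client.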


Yes, it is quite straightforward. You can run a stream UDF for aggregation. The website has a Java client example; look at the aggregation section of that example.
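Roughly how a stream UDF aggregation splits the work, as I understand it: each server node runs the aggregate function over its own matching records to build a partial result, and the reduce function merges the partials into one value (with the Java client, the final reduce runs on the client, which is why the example points `LuaConfig.SourceDirectory` at the local `udf` folder). The sketch below is a simplified plain-Java model of that idea for the per-id sums, not Aerospike API: `aggregate`, `reduce` and `run` are hypothetical helpers named after the Lua stream operators, and each inner list stands in for the records stored on one node.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TwoPhaseSum {
    // "aggregate" step: fold one record (id, counter) into a node-local partial map.
    static Map<Long, Long> aggregate(Map<Long, Long> acc, long id, long counter) {
        acc.merge(id, counter, Long::sum);
        return acc;
    }

    // "reduce" step: merge partial map b into a; ids missing from a start at 0.
    static Map<Long, Long> reduce(Map<Long, Long> a, Map<Long, Long> b) {
        b.forEach((id, total) -> a.merge(id, total, Long::sum));
        return a;
    }

    // Each inner list models one node's records as {id, counter} pairs.
    static Map<Long, Long> run(List<List<long[]>> nodes) {
        Map<Long, Long> result = new TreeMap<>();
        for (List<long[]> node : nodes) {
            Map<Long, Long> partial = new HashMap<>(); // built on the node
            for (long[] rec : node) {
                aggregate(partial, rec[0], rec[1]);
            }
            reduce(result, partial); // partials merged into the final map
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<long[]>> nodes = List.of(
                List.of(new long[]{1, 10}, new long[]{2, 10}), // node A
                List.of(new long[]{1, 13}));                   // node B
        System.out.println(run(nodes)); // prints {1=23, 2=10}
    }
}
```

Because the reduce step is just "add matching keys", it gives the same answer no matter how the records are spread across nodes.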

You will need a stream UDF similar to aggregationByRegion.lua, and you can invoke it roughly like this:

public void aggregateUsersByTweetCountByRegion() throws AerospikeException, InterruptedException {

  // NOTE: Index creation is included here for convenience and to demonstrate the syntax.
  // The recommended way to create indexes in a production environment is via AQL,
  // or once from standalone application code.
  IndexTask task = client.createIndex(null, "test", "users",
      "tweetcount_index", "tweetcount", IndexType.NUMERIC);
  task.waitTillComplete(100);  // Not needed if the index already exists
  ResultSet rs = null;
  try {
    int min;
    int max;
    console.printf("\nEnter Min Tweet Count:");
    min = Integer.parseInt(console.readLine());
    console.printf("Enter Max Tweet Count:");
    max = Integer.parseInt(console.readLine());
    // Register the UDF. Consider a separate application that registers it
    // with the cluster once, and again whenever you update it.
    LuaConfig.SourceDirectory = "udf";
    File udfFile = new File("udf/aggregationByRegion.lua");
    RegisterTask rt = client.register(null, udfFile.getPath(),
        udfFile.getName(), Language.LUA);
    rt.waitTillComplete(100);  // Not needed if the UDF is pre-registered
    // Bins to retrieve: we want to output which region has how many tweets.
    String[] bins = { "tweetcount", "region" };
    // Create the Statement instance
    Statement stmt = new Statement();
    // Set namespace, set name, index name (optional) and bins to retrieve
    stmt.setNamespace("test");
    stmt.setSetName("users");
    stmt.setIndexName("tweetcount_index");
    stmt.setBinNames(bins);
    // Set min--max range filter on tweetcount
    stmt.setFilters(Filter.range("tweetcount", min, max));
    // Execute the aggregation query, passing a null policy, the Statement,
    // the Lua module name and the module function to call.
    rs = client.queryAggregate(null, stmt, "aggregationByRegion", "sum");
    console.printf("\nAggregating users with " + min + "-"
        + max + " tweets by region. Hang on...\n");
    // The reduce step leaves a single map in the ResultSet; read it and print the counts.
    if (rs.next()) {
      @SuppressWarnings("unchecked")
      Map<Object, Object> result = (Map<Object, Object>) rs.getObject();
      console.printf("\nTotal Users in North: " + result.get("n") + "\n");
      console.printf("Total Users in South: " + result.get("s") + "\n");
      console.printf("Total Users in East: " + result.get("e") + "\n");
      console.printf("Total Users in West: " + result.get("w") + "\n");
    }
  } finally {
    // Close the result set
    if (rs != null) {
      rs.close();
    }
  }
} //aggregateUsersByTweetCountByRegion

Here is what the Lua file in the above example looks like. You can use a similar construct to get the results you want.

local function aggregate_stats(map, rec)
  -- Examine the value of the 'region' bin in record rec and increment the respective counter in the map
  if rec.region == 'n' then
    map['n'] = map['n'] + 1
  elseif rec.region == 's' then
    map['s'] = map['s'] + 1
  elseif rec.region == 'e' then
    map['e'] = map['e'] + 1
  elseif rec.region == 'w' then
    map['w'] = map['w'] + 1
  end
  -- Return the updated map
  return map
end

local function reduce_stats(a, b)
  -- Merge values from map b into a
  a.n = a.n + b.n
  a.s = a.s + b.s
  a.e = a.e + b.e
  a.w = a.w + b.w
  -- Return the updated map a
  return a
end

function sum(stream)
  -- Process the incoming record stream: pass it to the aggregate function, then to the reduce function.
  -- NOTE: aggregate() accepts two parameters:
  --   1) A map with four counters for the number of users in the north, south, east and west regions, each initialized to 0
  --   2) The function aggregate_stats, which is called for each record as it flows in
  -- Return the reduced value of the map produced by the reduce function reduce_stats
  return stream : aggregate(map{n=0,s=0,e=0,w=0}, aggregate_stats) : reduce(reduce_stats)
end
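The Lua above pre-seeds the map with n, s, e and w set to 0, which is why reduce_stats can add a.n + b.n without checking for missing keys. A line-by-line plain-Java mirror of the two functions makes that dependency easy to see; the class and method names are hypothetical and no Aerospike API is involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RegionCounts {
    static final List<String> REGIONS = List.of("n", "s", "e", "w");

    // Mirrors the initial map{n=0,s=0,e=0,w=0} passed to aggregate().
    static Map<String, Long> initial() {
        Map<String, Long> m = new HashMap<>();
        for (String k : REGIONS) m.put(k, 0L);
        return m;
    }

    // Mirrors aggregate_stats: only the four pre-seeded regions are counted,
    // any other value falls through, like the Lua if/elseif chain.
    static Map<String, Long> aggregateStats(Map<String, Long> map, String region) {
        if (map.containsKey(region)) {
            map.put(region, map.get(region) + 1);
        }
        return map;
    }

    // Mirrors reduce_stats: safe only because both maps start with all four keys.
    static Map<String, Long> reduceStats(Map<String, Long> a, Map<String, Long> b) {
        for (String k : REGIONS) {
            a.put(k, a.get(k) + b.get(k));
        }
        return a;
    }

    public static void main(String[] args) {
        Map<String, Long> nodeA = initial();
        for (String r : List.of("n", "s", "n")) aggregateStats(nodeA, r);
        Map<String, Long> nodeB = initial();
        for (String r : List.of("w", "n")) aggregateStats(nodeB, r);
        Map<String, Long> t = reduceStats(nodeA, nodeB);
        // prints n=3 s=1 e=0 w=1
        System.out.println("n=" + t.get("n") + " s=" + t.get("s")
                + " e=" + t.get("e") + " w=" + t.get("w"));
    }
}
```

For the per-id sums in the question the keys are not known up front, so the aggregate function has to create an entry the first time it sees an id (adding rec.counter instead of 1), and the reduce function has to treat an id missing from one side as 0 instead of adding a fixed set of fields.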


Thanks for the quick response. I thought there would be a simpler way to do this :(