Map Reduce for records using udf against String key and double value


#1

Hi All,

I have generated the data in the form of Long Keys and Double values and inserting it in Aerospike as

Aerospike.client.put(this.writePolicy, new Key(“test”, “profile”, keyGet), new Bin(this.binName, value)); keyGet is String Key value is Double.

I am also getting back these values as Key key = new Key(this.namespace, this.set, keyStr); //keyStr is String Key this.client.get(this.readPolicy, key, this.binName);

so above put and get is working fine. But I am not able to implement a Map Reduce for these records. What I want is - I want to add all the double values stored against all the keys and return the total sum from the UDF using Map and Reduce functions.

Thanks Nitin


#2

Hi Nitin,

Will a java example help?

Peter


#3

Hi Peter,

Yes that will surely help. I am trying to achieve this in Java only.


#4

I have implemented a map reduce functionality in Java. First I am inserting some records in the Aerospike then I am counting the no. of records through Map/Reduce.

The code for using Map/Reduce is.

Statement stmt = new Statement(); stmt.setNamespace(this.namespace); stmt.setSetName(this.set); stmt.setFilters(Filter.range(this.binName2, dataRangeLowerLimit, dataRangeUpperLimit)); stmt.setBinNames(this.binName2); stmt.setIndexName(this.indexName);

ResultSet rs = this.client.queryAggregate(null, stmt, “CountFile”, “count_function”); // call to lua file

if (rs.next()) { Object result = rs.getObject(); sum = ((Long)result).doubleValue(); System.out.println("Count = " + result); }

I am calling above piece of code from 100 different threads. I have tried for 100,000, 200,000 - 600,000 records but its not working for 1M records. dataRangeLowerLimit and dataRangeUpperLimit —> These gets value from threads. No of records divided by noOfThreads which is 100 in my case.

So for 100,000 records each thread will gets to execute 1000 records and Filter.range will be 0,999 for 1st thread and 1000,1999 for 2nd thread and so on.

But as soon as I gave 1M records or even 700,000 records, queryAggregate returns correct results for few threads then starts giving random numbers and mostly 100 instead.

Lua file is

local function one(rec) return 1 end

local function add(a,b) return a+b end

function count_function(stream)

return stream : map(one) : reduce(add);

end

Can anybody please help me in finding out the problem here ?


#5

Hi Nitin

I’m working on a comprehensive example for you, so I’ll add this to it.

Regards

Peter


Not able to get the required Throughput time
#6

Hi Nitin

I have created an example at https://github.com/aerospike/common-aggregation-functions.git

This example uses Aerospike aggregations to count, sum and return values that can be used to find the average and the standard deviation.

The code includes:

  • Index creation
  • UDF registration
  • Writing 2 million records
  • Fixed point arithmetic to store Double values in Aerospike as a 8 byte unsigned integer
  • Sum aggregation
  • Count aggregation
  • Stats aggregation (for average and standard deviation)

I ran this on Aerospike 3.3.19 on a single node cluster using a SSD name space.

I hope this helps


#7

Thanks Peter.

I will run the example today and will get back.

Nitin


#8

Hi Peter

Your example is running fine. I have a query. I am trying to run the example with multiple threads.

Each thread will be executing the statement query for specific range. Like for 1st thread range will be 0-100,000 2nd thread range will be 100,001-200,000 and so…on

Is this a problem with map reduce or Lua file or with secondary index query ?

Because if I run the example with one thread, then there is no problem but its not giving correct results with multiple threads.


#9

I modified your example as :

ExecutorService executorService = Executors.newFixedThreadPool(10); for (int index = 0; index <10; index++) {

			executorService.execute(new Runnable() {

				@Override
				public void run() {
					
					try {
						as.work();
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
		}

// as.work();