Counting distinct (unique)

aggregation, index, secondary, udf, query

#1

Post by javadevmtl » Thu May 08, 2014 7:32 am

Say I have the following schema

namespace: SomeSpace Set: SomeSet Bins: field1, field2, field3, field4

In the SQL world I would want to run the following queries.

SELECT COUNT(DISTINCT field1), count(DISTINCT field2), count(DISTINCT field3) WHERE field4 = ?
SELECT COUNT(DISTINCT field2), count(DISTINCT field3), count(DISTINCT field4) WHERE field1 = ?
SELECT COUNT(DISTINCT field3), count(DISTINCT field4), count(DISTINCT field1) WHERE field2 = ?

If you notice it’s the same query but the WHERE clause changes…

Is it possible with a Streaming UDF, and if so, how would you do it? It's kinda much to have to learn Lua for evaluation purposes and then figure out if I'm even doing it right… Would someone have a sample Lua streaming UDF that shows how to do the above?

Thanks


#2

Post by javadevmtl » Wed May 14, 2014 7:03 am

Any thoughts or ideas?


#3

Post by young » Wed May 14, 2014 9:22 am

Aerospike now has a secondary index feature that can be applied to the field. In addition, you can filter the results at the server to restrict them even more; this would be done using a UDF. For example, if you set a secondary index on time, you could restrict the query to only those records entered in the last 10 minutes. This is done in RAM and is very fast. The UDF could then apply additional filtering to the result.

However, at present a distinct count can be tricky in a distributed environment, regardless of the field. The problem is that you cannot do a count on each node and simply sum the per-node counts, because the same value can occur on more than one node and would then be counted more than once. You would need to either send the objects back to the client and do the distinct count there, or handle it a different way.
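To see why per-node counts cannot simply be added, here is a minimal Python sketch. It is not Aerospike code, and the node contents are made-up illustration data. Summing the per-node distinct counts overcounts any value that appears on more than one node, while merging the per-node sets of values before counting gives the right answer.

```python
# Hypothetical values of one bin, as they might be spread across three nodes.
node_values = [
    ["a", "b", "c"],        # node 1
    ["b", "c", "d"],        # node 2
    ["a", "d", "e", "e"],   # node 3
]

# Naive approach: count distinct values on each node, then add the counts.
per_node_counts = [len(set(values)) for values in node_values]
naive_total = sum(per_node_counts)

# Safer approach: each node returns its set of distinct values,
# and the sets are merged (unioned) before counting.
merged = set()
for values in node_values:
    merged |= set(values)
true_total = len(merged)

print(per_node_counts)  # [3, 3, 3]
print(naive_total)      # 9
print(true_total)       # 5
```

The price of the second approach is that each node has to ship its distinct values rather than a single number, so the intermediate result grows with the cardinality of the bin.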


#4

Post by devops01 » Wed May 14, 2014 9:59 am

Aggregation UDFs can accept arguments. See the examples at the end of our dev guide: https://docs.aerospike.com/display/V3/Aggregation+Guide
The arguments can be passed from our Java or C client when invoking the UDF: https://docs.aerospike.com/display/V3/J … d+Function

One can write an Aggregation UDF that accepts five arguments: the three bins to count, plus the bin and the value for the WHERE clause. When defining the stream, you can dynamically define the filter/map/reduce functions using those arguments (the example shows this only for the filter function).

One of our engineers provided the following example.

The instructions to run it are as follows:

# Create a secondary index on bin0
$ aql -c "create index myind1 on test.demo (bin0) numeric"

# Insert sample data
$ for i in {1..1000}; do aql -c "insert into test.demo(PK, bin0, bin1, bin2, bin3, bin4) values ('key$i', $i, `expr $i / 2`, `expr $i / 3`, `expr $i / 4`, `expr $i / 5`)"; done

# The data looks something like this
$ aql -c 'select * from test.demo where bin0 between 1 and 100'
+------+------+------+------+------+
| bin0 | bin1 | bin2 | bin3 | bin4 |
+------+------+------+------+------+
| 1    | 0    | 0    | 0    | 0    |
| 2    | 1    | 0    | 0    | 0    |
| 3    | 1    | 1    | 0    | 0    |
| 4    | 2    | 1    | 1    | 0    |
| 5    | 2    | 1    | 1    | 1    |
| 6    | 3    | 2    | 1    | 1    |
| 7    | 3    | 2    | 1    | 1    |
| 8    | 4    | 2    | 2    | 1    |
| 9    | 4    | 3    | 2    | 1    |
| 10   | 5    | 3    | 2    | 2    |

# Run the aggregation from aql, trying the same aggregation UDF with different parameters
$ aql -o json
Aerospike Query
Copyright 2013 Aerospike. All rights reserved.

Connected to 127.0.0.1:3000

aql> set LUA_USERPATH '/home/sunil'
aql> register module '/home/sunil/distinct_with_args.lua'
OK, 1 module added.

aql> aggregate distinct_with_args.distinct('bin1', 'bin2', 'bin3', 'bin4', 0) on test.demo where bin0 between 0 and 1000
[ { "distinct": Map("count1"->501, "count2"->334, "count3"->251), } ]

aql> aggregate distinct_with_args.distinct('bin1', 'bin2', 'bin3', 'bin4', 199) on test.demo where bin0 between 0 and 1000
[ { "distinct": Map("count1"->4, "count2"->3, "count3"->3), } ]

aql> aggregate distinct_with_args.distinct('bin2', 'bin3', 'bin4', 'bin1', 0) on test.demo where bin0 between 0 and 1000
[ { "distinct": Map("count1"->334, "count2"->251, "count3"->201), } ]

aql> aggregate distinct_with_args.distinct('bin2', 'bin3', 'bin4', 'bin1', 499) on test.demo where bin0 between 0 and 1000
[ { "distinct": Map("count1"->2, "count2"->2, "count3"->2), } ]

aql> aggregate distinct_with_args.distinct('bin2', 'bin3', 'bin4', 'bin1', 500) on test.demo where bin0 between 0 and 1000
[ { "distinct": Map("count1"->1, "count2"->1, "count3"->1), } ]
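The counts in this session can be cross-checked offline. The following sketch is plain Python, not the Aerospike client. It rebuilds the sample data from the insert loop (bin0 = i, and bin1 to bin4 = i divided by 2, 3, 4 and 5 using integer division) and applies a `>=` comparison on the last two arguments, which is the comparison the Lua filter in this example uses. The function names `make_records` and `distinct_counts` are made up for this illustration.

```python
def make_records(n=1000):
    """Rebuild the sample rows: bin0 = i, bin1..bin4 = i // 2 .. i // 5."""
    return [
        {"bin0": i, "bin1": i // 2, "bin2": i // 3, "bin3": i // 4, "bin4": i // 5}
        for i in range(1, n + 1)
    ]


def distinct_counts(records, b1, b2, b3, where_bin, where_value):
    """Count distinct values of three bins over rows where where_bin >= where_value."""
    kept = [r for r in records if r[where_bin] >= where_value]
    return {
        "count1": len({r[b1] for r in kept}),
        "count2": len({r[b2] for r in kept}),
        "count3": len({r[b3] for r in kept}),
    }


records = make_records()
print(distinct_counts(records, "bin1", "bin2", "bin3", "bin4", 0))
# {'count1': 501, 'count2': 334, 'count3': 251}
print(distinct_counts(records, "bin1", "bin2", "bin3", "bin4", 199))
# {'count1': 4, 'count2': 3, 'count3': 3}
print(distinct_counts(records, "bin2", "bin3", "bin4", "bin1", 499))
# {'count1': 2, 'count2': 2, 'count3': 2}
```

These match the aql results, which also confirms that the last argument acts as a lower bound in this example rather than as an exact match.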


#5

Post by devops01 » Wed May 14, 2014 10:04 am

Here’s an example Lua script to do the queries:

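local function apply_whereclause(whereclause_bin, whereclause_binvalue)
  return function(record)
    -- info("%s value is %d. filter value is %d", whereclause_bin, record[whereclause_bin], whereclause_binvalue)
    -- Note: this keeps records whose bin value is >= the argument.
    -- Use == instead for an exact-match WHERE clause.
    local value = record[whereclause_bin]
    -- Guard against records that do not have the bin at all
    if value ~= nil and value >= whereclause_binvalue then
      -- info("filter pass")
      return true
    else
      -- info("filter fail")
      return false
    end
  end
end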

local function select_bins(bin1, bin2, bin3)
  return function(record)
    -- Create a map of maps. The top-level map has three elements, one for
    -- each selected bin on which we want to run the aggregation.
    -- It also contains the number of elements in each second-level map.
    -- The second-level maps maintain the frequency count of each bin value.
    -- (The frequency count is not really needed when only counting distinct values.)
    local map_of_map = map()
    local bin1_map = map()
    local bin2_map = map()
    local bin3_map = map()

    -- Store the frequency of each bin value, initializing it to 1
    if record[bin1] then
      bin1_map[record[bin1]] = 1
    end
    if record[bin2] then
      bin2_map[record[bin2]] = 1
    end
    if record[bin3] then
      bin3_map[record[bin3]] = 1
    end

    -- Put the second-level maps into the top-level map,
    -- and also count the number of elements in each second-level map
    map_of_map['bin1_vals'] = bin1_map
    map_of_map['bin2_vals'] = bin2_map
    map_of_map['bin3_vals'] = bin3_map
    map_of_map['bin1_val_count'] = map.size(bin1_map)
    map_of_map['bin2_val_count'] = map.size(bin2_map)
    map_of_map['bin3_val_count'] = map.size(bin3_map)
    -- info("map called")
    return map_of_map
  end
end

function distinct(s, selected_bin1, selected_bin2, selected_bin3, whereclause_bin, whereclause_binvalue)

  -- As no additional args need to be passed to the reducer function,
  -- it need not return a function. This is a simple type.
  -- mom = map of map
  local function aggregate_distinct(mom1, mom2)
    -- info("reduce called")

    -- Define how to merge the map values
    local function map_value_merger(mapval1, mapval2)
      return mapval1 + mapval2
    end

    -- Extract the second-level maps.
    -- Remember to handle the nil maps, especially because
    -- the first invocation of reduce will come with a nil map as one argument
    local bin1_map1 = (mom1['bin1_vals'] or map())
    local bin1_map2 = (mom2['bin1_vals'] or map())
    local bin2_map1 = (mom1['bin2_vals'] or map())
    local bin2_map2 = (mom2['bin2_vals'] or map())
    local bin3_map1 = (mom1['bin3_vals'] or map())
    local bin3_map2 = (mom2['bin3_vals'] or map())

    -- Merge the second-level maps. Call the library function to merge maps and
    -- return a map which is of similar structure to the input maps
    local resultmap = map()
    resultmap['bin1_vals'] = map.merge(bin1_map1, bin1_map2, map_value_merger)
    resultmap['bin2_vals'] = map.merge(bin2_map1, bin2_map2, map_value_merger)
    resultmap['bin3_vals'] = map.merge(bin3_map1, bin3_map2, map_value_merger)
    resultmap['bin1_val_count'] = map.size(resultmap['bin1_vals'])
    resultmap['bin2_val_count'] = map.size(resultmap['bin2_vals'])
    resultmap['bin3_val_count'] = map.size(resultmap['bin3_vals'])
    return resultmap
  end

  -- If the user does not care about the frequencies of each element,
  -- this function is used to eliminate them.
  local function get_count_only(map_of_map)
    local finalresultmap = map()
    finalresultmap['count1'] = map_of_map['bin1_val_count']
    finalresultmap['count2'] = map_of_map['bin2_val_count']
    finalresultmap['count3'] = map_of_map['bin3_val_count']
    return finalresultmap
  end

  local myfilter = apply_whereclause(whereclause_bin, whereclause_binvalue)
  local mymap = select_bins(selected_bin1, selected_bin2, selected_bin3)

  return s : filter(myfilter) : map(mymap) : reduce(aggregate_distinct) : map(get_count_only)
end
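For readers who do not know Lua yet, the shape of this stream (filter, map, reduce, final map) can be modelled in a few lines of Python. This is only a sketch of the logic, not the Aerospike API: plain dicts stand in for Aerospike maps, `functools.reduce` stands in for the stream's reduce step, the helper names are hypothetical, and the sample records are invented. The filter here uses equality, as in the SQL of the original question, whereas the Lua script compares with `>=`.

```python
from functools import reduce


def select_bins(bins):
    """Map step: turn one record into {bin: {value: 1}} for the chosen bins."""
    def mapper(record):
        return {b: {record[b]: 1} for b in bins if record.get(b) is not None}
    return mapper


def merge(left, right):
    """Reduce step: merge two partial results, adding up value frequencies."""
    merged = {b: dict(freqs) for b, freqs in left.items()}
    for b, freqs in right.items():
        target = merged.setdefault(b, {})
        for value, n in freqs.items():
            target[value] = target.get(value, 0) + n
    return merged


def count_only(partial):
    """Final map step: keep only the number of distinct values per bin."""
    return {b: len(freqs) for b, freqs in partial.items()}


# Invented sample records; f4 plays the role of the WHERE-clause bin.
records = [
    {"f1": "x", "f2": 1, "f3": "p", "f4": 10},
    {"f1": "y", "f2": 1, "f3": "p", "f4": 10},
    {"f1": "x", "f2": 2, "f3": "q", "f4": 10},
    {"f1": "z", "f2": 3, "f3": "q", "f4": 99},
]

kept = filter(lambda r: r["f4"] == 10, records)        # filter
partials = map(select_bins(["f1", "f2", "f3"]), kept)  # map
combined = reduce(merge, partials, {})                 # reduce
print(count_only(combined))                            # final map
# {'f1': 2, 'f2': 2, 'f3': 2}
```

Because `merge` gives the same result whatever the grouping or order of the partial results, it has the property a distributed reduce relies on: partial results can be combined as they arrive.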