Find top N records from a stream


#1

Hi all,

How can I find the top 10 records from a stream? Can it be done on the server side, so that we minimize the data transferred to the client?

Say we have a bunch of records, each with an event and a time, and I want to find the 10 latest events. I am thinking of using a UDF on a stream. Here is my current structure:

map(): returns {eventid=1, time=1}, {eventid=2, time=2}…
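A minimal sketch of the map() step, assuming the records store bins named eventid and time. Projecting each record down to just those two fields before aggregating is what keeps the per-record data small. A plain Lua table stands in for both the record and the result here; inside an actual Aerospike UDF the function would receive a record and would typically build its result with Aerospike's map type, which is not shown. The name to_event is hypothetical.

```lua
-- Hypothetical map() step: keep only the fields the top-N logic needs.
-- Assumes bins named "eventid" and "time"; a plain Lua table stands in
-- for the Aerospike record, and any other fields are simply not copied.
local function to_event(rec)
  return { eventid = rec["eventid"], time = rec["time"] }
end

local e = to_event({ eventid = 7, time = 42, payload = "ignored" })
print(e.eventid, e.time)
```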

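-- Keep the N most recent items in aggregate, sorted newest first.
-- aggregate is a list; nextitem has a time field.
local N = 10

local function last_n_aggregation(aggregate, nextitem)
    -- bubble-sort style pass: walk down the list and swap nextitem with any
    -- older entry, so the older entry is carried further down; whatever is
    -- still being carried after slot N is dropped
    for i = 1, N do
        if aggregate[i] == nil then
            aggregate[i] = nextitem
            break
        elseif aggregate[i].time < nextitem.time then
            aggregate[i], nextitem = nextitem, aggregate[i]
        end
    end
    return aggregate
end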

reduce(): merges two lists and returns a list.
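One way to write the reduce() step, sketched under the assumption that both inputs are already sorted newest first (which the aggregation above guarantees): a two-pointer merge that stops once N items have been emitted, so each merge costs O(N) no matter how many records the nodes scanned. Plain Lua arrays stand in for Aerospike lists, and merge_top_n and N are illustrative names; the list-type conversion a real UDF might need is not shown.

```lua
local N = 10  -- number of records to keep (assumed, matching the thread)

-- Merge two lists that are each sorted by time, newest first, keeping only
-- the N newest items. Plain Lua arrays stand in for Aerospike lists.
-- Hypothetical wiring inside a stream UDF (to_event is an illustrative name):
--   return stream : map(to_event)
--                 : aggregate(list(), last_n_aggregation)
--                 : reduce(merge_top_n)
local function merge_top_n(a, b)
  local out = {}
  local i, j = 1, 1
  while #out < N and (a[i] ~= nil or b[j] ~= nil) do
    -- take from a when b is exhausted or a's head is at least as new
    if b[j] == nil or (a[i] ~= nil and a[i].time >= b[j].time) then
      out[#out + 1] = a[i]
      i = i + 1
    else
      out[#out + 1] = b[j]
      j = j + 1
    end
  end
  return out
end
```

Because the merge never looks past the first N items of either input, the result sent back to the client stays at N records however many nodes contribute partial lists.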

This method works, but the aggregate step is not efficient (bubble sort). Is there a way to use binary search in the aggregate?

The Aerospike list type doesn’t have an insert function, so even with binary search we would have to do a lot of copying after finding the insert position.
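A sketch of the binary-search variant, assuming the aggregate state can be a plain Lua array (whether Aerospike accepts that as aggregation state is not confirmed here). Lua's standard table.insert does the shifting that the Aerospike list lacks. The shift still moves up to N references, so for N = 10 the gain is mainly in comparisons: about 4 per record instead of up to 10. insert_top_n is a hypothetical name.

```lua
local N = 10  -- top-N size (assumed)

-- Insert item into top, a plain Lua array sorted by time (newest first),
-- keeping at most N entries. Binary search finds the slot; table.insert
-- shifts the tail and table.remove drops the oldest entry if needed.
local function insert_top_n(top, item)
  local lo, hi = 1, #top + 1
  -- find the first position whose time is older than item.time
  while lo < hi do
    local mid = math.floor((lo + hi) / 2)
    if top[mid].time >= item.time then
      lo = mid + 1
    else
      hi = mid
    end
  end
  if lo > N then
    return top  -- item is older than everything already kept
  end
  table.insert(top, lo, item)
  if #top > N then
    table.remove(top)  -- drop the oldest (last) entry
  end
  return top
end
```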


#2

Hi Wu

I worked up an example that will do what you want. You can find it at https://github.com/helipilot50/aerospike-top-10-aggregation.git

I hope this helps

Peter


#3

Hi Peter,

Your work is impressive. Thank you for the example. I have read and tested it, and I have some follow-up questions about it.

  1. When you do a movedown, is the copy a reference (pointer) copy or an object copy? I would like to understand the list type a little better. Will this be an issue we need to consider as the data grows larger?

  2. Yesterday, while investigating an external library (Loop.collection.PriorityQueue) to find the top N (I am not sure whether this is a better solution), I ran into a problem. When I query in the aql tool with the AGGREGATE module.function ON ns.set command, I get the correct result. When I use the Java client to call queryAggregate(), I don’t get any response. I suspect the client cannot find the Loop library (I believe the client does find the topN.lua file). Do you have any experience using external libraries in Aerospike?

  3. I am currently reading more about Large Data Types, in particular the Large Ordered List. I want to see whether it could provide an alternative solution and simplify my work. Do you have any suggestions?
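Question 2 mentions a priority queue. The idea can be sketched without an external library as a bounded min-heap keyed on time: the root is always the oldest of the N newest records seen so far, so a new record is either discarded after one comparison or replaces the root in O(log N). This is hand-written for illustration on plain Lua tables and is not the API of Loop.collection.PriorityQueue; all function names are hypothetical.

```lua
local N = 10  -- top-N size (assumed)

-- Restore the min-heap property upward from index i (smallest time at h[1]).
local function sift_up(h, i)
  while i > 1 do
    local p = math.floor(i / 2)
    if h[p].time <= h[i].time then break end
    h[p], h[i] = h[i], h[p]
    i = p
  end
end

-- Restore the min-heap property downward from index i.
local function sift_down(h, i)
  local n = #h
  while true do
    local l, r, s = 2 * i, 2 * i + 1, i
    if l <= n and h[l].time < h[s].time then s = l end
    if r <= n and h[r].time < h[s].time then s = r end
    if s == i then return end
    h[s], h[i] = h[i], h[s]
    i = s
  end
end

-- Offer one record to a heap holding at most N records.
local function heap_push_top_n(h, item)
  if #h < N then
    h[#h + 1] = item
    sift_up(h, #h)
  elseif item.time > h[1].time then
    h[1] = item  -- replace the oldest kept record
    sift_down(h, 1)
  end
  return h
end

-- Copy the heap and sort it once, newest first, for the final answer.
local function heap_to_sorted(h)
  local out = {}
  for k = 1, #h do out[k] = h[k] end
  table.sort(out, function(a, b) return a.time > b.time end)
  return out
end
```

Because the heap is only partially ordered, the result is sorted once at the end; with N = 10 that final sort is negligible compared with scanning the stream.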

Finally, thank you for your time on this discussion. It has given me a lot of useful information and helped me a lot.

Sincerely, Pengfei


#4

Hi Pengfei,

Let me answer your questions:

1: Elements in a list are actually pointers, so the movedown() function is only moving pointers.
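The pointer point can be illustrated with plain Lua tables, which are always held by reference: moving an entry between slots copies only the reference, and a change made through the new slot is visible through the original variable. This shows standard Lua semantics and is offered as an analogy for the Aerospike list behavior described above, not as a test of that type.

```lua
-- Plain Lua: tables are referenced, never copied, on assignment.
local rec = { eventid = 1, time = 100 }
local slots = { rec }
slots[2] = slots[1]   -- "move down" one slot: only the reference is copied
slots[1] = nil
slots[2].time = 101   -- mutate through the new slot
print(rawequal(slots[2], rec), rec.time)
```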

2: Yes, you are right: Loop.collection.PriorityQueue needs to be available on the client as well as on each node in the cluster. The default directory is “udf”, but you can change it in Java by setting:

LuaConfig.SourceDirectory = "your udf directory";

Be sure to include your own UDF modules in the directory too.

3: Large (Ordered) Lists are very cool and very new. They can store an almost unlimited number of entries (billions of them). They are ordered by a “key” field configured when you first create the LList. Aerospike is overhauling the API associated with Large Data Types, so keep watching the website.

Regards, Peter