Context-aware Aggregation in Lua


I’m fairly new to Aerospike, UDFs, and Lua, and I can’t get my head around how to solve a problem that requires context-aware aggregation. Any directions would be amazingly helpful.

Use Case: A click-stream tracking system that allows for the calculation of the minimum, maximum and average visit duration.

Simplified Data Structure:

  • session (String): a unique ID
  • createdAt (Integer): the timestamp when the tracking happened

This is how I would normally solve it (a rough sketch in plain Lua follows the list):

  1. Group by the session
  2. Sort the grouped session’s data by createdAt
  3. Calculate each session’s duration by comparing the createdAt of its first and last entries.
  4. Merge the per-session results into one global result.
  5. Calculate the minimum, maximum and average duration
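
For reference, this is roughly what I mean in plain Lua outside of Aerospike; it is only a sketch, and the function and variable names are just for illustration:

  local function visit_stats(records)
    -- 1. group the createdAt values by session
    local by_session = {}
    for _, r in ipairs(records) do
      by_session[r.session] = by_session[r.session] or {}
      table.insert(by_session[r.session], r.createdAt)
    end
    -- 2./3. sort each group and take last - first as that session's duration
    local durations = {}
    for _, times in pairs(by_session) do
      table.sort(times)
      table.insert(durations, times[#times] - times[1])
    end
    -- 4./5. merge and compute min, max and average duration
    local min_d, max_d, total = math.huge, 0, 0
    for _, d in ipairs(durations) do
      if d < min_d then min_d = d end
      if d > max_d then max_d = d end
      total = total + d
    end
    return { min = min_d, max = max_d, avg = #durations > 0 and total / #durations or 0 }
  end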

The issue is that I can’t really get my head around how to implement such context-aware functionality (one record’s createdAt needs to be compared with another record’s createdAt within the same session) in a UDF with Lua. Additionally, grouping the raw data into session scopes is giving me a bit of a hard time.

How would you solve such an issue?

Can you clarify the problem a little more in an Aerospike context - i.e. what each record’s data will look like? Add some real examples of records (key - bin values) and their data - say 10 records. Also indicate how many total records (maximum expected) you are typically trying to find the min, max and avg over.

The data would look like the following. There are another 20 bins, but they are filtered out before the data stream goes into the UDF.

{ session: "a5hzu9h35si1561977717229", createdAt: 1569597199672 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569597199672 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569597194675 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569597199672 }
{ session: "fal2xdcpr7v1569600763994", createdAt: 1569597199676 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569597209672 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569597239677 }
{ session: "fal2xdcpr7v1569600763994", createdAt: 1569597279672 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569597569679 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569598199672 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569598299672 }
{ session: "fal2xdcpr7v1569600763994", createdAt: 1569599299672 }
{ session: "fal2xdcpr7v1569600763994", createdAt: 1569599399672 }
{ session: "fal2xdcpr7v1569600763994", createdAt: 1569599400000 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569599410000 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569599420000 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569599430000 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569599440000 }
{ session: "fal2xdcpr7v1569600763994", createdAt: 1569599450000 }
{ session: "a5hzu9h35si1561977717229", createdAt: 1569599460000 }

The database has somewhere around 1.25 billion entries. However, for the described functionality, we only look at 10% of the entries.

So let’s see how you would apply your algorithm to the above data. You have session ID and createdAt. How are you now “sessionizing”? - i.e. how would you calculate step 3 for the above data - what is the “first” and “last” entry of a session? (Is there some kind of minimum time between createdAt values for the same session ID that is used to separate sessions?) The reason I am asking is that perhaps there is a different way to get the result you seek using sorted lists rather than aggregation UDFs - so I need to understand how large the intermediate list data can grow and how to sessionize. I am thinking of an approach somewhere along the lines of this blog: Sorted Results from a Secondary Index Query — Part II | by Piyush Gupta | Aerospike Developer Blog | Medium

Aerospike is a multi-instance cluster, so you should think in terms of a merge-sort-style algorithm. You should not sort the data before calculating. Just maintain a record like this at each aggregation step:

(session, min_createdAt, max_createdAt)

If the session is not yet in the temp table, insert it with min_createdAt = max_createdAt = createdAt. If it already exists, update it with the new values (see the sketch below):

  min_createdAt = min(min_createdAt, createdAt)
  max_createdAt = max(max_createdAt, createdAt)
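
A minimal sketch of this step as the aggregate function of an Aerospike stream UDF, assuming the bins are named session and createdAt (track_minmax is just an illustrative name):

  local function track_minmax(state, rec)
    local s = rec['session']
    local t = rec['createdAt']
    local cur = state[s]
    if cur == nil then
      -- first record seen for this session on this node
      state[s] = map{ min = t, max = t }
    else
      state[s] = map{ min = (t < cur['min']) and t or cur['min'],
                      max = (t > cur['max']) and t or cur['max'] }
    end
    return state
  end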

After the process completes, you will have only one record per session in the temp table.

The next step is simply to calculate the average, min, and max over that table.
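
Continuing the sketch above, the reduce step merges the per-node partial maps, and a final map step turns the per-session min/max pairs into the overall statistics (again, all function names are illustrative):

  local function merge_sessions(a, b)
    -- keep the smaller min and larger max for sessions seen on several nodes
    for s, v in map.pairs(b) do
      local cur = a[s]
      if cur == nil then
        a[s] = v
      else
        a[s] = map{ min = (v['min'] < cur['min']) and v['min'] or cur['min'],
                    max = (v['max'] > cur['max']) and v['max'] or cur['max'] }
      end
    end
    return a
  end

  local function duration_stats(sessions)
    local count, total, min_d, max_d = 0, 0, nil, nil
    for _, v in map.pairs(sessions) do
      local d = v['max'] - v['min']   -- one session's duration in ms
      total = total + d
      count = count + 1
      if min_d == nil or d < min_d then min_d = d end
      if max_d == nil or d > max_d then max_d = d end
    end
    return map{ min = min_d, max = max_d, avg = (count > 0) and (total / count) or 0 }
  end

  -- the stream UDF entry point the client would invoke
  function session_durations(stream)
    return stream : aggregate(map(), track_minmax) : reduce(merge_sessions) : map(duration_stats)
  end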

This will cost you roughly O(N), and I don’t think you can get a better algorithm for this task, considering the grouping is multithreaded and runs across multiple servers. If you prefer a single-threaded solution or have no additional memory/disk, fetch by a secondary index on session and calculate your value in a sliding window, as proposed above. My variant should work much better on a multi-CPU system.