How to perform multiple aggregation steps


#1

Here is a basic description of what I’m trying to do. This may just be a misunderstanding on my part, and the way to do it is via one aggregation and one reduction.

Let’s say you want to get an average word count per page in a book, over all the pages of the book. As I understand it, the way to do this would be to aggregate over each page to get the average per page, then aggregate over all the pages, then reduce across nodes. However, between the first and second aggregation the key changes, or at least it seems to me that you’d have to generate a new map array with new keys. For example:

Per page aggregation: create a map array with the key to the array taking the form word|page, then a single value in the map, i.e. {Count = n}. So the array might look like:

array[and|1] = { Count = 25}
array[the|1] = { Count = 34}
array[so|1] = { Count = 123}
array[and|2] = { Count = 23}
array[this|2] = { Count = 12}

and so on.

Now, to aggregate over the whole book we’d need to re-key and create a new array, like this:

newarray[and] = { AveragePerPage = 24}
newarray[the] = { AveragePerPage = 37}
newarray[so] = { AveragePerPage = 24}
newarray[this] = { AveragePerPage = 35}

and so on.

What would be the best way to accomplish this? My understanding is that the second argument in an aggregate function call is a “record”. If I do a second aggregation, can I assume that the stream will now be in the form of the map array output by the first aggregation step? Or would it be best to perform the second aggregation in the reduce function instead? At some point I can foresee wanting to do more than two aggregations in a single function, assuming that’s possible.

Thanks!


#2

If the aggregate() operation is the first operation in the stream, then it will only be performed on the server side. The reduce() operation triggers the transition from server to client, performing the reduction first on the server, then on the client. You will need a reduce() to have the client application perform the final step.

I recommend performing the aggregate() operation to collect the word counts for each page. The following is the function used to perform the aggregation. We assume the record is composed of two bins: page number and page content. We will use the page number as the key for the page, and process the page content to get a word count for the page.

I am not sure whether you want to retain the per-page counts or simply want the average, so I will provide an example of both.

Both examples make use of a function countWords(map, string) (not provided), which counts the words in the string and populates the map, incrementing the count for any word that already exists in the map.
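For reference, a minimal countWords might look like the following sketch. The `%a+` word pattern is an assumption on my part — it matches runs of letters only, ignoring digits and punctuation, so adjust it to whatever you consider a word.

```lua
-- Hypothetical sketch of countWords(map, string): tallies each word
-- in `content` into the map `counts`, incrementing existing entries.
local function countWords(counts, content)
   for word in string.gmatch(content, "%a+") do
      counts[word] = (counts[word] or 0) + 1
   end
   return counts
end
```

Returning the map lets the helper be used either in place or as an expression, matching both call styles in the examples.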

Get both Per Page Count and Per Book Average

To count the words per page, then get the average count in the book, while retaining both, you can do the following.

First, the function to aggregate the per page word count.

local function pageWordCount(m, rec)
   local page    = rec['page'] 
   local content = rec['content']
   
   m[page] = countWords(map(), content)
   
   return m
end

The result will be something like the following (as JSON):

{
   "1": {
      "and": 10,
      "he": 4,
      "will": 2,
      "rock": 1
   },
   "2": {
      "or": 10,
      "she": 4,
      "may": 2,
      "roll": 1
   },
   ...
}

Next, you will merge the results from each node in the cluster. Because each record is a distinct page, there will not be any key conflicts when merging, so we can use the map.merge() function directly.

return stream : aggregate(map(), pageWordCount) : reduce(map.merge)

You can then transform the results and get an average count in the book as follows:

local function bookWordAverage(pages)
   local words      = map()
   local wordCounts = map()
   local pageCount  = map.size(pages)

   for pageWords in map.values(pages) do
      -- map.merge() takes a third argument, which is a function
      -- used to resolve the value in case of conflict. For this
      -- example, we use math.sum() to add the values.
      wordCounts = map.merge(wordCounts, pageWords, math.sum)
   end

   for word, count in map.pairs(wordCounts) do
      words[word] = count / pageCount
   end

   -- return both maps
   return map{
      ["words"] = words,
      ["pages"] = pages
   }
end

Make sure you add the map() operation:

return stream : aggregate(map(), pageWordCount) : reduce(map.merge) : map(bookWordAverage)

Get only Per Book Average

This is much simpler, as you can just count the occurrences of words across the entire book and calculate the average after the reduce().

Rather than grouping by page, you can just merge the results of countWords directly into m. We will also need to retain a page count, so we can calculate the averages.

local function bookWordCount(m, rec)
   local content = rec['content']
   countWords(m, content)

   -- special field for page count
   m["##pages"] = (m["##pages"] or 0) + 1

   return m
end

The reduce() operation will remain the same, so the following will produce the total count of words in the book:

return stream : aggregate(map(), bookWordCount) : reduce(map.merge)

Now you have all the words counted and the page count. So, you need to transform this into averages:

local function bookWordAverage(words)
   local pageCount = words["##pages"]

   for word, count in map.pairs(words) do
      -- leave the special page-count entry as-is
      if word ~= "##pages" then
         words[word] = count / pageCount
      end
   end

   return words
end

Make sure you add the map() operation:

return stream : aggregate(map(), bookWordCount) : reduce(map.merge) : map(bookWordAverage)

#3

Thanks for your response, this is helpful. The main issue I’m trying to avoid is sending too much data back to the client, so if I could aggregate down another level on the cluster that would be more useful. Is it technically possible to do something like:

stream: filter() : map() : aggregate() : aggregate()

or can I only do one aggregation step server-side? In my case, I’m still left with millions of rows after the first aggregation and I’d like, if possible, to use the cluster resources to narrow the results further. Thanks!

Edit: I feel that I should provide a bit more context. I’ve found that the Lua interpreter doesn’t work very well in IIS (specifically in a WCF service), so I’d rather do the reduce step in C#. Either way, though, it would be nice to use the cluster resources, if possible, to distribute the load more efficiently.