If the aggregate() operation is the first operation in the stream, then it will only be performed on the server side. The reduce() operation triggers the transition from server to client, performing the reduction first on the server, then on the client. You will need a reduce() for the client application to perform the final step.
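As a rough sketch of where each phase runs (aggregateFn and reduceFn are placeholder names for the functions defined later in this answer):

-- aggregate() runs on each server node; reduce() runs on each
-- node and then once more on the client to combine the
-- per-node results
return stream : aggregate(map(), aggregateFn) : reduce(reduceFn)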
I recommend performing the aggregate() operation to collect the word counts for each page. The following is the function used to perform the aggregation. We assume the record is composed of two bins: page number and page content. We will use the page number as the key for the page, and process the page content to get a word count for the page.
I am not sure if you want to retain the per-page counts or simply want to get the average, so I will provide an example of both. Both examples make use of a function countWords(map, string) (not provided), which will count the words in the string and populate the map. If a word already exists in the map, then its count is incremented.
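For completeness, here is a minimal sketch of what countWords could look like, assuming words are simply whitespace-delimited (a real tokenizer would also strip punctuation and normalize case):

local function countWords(m, s)
  -- iterate over whitespace-delimited tokens and tally each one
  for word in string.gmatch(s, "%S+") do
    m[word] = (m[word] or 0) + 1
  end
  return m
end

Note that it returns the map it populates, which the aggregation functions below rely on.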
Get both Per Page Count and Per Book Average
To count the words per page and then get the average count for the book, while retaining both, you can do the following. First, here is the function to aggregate the per-page word counts.
local function pageWordCount(m, rec)
  local page = rec['page']
  local content = rec['content']
  -- count the words of this page into a fresh map,
  -- keyed by the page number
  m[page] = countWords(map(), content)
  return m
end
The result will be something like the following (as JSON):
{
  "1": {
    "and": 10,
    "he": 4,
    "will": 2,
    "rock": 1
  },
  "2": {
    "or": 10,
    "she": 4,
    "may": 2,
    "roll": 1
  },
  ...
}
Next, you will merge the results from each node in the cluster. Because each record is a page, there will not be any conflicts when merging, so we can use the map.merge() function directly.
return stream : aggregate(map(), pageWordCount) : reduce(map.merge)
You can then transform the results to get, for each word, its average count per page across the book:
local function add(a, b)
  return a + b
end

local function bookWordAverage(pages)
  local words = map()
  local wordCounts = map()
  local pageCount = map.size(pages)
  for pageWords in map.values(pages) do
    -- map.merge() takes a third argument, which is a
    -- function used to resolve the value in case of
    -- conflict. Here we use add() to sum the two counts.
    wordCounts = map.merge(wordCounts, pageWords, add)
  end
  for word, count in map.pairs(wordCounts) do
    words[word] = count / pageCount
  end
  -- return both maps
  return map {
    ["words"] = words,
    ["pages"] = pages
  }
end
Make sure you add the map() operation:
return stream : aggregate(map(), pageWordCount) : reduce(map.merge) : map(bookWordAverage)
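With the two sample pages above, the client would receive something like the following (each word's total divided by the 2 pages):

{
  "words": {
    "and": 5, "he": 2, "will": 1, "rock": 0.5,
    "or": 5, "she": 2, "may": 1, "roll": 0.5
  },
  "pages": { ... the per-page counts shown earlier ... }
}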
Get only Per Book Average
This is much simpler, as you can just count the occurrences of words in the entire book and calculate the averages at the end with a map() operation. Rather than grouping by page, you can just return the results of countWords and merge them into m. We will need to retain a page count, so we can calculate the averages.
local function bookWordCount(m, rec)
  local content = rec['content']
  countWords(m, content)
  -- special field for page count
  m["##pages"] = (m["##pages"] or 0) + 1
  return m
end
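Using the same two sample pages as before, the aggregated map would look something like:

{
  "and": 10, "he": 4, "will": 2, "rock": 1,
  "or": 10, "she": 4, "may": 2, "roll": 1,
  "##pages": 2
}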
Unlike the per-page case, the same word will appear in the partial results coming from different nodes, so the reduce() operation now needs a conflict-resolution function that adds the counts together (this also sums the "##pages" counters from each node). Here is a sketch of such a wrapper; add and mergeCounts are names introduced for this answer, not part of the Aerospike API:
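local function add(a, b)
  return a + b
end

local function mergeCounts(a, b)
  -- when both maps contain the same key, sum the two values
  return map.merge(a, b, add)
end

With that in place, the following will produce the total count of words in the book: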
return stream : aggregate(map(), bookWordCount) : reduce(mergeCounts)
Now you have all the words counted and the page count. So, you need to transform this into averages:
local function bookWordAverage(words)
local pageCount = words["##pages"]
for word, count in map.pairs(words)
words[word] = count / pageCount
end
return words
end
Make sure you add the map() operation:
return stream : aggregate(map(), bookWordCount) : reduce(mergeCounts) : map(bookWordAverage)
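For reference, these stream expressions live inside a function in the UDF module that you register with the server and then invoke from the client with an aggregate query. A minimal outline, with hypothetical module and function names:

-- word_count.lua (hypothetical module name)
-- local definitions of countWords, bookWordCount,
-- mergeCounts and bookWordAverage go here...

function word_averages(stream)
  return stream : aggregate(map(), bookWordCount) : reduce(mergeCounts) : map(bookWordAverage)
end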