UDF stream time series


I’m trying to break down how many interactions happen every minute, using the following UDF. The problem is that the reduce doesn’t seem to merge all the values: I get multiple results with the same keys. I’m not sure whether the cause is how I wrote the UDF or whether the calculated time isn’t accurate.

-- Truncate a nanosecond timestamp to the start of its minute.
local function toMinutes(time)
    return time - (time % 60000000000)
end

-- Aggregate step: count each record under its minute bucket.
local function add_values(impMap, nextImp)
    local time = toMinutes(nextImp["created"])

    debug("time is %s", time)

    local timeSet = impMap[time]

    if timeSet == nil then
        timeSet = map { count = 0 }
    end

    timeSet.count = timeSet.count + 1

    impMap[time] = timeSet
    return impMap
end

-- Combine two entries that share the same minute key.
local function merge(a, b)
    a.count = a.count + b.count
    return a
end

local function reduce_values(a, b)
    return map.merge(a, b, merge)
end

function per_minute(stream)
    return stream : aggregate(map(), add_values) : reduce(reduce_values)
end
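
To make the expected result concrete, here is a minimal sketch in plain Lua (run outside Aerospike) of what the reduce step is meant to produce. Plain tables stand in for Aerospike's `map` type, and `merge_maps` is a hypothetical stand-in for `map.merge`, so this only illustrates the intended merge semantics, not how the server actually executes the stream. The sample counts are made up.

```lua
-- Plain-Lua sketch; tables stand in for Aerospike maps (assumption).
local function merge(a, b)
    a.count = a.count + b.count
    return a
end

-- Hypothetical stand-in for map.merge: combine two partial results,
-- calling `f` only when both sides hold the same key.
local function merge_maps(a, b, f)
    local out = {}
    for k, v in pairs(a) do out[k] = v end
    for k, v in pairs(b) do
        if out[k] == nil then
            out[k] = v
        else
            out[k] = f(out[k], v)
        end
    end
    return out
end

-- Two partial aggregates (made-up counts) that share one minute bucket.
local partA = {
    [1446611460000000000] = { count = 3 },
    [1446611520000000000] = { count = 1 },
}
local partB = {
    [1446611460000000000] = { count = 2 },
}

local merged = merge_maps(partA, partB, merge)
print(merged[1446611460000000000].count) -- 5
print(merged[1446611520000000000].count) -- 1
```

If the reduce ran the way this sketch assumes, each minute key would appear exactly once in the final result, with the partial counts summed.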


I’ve been able to narrow it down to the reduce function not running: I get all my other debug messages, but none from the reduce. I’m doing this via AQL’s command line:

AGGREGATE imps-per-min.impressions_per_minute() on adserver.impressions WHERE created BETWEEN 1446611475911403870 AND 1446612075911403870
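
As a sanity check on the time math, this standalone Lua snippet applies the same truncation to the two bounds from the query above. It assumes `created` is a nanosecond epoch timestamp, which is what the 60000000000 divisor implies; `NS_PER_MINUTE` and `bucket_count` are names introduced here for illustration only.

```lua
-- Same truncation as in the UDF: drop everything below one minute,
-- assuming nanosecond timestamps (60 s * 1e9 ns per second).
local NS_PER_MINUTE = 60000000000

local function toMinutes(time)
    return time - (time % NS_PER_MINUTE)
end

local first = toMinutes(1446611475911403870)
local last  = toMinutes(1446612075911403870)

-- Number of distinct minute buckets the BETWEEN range can touch.
local bucket_count = (last - first) / NS_PER_MINUTE + 1

print(first == 1446611460000000000) -- true
print(last == 1446612060000000000)  -- true
print(bucket_count == 11)           -- true
```

Under that assumption the query window covers 11 minute buckets, so a fully merged result should contain at most 11 distinct keys.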