-- Return the smallest key of the Aerospike map `m`.
-- Keys are compared with `<`; the result is nil when the map yields no keys.
local function min_key_of_map(m)
  local smallest
  for candidate in map.keys(m) do
    if smallest == nil then
      -- first key seen becomes the initial minimum
      smallest = candidate
    elseif candidate < smallest then
      smallest = candidate
    end
  end
  return smallest
end
-- Increment the counter stored under `key` in the map `counters`, then drop
-- the entry with the smallest key when the map holds more than `max_entries`
-- entries. Returns the same map so it can be written back to a bin.
local function bump_counter(counters, key, max_entries)
  counters[key] = (counters[key] or 0) + 1
  if map.size(counters) > max_entries then
    map.remove(counters, min_key_of_map(counters))
  end
  return counters
end

-- Record one hit at Unix time `ts` in the frequency bins of `topRecord`.
--
-- Bins written:
--   total : overall hit count
--   mon   : map  year*100 + month          -> count (at most 2 entries kept)
--   week  : map  year*100 + week of year   -> count (at most 2 entries kept)
--   day   : map  year*10000 + month*100 + day -> list of 25 counters
--           (slot 1 = day total, slots 2..25 = hours 0..23; at most 14 entries kept)
--
-- The record is updated when it already exists and created otherwise.
--
-- return value
-- 0 : success
-- -10001 : ts is invalid (not a number, or smaller than 10000)
-- other : error of aerospike:update or aerospike:create
local function set_freq(topRecord, ts)
  -- check ts; a nil / non-number ts is reported as invalid instead of raising
  if type(ts) ~= 'number' or ts < 10000 then
    return -10001
  end

  -- compute date keys
  local date = os.date('*t', ts) -- make sure the server's localtime is set to Asia/Shanghai
  local mon = date.year * 100 + date.month -- year and month
  -- `/` is float division, so floor it to get a whole week number (days 1-7 -> week 1)
  local week = date.year * 100 + math.floor((date.yday + 6) / 7) -- year and week
  local day = mon * 100 + date.day -- year, month and day

  -- Bins are only read from an existing record; a missing bin (for example on
  -- a record that was not created by this UDF) starts from an empty map.
  local exists = aerospike:exists(topRecord)

  -- total
  topRecord['total'] = ((exists and topRecord['total']) or 0) + 1

  -- month data, keep the 2 newest months
  local mon_data = (exists and topRecord['mon']) or map.new(1)
  topRecord['mon'] = bump_counter(mon_data, mon, 2)

  -- week data, keep the 2 newest weeks
  local week_data = (exists and topRecord['week']) or map.new(1)
  topRecord['week'] = bump_counter(week_data, week, 2)

  -- day data: 25 counters per day (total + 24 hours), keep the 14 newest days
  local day_data = (exists and topRecord['day']) or map.new(1)
  local l = day_data[day] or list{0, 0,0,0,0,0,0, 0,0,0,0,0,0, 0,0,0,0,0,0, 0,0,0,0,0,0}
  l[1] = l[1] + 1 -- day's total count
  local index = date.hour + 2 -- hour 0 lives in slot 2
  l[index] = l[index] + 1
  day_data[day] = l
  if map.size(day_data) > 14 then
    map.remove(day_data, min_key_of_map(day_data))
  end
  topRecord['day'] = day_data

  -- commit
  if exists then
    return aerospike:update(topRecord)
  end
  return aerospike:create(topRecord)
end
-- UDF entry point: store `ts`, `cookie` and the stringified `sid` in the
-- record's bins, then update its frequency counters through set_freq.
-- The return value is whatever set_freq(topRecord, ts) returns.
function set_freq_solution(topRecord, cookie, sid, ts)
  topRecord.ts = ts
  topRecord.cookie = cookie
  topRecord.sid = tostring(sid) -- sid is always stored as a string
  return set_freq(topRecord, ts)
end
-- Build a stream filter predicate that accepts every record.
-- `condition` and `id_name` are accepted but not used by the predicate.
local function get_filter_test(condition, id_name)
  local function accept_all(r)
    return true
  end
  return accept_all
end
-- Build a stream map function that turns a record into a one-element list
-- holding the value of the bin named `id_name`.
local function get_map(id_name)
  local function wrap_bin(rec)
    local value = rec[id_name]
    return list{value}
  end
  return wrap_bin
end
-- Stream reducer: pass both lists to list.concat(l1, l2) and return `l1`.
local function my_reduce(l1, l2)
  local merged = l1
  list.concat(merged, l2)
  return merged
end
-- Stream UDF entry point: run the stream through the filter built by
-- get_filter_test, map each record to a one-element list of its 'sid' bin,
-- and reduce the results with my_reduce.
function getSolution(stream, condition)
  local bin_name = 'sid'
  local keep = get_filter_test(condition, bin_name)
  local project = get_map(bin_name)

  local filtered = stream:filter(keep)
  local mapped = filtered:map(project)
  return mapped:reduce(my_reduce)
end
create index s_index_cooke_s on camel.freq_s(cookie) STRING
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10000,1425535188) ON camel.freq_s WHERE PK = '0'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10001,1425535188) ON camel.freq_s WHERE PK = '1'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10002,1425535188) ON camel.freq_s WHERE PK = '2'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10003,1425535188) ON camel.freq_s WHERE PK = '3'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10004,1425535188) ON camel.freq_s WHERE PK = '4'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10005,1425535188) ON camel.freq_s WHERE PK = '5'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10006,1425535188) ON camel.freq_s WHERE PK = '6'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10007,1425535188) ON camel.freq_s WHERE PK = '7'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10008,1425535188) ON camel.freq_s WHERE PK = '8'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10009,1425535188) ON camel.freq_s WHERE PK = '9'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10010,1425535188) ON camel.freq_s WHERE PK = '10'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10011,1425535188) ON camel.freq_s WHERE PK = '11'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10012,1425535188) ON camel.freq_s WHERE PK = '12'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10013,1425535188) ON camel.freq_s WHERE PK = '13'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10014,1425535188) ON camel.freq_s WHERE PK = '14'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10015,1425535188) ON camel.freq_s WHERE PK = '15'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10016,1425535188) ON camel.freq_s WHERE PK = '16'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10017,1425535188) ON camel.freq_s WHERE PK = '17'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10018,1425535188) ON camel.freq_s WHERE PK = '18'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10019,1425535188) ON camel.freq_s WHERE PK = '19'
EXECUTE dsp_freq.set_freq_solution('cookie_freq_001',10020,1425535188) ON camel.freq_s WHERE PK = '20'
- Map reduce (aerospike server will crash)
AGGREGATE dsp_freq.getSolution('JSON{}') ON camel.freq_s WHERE cookie = 'cookie_freq_001'
- The server is in memory-only mode and uses the default settings.
- If the record count is small (< 7), the server does not crash.
- If the records (100 rows) are inserted with the “insert into” command (not the UDF)
and the map reduce is then run, the server does not crash.