Hi,
I am running into timeouts while executing a stream UDF, and the UDF task is also much slower than I expected.
The details are shown below; I would really appreciate any comments.
Aerospike conf
service {
    paxos-single-replica-limit 1 # Number of nodes where the replica count is automatically reduced to 1.
    proto-fd-max 15000
}

mod-lua {
    cache-enabled false
}

logging {
    file /ASTest/log/aerospike.log {
        context any warning
        context udf warning
    }
}

network {
    service {
        address any
        port 3000
    }

    heartbeat {
        mode multicast
        multicast-group 239.1.99.144
        port 9918

        # To use unicast-mesh heartbeats, remove the 3 lines above, and see
        # aerospike_mesh.conf for alternative.

        interval 150
        timeout 10
    }

    fabric {
        port 3001
    }

    info {
        port 3003
    }
}

namespace test {
    replication-factor 1
    memory-size 10G
    default-ttl 30d # 30 days, use 0 to never expire/evict.
    storage-engine memory
}
Data Model
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

Random random = new Random();
Map<String, Object> trade = new HashMap<>(16);
trade.put("serialNumber", "lsh" + random.nextInt(1000000000));
trade.put("rowKey", UUID.randomUUID().toString());
trade.put("userId", "userId" + random.nextInt(100));
trade.put("userName", "userName" + random.nextInt(100));
trade.put("amount", random.nextInt(5000));
trade.put("hour", random.nextInt(24));
trade.put("tradeTime", AsDateUtil.getRandomPastTime(30*24*60));
trade.put("tradeType", "tradeType" + random.nextInt(10));
trade.put("channelType", 30);
trade.put("merchantNumber", "merchantNumber" + random.nextInt(800));
trade.put("accountNumber", "accountNumber" + random.nextInt(1000));
trade.put("peAccountNumber", "peAccountNumber" + random.nextInt(1000));
UDF
local function statisticsTradeByMerchantId(record)
-- current time in milliseconds
local maxTime = os.time() * 1000
local millisPerHour = 60 * 60 * 1000
local millisPerDay = 24 * millisPerHour
local merchantNumber = record.merchantNumber
-- build a fresh statistics map for this record
local tradeInfo = map { merchantId = merchantNumber, trade0_5count = 0, tradeCount24Hour = 0,
tradeCount30Days = 0, amount5000Count = 0, amount1000Count = 0 }
local last24Hour = maxTime - millisPerDay
local tradeTime = record.tradeTime
-- trade in 24Hours
if tradeTime >= last24Hour and tradeTime < maxTime then
-- hour of day for tradeTime (os.date expects seconds; tradeTime is in milliseconds)
local hour = tonumber(os.date('%H', math.floor(tradeTime / 1000)))
-- count trades that happened between 00:00 and 05:00
if hour < 5 then
tradeInfo.trade0_5count = tradeInfo.trade0_5count + 1
end
--count trade in 24 hours
tradeInfo.tradeCount24Hour = tradeInfo.tradeCount24Hour + 1
local amount = record.amount or 0
print("amount" .. amount)
-- count trades with amount of at least 5000
if amount >= 5000 then
tradeInfo.amount5000Count = tradeInfo.amount5000Count + 1
end
-- count trades whose amount is an integer multiple of 1000
if amount % 1000 == 0 then
tradeInfo.amount1000Count = tradeInfo.amount1000Count + 1
end
end
-- every record counts toward the 30-day total (test data is at most 30 days old)
tradeInfo.tradeCount30Days = tradeInfo.tradeCount30Days + 1
return tradeInfo
end
-- combine two statistics maps that belong to the same merchantId
local function mergeMap(current, next)
current.trade0_5count = current.trade0_5count + next.trade0_5count
current.tradeCount24Hour = current.tradeCount24Hour + next.tradeCount24Hour
current.amount5000Count = current.amount5000Count + next.amount5000Count
current.amount1000Count = current.amount1000Count + next.amount1000Count
current.tradeCount30Days = current.tradeCount30Days + next.tradeCount30Days
return current
end
-- reduce: merge two partial result maps; mergeMap resolves merchantIds present in both
local function reduceByMerchantId(current, next)
return map.merge(current, next, mergeMap)
end
-- aggregate: fold each per-record statistics map into res, grouped by merchantId
local function groupByMerchantId(res, item)
local merchantId = item.merchantId
local tradeInfo = res[merchantId] or map { merchantId = merchantId, trade0_5count = 0, tradeCount24Hour = 0,
tradeCount30Days = 0, amount5000Count = 0, amount1000Count = 0 }
local trade0_5count = item.trade0_5count
local tradeCount24Hour = item.tradeCount24Hour
local amount5000Count = item.amount5000Count
local amount1000Count = item.amount1000Count
local tradeCount30Days = item.tradeCount30Days
tradeInfo.trade0_5count = tradeInfo.trade0_5count + trade0_5count
tradeInfo.tradeCount24Hour = tradeInfo.tradeCount24Hour + tradeCount24Hour
tradeInfo.tradeCount30Days = tradeInfo.tradeCount30Days + tradeCount30Days
tradeInfo.amount5000Count = tradeInfo.amount5000Count + amount5000Count
tradeInfo.amount1000Count = tradeInfo.amount1000Count + amount1000Count
res[merchantId] = tradeInfo
return res
end
function ruleW38(stream)
--maxTime = maxTradeTime
--print(maxTradeTime)
return stream:map(statisticsTradeByMerchantId):aggregate(map(), groupByMerchantId):reduce(reduceByMerchantId)
end
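For reference, the module is registered on the server before the query runs. A minimal sketch with the Java client follows; the local path and the module name ruleW38.lua are my assumptions:

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Language;
import com.aerospike.client.task.RegisterTask;

AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);

// Register the Lua module on the server; path and module name are placeholders.
RegisterTask task = client.register(null, "udf/ruleW38.lua", "ruleW38.lua", Language.LUA);
task.waitTillComplete();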
Trouble
The test namespace holds only 1 million records, yet the UDF task takes 20 s, 30 s, and sometimes even 50 s to run, and a timeout exception is occasionally thrown even though I set the timeout policy to 30,000 ms. Is this normal, or is there something wrong with how I am using it? Why is it so slow? Isn't Aerospike supposed to be low-latency? This test runs entirely in memory; I can't imagine how it would behave on disk storage or with more data.
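For context, here is a minimal sketch of how the aggregation and the 30,000 ms timeout are presumably set up with the Java client (the host, set name, and module name are the same placeholders as above):

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;

AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);

QueryPolicy policy = new QueryPolicy();
policy.totalTimeout = 30000; // 30,000 ms; older client versions use policy.timeout instead

Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("trade");                        // placeholder set name
stmt.setAggregateFunction("ruleW38", "ruleW38"); // module name, function name

// With no secondary-index filter this scans every record in the set,
// running the Lua map function once per record.
ResultSet rs = client.queryAggregate(policy, stmt);
try {
    while (rs.next()) {
        System.out.println(rs.getObject()); // final reduced map keyed by merchantId
    }
} finally {
    rs.close();
}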