Low efficiency and timeout exception of udf for java client

Top

Hi,

I am having some issues with the timeout while executing the udf,and the efficiency for udf task.

Some detail show as following,I really appreciate for your generous comment.

Aerospike conf

service {
	paxos-single-replica-limit 1 # Number of nodes where the replica count is automatically reduced to 1.
	proto-fd-max 15000
}
mod-lua {
  cache-enabled false
}
logging {

        file /ASTest/log/aerospike.log {
           context any warning
           context udf warning
        }

}
network {
	service {
		address any
		port 3000
	}
	heartbeat {
		mode multicast
		multicast-group 239.1.99.144
		port 9918

		# To use unicast-mesh heartbeats, remove the 3 lines above, and see
		# aerospike_mesh.conf for alternative.

		interval 150
		timeout 10
	}
	fabric {
		port 3001
	}
	info {
		port 3003
	}
}
namespace test {
	replication-factor 1
	memory-size 10G
	default-ttl 30d # 30 days, use 0 to never expire/evict.
	storage-engine memory
}

Data Model

Map<String, Object> trade = new HashMap<>(16);
trade.put("serialNumber", "lsh" + random.nextInt(1000000000));
trade.put("rowKey", UUID.randomUUID().toString());
trade.put("userId", "userId" + random.nextInt(100));
trade.put("userName", "userName" + random.nextInt(100));
trade.put("amount", random.nextInt(5000));
trade.put("hour", random.nextInt(24));
trade.put("tradeTime", AsDateUtil.getRandomPastTime(30*24*60));
trade.put("tradeType", "tradeType" + random.nextInt(10));
trade.put("channelType", 30);
trade.put("merchantNumber", "merchantNumber" + random.nextInt(800));
trade.put("accountNumber", "accountNumber" + random.nextInt(1000));
trade.put("peAccountNumber", "peAccountNumber" + random.nextInt(1000));

UDF

local function statisticsTradeByMerchantId(record)
    --get now time in millisSeconds
    local maxTime = os.time() * 1000
    local millisPerHour = 60 * 60 * 1000
    local millisPerDay = 24 * millisPerHour
    local merchantNumber = record.merchantNumber
    --init map if neccessary
    local tradeInfo = map { merchantId = merchantNumber, trade0_5count = 0, tradeCount24Hour = 0,
                            tradeCount30Days = 0, amount5000Count = 0, amount1000Count = 0 }
    local last24Hour = maxTime - millisPerDay
    local tradeTime = record.tradeTime
    -- trade in 24Hours
    if tradeTime >= last24Hour and tradeTime < maxTime then
        -- get the hour for tradeTime
        local hour = tonumber(os.date('%H', tradeTime))

        --count the trade for tradeTime in hour between 0 to 5
        if hour < 5 then
            tradeInfo.trade0_5count = tradeInfo.trade0_5count + 1
        end
        --count trade in 24 hours
        tradeInfo.tradeCount24Hour = tradeInfo.tradeCount24Hour + 1
        local amount = record.amount or 0
        print("amount" .. amount)
        -- count trade for amount greader than 5000
        if amount >= 5000 then
            tradeInfo.amount5000Count = tradeInfo.amount5000Count + 1
        end
        -- a multiple of tade amount
        if (math.ceil(amount) == math.floor(amount)) and (math.ceil(amount) % 1000 == 0) then
            tradeInfo.amount1000Count = tradeInfo.amount1000Count + 1
        end
    end
    tradeInfo.tradeCount30Days = tradeInfo.tradeCount30Days + 1
    return tradeInfo
end

local function mergeMap(current, next)
    current.trade0_5count = current.trade0_5count + next.trade0_5count
    current.tradeCount24Hour = current.tradeCount24Hour + next.tradeCount24Hour
    current.amount5000Count = current.amount5000Count + next.amount5000Count
    current.amount1000Count = current.amount1000Count + next.amount1000Count
    current.tradeCount30Days = current.tradeCount30Days + next.tradeCount30Days
    return current;
end

local function reduceByMerchantId(current, next)
    return map.merge(current, next, mergeMap)
end

local function groupByMerchantId(res, item)
    local merchantId = item.merchantId
    local tradeInfo = res[merchantId] or map { merchantId = merchantId, trade0_5count = 0, tradeCount24Hour = 0,
                                               tradeCount30Days = 0, amount5000Count = 0, amount1000Count = 0 }

    local trade0_5count = item.trade0_5count
    local tradeCount24Hour = item.tradeCount24Hour
    local amount5000Count = item.amount5000Count
    local amount1000Count = item.amount1000Count
    local tradeCount30Days = item.tradeCount30Days
    tradeInfo.trade0_5count = tradeInfo.trade0_5count + trade0_5count
    tradeInfo.tradeCount24Hour = tradeInfo.tradeCount24Hour + tradeCount24Hour
    tradeInfo.tradeCount30Days = tradeInfo.tradeCount30Days + tradeCount30Days
    tradeInfo.amount5000Count = tradeInfo.amount5000Count + amount5000Count
    tradeInfo.amount1000Count = tradeInfo.amount1000Count + amount1000Count
    res[merchantId] = tradeInfo
    return res
end
function ruleW38(stream)
    --maxTime = maxTradeTime
    --print(maxTradeTime)
    return stream:map(statisticsTradeByMerchantId):aggregate(map(), groupByMerchantId):reduce(reduceByMerchantId)
end

Trouble

The data size for test namespace is only 1 million.

But the udf task took 20s,30s and even 50s to execute. Timeout exception throw sometimes,but I had already set the timeout policy 30,000ms . Is this normal? Or is there something wrong with the way I use? Why is it so slow? Isn’t Aerospike characterized by low latency?The test is based on memory, I can not imagine the case for disk storage,or more scale data.

How is the UDF being invoked? Are you scanning the entire namespace, or only a subset of the namespace by using a set name? What are your udf/scan configs set to? Are your systems doing OK on resource utilization while it is running? What version of the server is this?

Appreciate for your generous reply.

well, the detail for your confusion is following:

How to Invoke


public void ruleW38() {
    registerAggUDF("/lua", "ruleW38.lua");
    long start = System.currentTimeMillis();
    ResultSet resultSet = queryAggUDF("tradeTime", 1577344771000L, 1577351971000L, "ruleW38", "ruleW38");
    while (resultSet.next()) {
        Object object = resultSet.getObject();
        System.out.println(object);
    }
    System.out.println("cost:" + (System.currentTimeMillis() - start) + "ms");
}

public void registerAggUDF(String clientPath, String serverPath) {
    RegisterTask register = client.register(null, clientPath + File.separator + serverPath, serverPath, Language.LUA);
    LuaConfig.SourceDirectory = clientPath;
    register.waitTillComplete(100, 1000);
}

public ResultSet queryAggUDF(String filterBinName, long filterParamBegin, long filterParamEnd, String packageName, String functionName, Value... values) {
    Statement statement = new Statement();
    statement.setNamespace(namespace);
    statement.setSetName(setName);
    statement.setFilter(Filter.range(filterBinName, filterParamBegin, filterParamEnd));
    try (RecordSet query = client.query(null, statement)) {
        int i = 0;
        while (query.next()) {
            i++;
        }
        System.out.println("data count:" + i);
    }
    LuaConfig.SourceDirectory = "/lua";
    QueryPolicy queryPolicy = new QueryPolicy();
    queryPolicy.totalTimeout=300000;
    queryPolicy.socketTimeout=300000;
    return client.queryAggregate(queryPolicy, statement, packageName, functionName, values);
}

Scan means

As mentioned above,all data is scanned by a setname

About the resouces

All the resource usage is ok, the average CPU usage is about 10%, the peak is 30%, and the memory is enough.

The version for server

the command asd --version show :

Aerospike Community Edition build 4.5.1.5

Is the set you are scanning the only set in the namespace? If not, how many records are in the namespace vs set? The reason I ask is that if you have a very large namespace, and want to scan a very small set in that namespace - the way Aerospike works it happens to make it a bit slow. This is because it still has to go through every entry within the namespace. It is fast to do it, but if you have billions of records in a namespace, but only a few hundred in a set - the scan does a lot more work then it needs to.

If you’re not strained on resources on your cluster, you could try giving more resources to the scan. Try increasing scan-max-udf-transactions and scan-threads; https://www.aerospike.com/docs/reference/configuration/index.html#scan-max-udf-transactions and https://www.aerospike.com/docs/reference/configuration/index.html#scan-threads

My As is Community version,only 2 namespace available.the namespace test has 14 setname . The data size for trade which I scanned is only 1 million.I do not think it’s a big scale data.But the udf is so inefficient that it’s unacceptable for me and slower than redis.So I’m confused about whether there’s something wrong with the way I use it

Scans without indexes have to crawl through the entire namespace. Did you try adjusting threads/max-udf-transactions?

I haven’t ajusted the params yet.However, my query is filtered based on a secondary index tradeTime. Because the aggregation can only take one secondary index,I set one only filter for the statement .I’m only investigating alternatives for redis.However, as far as I’m concerned, the Aerospike doesn’t seem to be the answer to my current problems. Maybe I shouldn’t change the architecture.

If using it as a map-reduce is your main functionality then yeah Aerospike might not be the best tool for the job. Are you sure the secondary index is getting used? Does aql -c 'show scans' show it being a scan? Or does it actually show up under aql -c 'show queries'? If its not using the secondary index, and you have one, you want to make sure its getting used.

I was assuming you were using scans before. Query tuning can be found here https://www.aerospike.com/docs/operations/manage/queries/index.html

No matter how many times the udfs I execute,both the commands aql -c ‘show scans’ and aql -c 'show queries' change nothing, I’ve checked all the nodes

wow,I found it .It can be visible only while runing.But It show in the show queries