Poor streaming UDF performance

New to Aerospike, coming from MongoDB, and I found a streaming UDF to be hundreds of times slower than a query with a WHERE clause.

I'd like to run a custom query on a set of ~2700 Book records (~30 bins each) with OwnerID_Hex="57120d7151c643ab42a8c19c" and IsUserDeleted=0. The set is less than 3 MB in memory.

With AQL query syntax it returns 13 rows in 0.002 secs, which is reasonable, although the IsUserDeleted predicate is not applied:

aql> SELECT Name, OwnerID_Hex FROM PV.Book WHERE OwnerID_Hex="57120d7151c643ab42a8c19c"

But the streaming UDF returned in 0.863 secs, hundreds of times slower than the query:

aql> AGGREGATE pv.getBooksByOwner() ON PV.Book

My streaming UDF:

function getBooksByOwner(stream)
    local function fnFilter(rec)
        if rec["OwnerID_Hex"] == "57120d7151c643ab42a8c19c" and rec["IsUserDeleted"] ~= 1 then
            return true
        end
        return false
    end

    local function fnMap(rec)
        local m = map()
        m["Name"] = rec["Name"]
        m["IsUserDeleted"] = rec["IsUserDeleted"]
        return m
    end

    return stream : filter(fnFilter) : map(fnMap)
end

Is this normal performance for a streaming UDF? Or do I have to use AGGREGATE with a WHERE clause to narrow down the number of records before they are sent into the UDF?

aql> AGGREGATE pv.getBooksByOwner() ON PV.Book WHERE OwnerID_Hex="57120d7151c643ab42a8c19c" (0.003 secs, reasonable)


Running locally in Vagrant on a Mac Pro with 4 GB RAM.


mod-lua {
  user-path /opt/aerospike/usr/udf/lua
  cache-enabled true
}

namespace PV {
  memory-size 1G
  storage-engine memory
}

aql> show sets

"disable-eviction": "false",
"ns": "PV",
"set-enable-xdr": "use-default",
"objects": "2721",
"stop-writes-count": "0",
"set": "Book",
"memory_data_bytes": "2875841",
"truncate_lut": "0",
"tombstones": "0"

aql> show indexes

"ns": "PV",
"bin": "OwnerID_Hex",
"indextype": "NONE",
"set": "Book",
"state": "RW",
"indexname": "idxBookOwner",
"path": "OwnerID_Hex",
"type": "STRING"

Yes - Stream UDFs are used to run aggregations on the results of a secondary index query on the server nodes, with a final reduce on the client node, instead of doing a plain SI query and then doing all the aggregation work on the client.

Also, your UDF as written above only works because you have a single node. For a multi-node cluster you also need a final reduce() step, where the map results from each individual node are merged into the aggregate result on the client node.
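A minimal sketch of what that could look like, assuming the per-record maps are collected into a list on each node with aggregate() and the per-node lists are concatenated in reduce() (the names fnAggregate/fnReduce are illustrative, and this has not been run against a cluster):

```lua
function getBooksByOwner(stream)
    local function fnFilter(rec)
        return rec["OwnerID_Hex"] == "57120d7151c643ab42a8c19c"
           and rec["IsUserDeleted"] ~= 1
    end

    -- collect each matching record's bins into a list on every node
    local function fnAggregate(acc, rec)
        local m = map()
        m["Name"] = rec["Name"]
        m["IsUserDeleted"] = rec["IsUserDeleted"]
        list.append(acc, m)
        return acc
    end

    -- merge the per-node lists; the final merge runs on the client node
    local function fnReduce(l1, l2)
        for v in list.iterator(l2) do
            list.append(l1, v)
        end
        return l1
    end

    return stream : filter(fnFilter) : aggregate(list(), fnAggregate) : reduce(fnReduce)
end
```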

Your streaming UDF uses Lua, and that in itself adds to the latency. Every record picked up by the query needs to be run through one or more Lua functions.

Since all you’re doing is applying a predicate and then picking bins from the matched records, you should be using predicate filtering instead.

Predicate filtering was added in release 3.12. You can use the PredExp class of the Java client to implement the equivalent. Predicate filters also currently exist for the C, C# and Go clients.

This example expresses the same as your stream UDF:

	Statement stmt = new Statement();
	stmt.setBinNames("Name", "IsUserDeleted");
	stmt.setFilter(Filter.equal("OwnerID_Hex", "57120d7151c643ab42a8c19c"));
	// PredExp uses postfix notation: operands first, then the operator
	stmt.setPredExp(PredExp.integerBin("IsUserDeleted"), PredExp.integerValue(1), PredExp.integerUnequal());

If you’re using a language client that doesn’t yet support predicate filtering, you’d still have to use a stream UDF to express the extra predicates, as you’ve done.

@rbotzer Thank you. Predicate filtering works well with the Go client; note that predicate expressions have to be written in postfix notation.

I found that using stm.SetPredExp() with multiple expressions ANDed together can avoid using stm.Addfilter(), which makes the code look more consistent. Is there a difference between the two in terms of performance?
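For reference, the ANDed form described above looks roughly like this with the Go client (a sketch assuming the PredExp-era `NewPredExp*` constructors; not verified against a server):

```go
import as "github.com/aerospike/aerospike-client-go"

// Predicate expressions are written in postfix (RPN) order:
// operands are pushed first, then the operator that consumes them.
stm := as.NewStatement("PV", "Book", "Name", "IsUserDeleted")
stm.SetPredExp(
	as.NewPredExpStringBin("OwnerID_Hex"),
	as.NewPredExpStringValue("57120d7151c643ab42a8c19c"),
	as.NewPredExpStringEqual(), // OwnerID_Hex == "..."
	as.NewPredExpIntegerBin("IsUserDeleted"),
	as.NewPredExpIntegerValue(1),
	as.NewPredExpIntegerUnequal(), // IsUserDeleted != 1
	as.NewPredExpAnd(2), // AND the two results above
)
```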

@pgupta Also thank you for the reminder about multi-node clusters!

This topic was automatically closed 6 days after the last reply. New replies are no longer allowed.