Stream UDF not working

php
python
stream
udf

#1

I tried the UDF function from the example:

local function page_words(words, rec)
  info(tostring(rec['val']))
  words[rec['val']] = rec['val']
  return words
end

local function sum_words(word1, words2)
  info("got " .. tostring(word1) .. "||" .. tostring(words2))
  -- use map.merge() to merge two maps
  -- if the same name exists in both maps, then use the function to merge the values.
  return map.merge(word1, words2, math.sum)
end

function word_count(s)
  return s : aggregate(map(), page_words) : reduce(sum_words)
end

Which is working nicely:

aql> aggregate wordcount.word_count() ON test.rand
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| word_count                                                                                                                                                                                                                                                     |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {160:160, 65:65, 2:2, 3:3, 4:4, 197:197, 390:390, 1287:1287, 40:40, 41:41, 42:42, 11:11, 12:12, 45:45, 2062:2062, 1263:1263, 80:80, 81:81, 50:50, 179:179, 8308:8308, 8309:8309, 8310:8310, 1399:1399, 8440:8440, 57:57, 7674:7674, 4123:4123, 4124:4124, 61:6 |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (11.982 secs)

But when I call it from the Python client I get:

python test_udf.py 
1.0.56
Traceback (most recent call last):
  File "test_udf.py", line 44, in <module>
    query.foreach(process_udf)
exception.UDFError: (100L, 'UDF: Execution Error 1', 'src/main/aerospike/aerospike_query.c', 798)

code:

import aerospike
from aerospike.exception import *
import os, sys

print aerospike.__version__

as_ns = "test"
as_set = "rand"

cli = aerospike.client(
{
   "hosts": [("aeros1.dev", 3000)],
   "lua": {},
   "policies": {
       "timeout": 100000,
    },
  }
)

if __name__ == '__main__':
   a = cli.connect()

   def process_udf(value):
       print value

   query = a.query(as_ns, as_set)
   query.select( 'val' )
   query.apply('wordcount', 'word_count')
   query.foreach(process_udf)

I tried this with Python client 1.0.56 against cluster version aerospike-server-community 3.5.12-1. I also tried it on another cluster, with the same result.

aerospike log:

Nov 23 2015 19:00:30 GMT: INFO (scan): (thr_tscan.c::871) scan job received
Nov 23 2015 19:00:30 GMT: INFO (scan): (thr_tscan.c::922) scan_option 0x0 0x64
Nov 23 2015 19:00:30 GMT: INFO (scan): (thr_tscan.c::1013) scan option: Fail if cluster change False
Nov 23 2015 19:00:30 GMT: INFO (scan): (thr_tscan.c::1014) scan option: Background Job False
Nov 23 2015 19:00:30 GMT: INFO (scan): (thr_tscan.c::1015) scan option: priority is 0 n_threads 3 job_type 1
Nov 23 2015 19:00:30 GMT: INFO (scan): (thr_tscan.c::1016) scan option: scan_pct is 100 
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 8546
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 7212
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 8460
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 4606
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 8474
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 6430
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 2211
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 6533
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 9298
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 7161
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 2411
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 4270
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 2201
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 167
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 6935
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 3218
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 5772
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 2566
Nov 23 2015 19:00:30 GMT: INFO (scan): (thr_tscan.c::1459) tid 2108554936941043035: scan send response error returned -1 errno 104 fd 131 
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 4723
Nov 23 2015 19:00:30 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 6856
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::160) SIGSEGV received, aborting Aerospike Community Edition build 3.5.12
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: found 14 frames
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 0: /usr/bin/asd(as_sig_handle_segv+0x54) [0x47864d]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 1: /lib/x86_64-linux-gnu/libc.so.6(+0x321e0) [0x7f7a649481e0]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 2: /usr/bin/asd(cf_buf_builder_reserve+0xc) [0x50a503]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 3: /usr/bin/asd(as_msg_make_val_response_bufbuilder+0x26) [0x4724c1]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 4: /usr/bin/asd(tscan_agg_ostream_write+0x4b) [0x4c1a83]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 5: /usr/bin/asd() [0x52d987]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 6: /usr/bin/asd() [0x54f158]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 7: /usr/bin/asd(lua_pcall+0x30) [0x53ee00]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 8: /usr/bin/asd() [0x527e51]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 9: /usr/bin/asd() [0x528905]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 10: /usr/bin/asd(as_aggr__process+0x280) [0x4562de]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 11: /usr/bin/asd(tscan_partition_thr+0x35e) [0x4c4936]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 12: /lib/x86_64-linux-gnu/libpthread.so.0(+0x6b50) [0x7f7a65762b50]
Nov 23 2015 19:00:30 GMT: WARNING (as): (signal.c::162) stacktrace: frame 13: /lib/x86_64-linux-gnu/libc.so.6(clone+0x6d) [0x7f7a649f195d]

This killed one node.

Second scenario, where the node does not fail:

Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::871) scan job received
Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::922) scan_option 0x0 0x64
Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::1013) scan option: Fail if cluster change False
Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::1014) scan option: Background Job False
Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::1015) scan option: priority is 0 n_threads 3 job_type 1
Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::1016) scan option: scan_pct is 100 
Nov 23 2015 19:08:55 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 8546
Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::1445) tid 1826822057963423753: scan send response error returned -1 errno 32 fd 129 
Nov 23 2015 19:08:55 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 8460
Nov 23 2015 19:08:55 GMT: INFO (aggr): (/opt/aerospike/sys/udf/lua/stream_ops.lua::175) 7212
Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::387) SCAN JOB DONE  [id =1826822057963423753: ns= test set=rand scanned=0 expired=0 set_diff=0 elapsed=3 (ms)]
Nov 23 2015 19:08:55 GMT: INFO (scan): (thr_tscan.c::1488) tid 1826822057963423753: no more fh. Probably client closed up connection
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4796)  system memory: free 65156296kb ( 98 percent free ) 
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4804)  migrates in progress ( 0 , 0 ) ::: ClusterSize 4 ::: objects 853960 ::: sub_objects 0
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4812)  rec refs 853960 ::: rec locks 0 ::: trees 0 ::: wr reqs 0 ::: mig tx 0 ::: mig rx 6
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4818)  replica errs :: null 0 non-null 0 ::: sync copy errs :: node 0 :: master 0 
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4828)    trans_in_progress: wr 0 prox 0 wait 0 ::: q 0 ::: bq 0 ::: iq 0 ::: dq 0 : fds - proto (11, 24, 13) : hb (6, 22, 16) : fab (58, 72, 14)
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4830)    heartbeat_received: self 0 : foreign 16016
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4831)    heartbeat_stats: bt 0 bf 15514 nt 0 ni 0 nn 0 nnir 0 nal 0 sf1 0 sf2 0 sf3 0 sf4 0 sf5 0 sf6 0 mrf 0 eh 1 efd 1 efa 0 um 0 mcf 14 rc 16 
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4844)    tree_counts: nsup 0 scan 0 batch 0 dup 0 wprocess 0 migrx 6 migtx 0 ssdr 0 ssdw 0 rw 0
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4860) namespace test: disk inuse: 0 memory inuse: 147420 (bytes) sindex memory inuse: 189910 (bytes) avail pct 100
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4860) namespace scan: disk inuse: 0 memory inuse: 0 (bytes) sindex memory inuse: 18688 (bytes) avail pct 100
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4860) namespace er: disk inuse: 1013765504 memory inuse: 586837047 (bytes) sindex memory inuse: 76515 (bytes) avail pct 91
Nov 23 2015 19:09:00 GMT: INFO (info): (thr_info.c::4905)    partitions: actual 3069 sync 2042 desync 0 zombie 0 wait 0 absent 7177
Nov 23 2015 19:09:00 GMT: INFO (info): (hist.c::137) histogram dump: reads (0 total) msec
Nov 23 2015 19:09:00 GMT: INFO (info): (hist.c::137) histogram dump: writes_master (0 total) msec
Nov 23 2015 19:09:00 GMT: INFO (info): (hist.c::137) histogram dump: proxy (0 total) msec
Nov 23 2015 19:09:00 GMT: INFO (info): (hist.c::137) histogram dump: writes_reply (0 total) msec
Nov 23 2015 19:09:00 GMT: INFO (info): (hist.c::137) histogram dump: udf (0 total) msec
Nov 23 2015 19:09:00 GMT: INFO (info): (hist.c::137) histogram dump: query (0 total) msec
Nov 23 2015 19:09:00 GMT: INFO (info): (hist.c::137) histogram dump: query_rec_count (0 total) count
Nov 23 2015 19:09:04 GMT: INFO (drv_ssd): (drv_ssd.c::2536) device /www/aerospike/data/er.dat: used 1013765504, contig-free 15017M (15017 wblocks), swb-free 0, n-w 0, w-q 0 w-tot 0 (0.0/s), defrag-q 0 defrag-tot 1 (0.0/s)

I’m quite desperate, so I also tried it in another language, with the same result:

<?php

require __DIR__ . '/vendor/autoload.php';

$config = [
    "hosts" => [ ["addr" => "aeros1.dev", "port" => 3000] ]
];

$db = new Aerospike($config);

if (!$db->isConnected()) {
    echo "Failed to connect to the Aerospike server [{$db->errorno()}]: {$db->error()}\n";
    exit(1);
}

$status = $db->aggregate("test", "rand", [], "wordcount", "word_count", ["val"], $names);
if ($status == Aerospike::OK) {
    var_dump($names);
} else {
    echo "An error occured while running the AGGREGATE [{$db->errorno()}] ".$db->error();
}

result:

php udf.php 
An error occured while running the AGGREGATE [100] UDF: Execution Error 1

Any ideas what is going on? Thanks in advance.


#2

In both cases, when you’re using any UDF (record or stream), you need to be able to get to the system Lua files. By default those are installed in /usr/local/aerospike/lua. You can configure your client’s constructor with an explicit path to those. In the case of Python it’s the module method aerospike.client - see system_path field of the lua config param. In the PHP client it’s in the config directive aerospike.udf.

Specific to Stream UDFs, those also require being able to reach your local copy of the Lua module, because the final reduce step runs on the client. In the Python client the module should either be a file in the current directory (.) relative to the client, or sit in a directory explicitly defined through the user_path of the lua config param. See the example in the documentation for aerospike.Query.apply.

The installers of both clients try to create the paths /usr/local/aerospike/lua and /usr/local/aerospike/usr-lua. You should have files in the system path (aerospike.lua, as.lua, stream_ops.lua). I see that you’re providing an empty dict for the lua config param. Try to create a client without that lua config param at all, and then one with explicit paths (as in the example), and let me know if either works (which they should, by default).
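A minimal sketch of those two client configurations, assuming the default install locations named above. The helper names build_config, missing_system_files and run_word_count are made up for illustration; only the hosts, lua, system_path and user_path keys come from the client's config dict, and the host, namespace, set and module names are the ones from the question.

```python
import os

# Default locations named above; adjust if your install differs (assumption).
SYSTEM_PATH = "/usr/local/aerospike/lua"
USER_PATH = "/usr/local/aerospike/usr-lua"
# System Lua files that should be present in the system path.
SYSTEM_FILES = ("aerospike.lua", "as.lua", "stream_ops.lua")


def build_config(hosts, system_path=None, user_path=None):
    """Build a client config dict; leave out the 'lua' key unless a path is given."""
    config = {"hosts": hosts}
    lua = {}
    if system_path:
        lua["system_path"] = system_path
    if user_path:
        lua["user_path"] = user_path
    if lua:
        config["lua"] = lua
    return config


def missing_system_files(system_path):
    """Return the system Lua files that are not found under system_path."""
    return [name for name in SYSTEM_FILES
            if not os.path.isfile(os.path.join(system_path, name))]


def run_word_count(config):
    """Run the stream UDF from the question with the given client config."""
    import aerospike  # imported here so the helpers above work without the client

    client = aerospike.client(config).connect()
    try:
        query = client.query("test", "rand")
        query.apply("wordcount", "word_count")
        query.foreach(print)
    finally:
        client.close()


if __name__ == "__main__":
    # Quick local check before talking to the cluster at all.
    print("missing system files:", missing_system_files(SYSTEM_PATH))
```

To compare the two cases, call run_word_count(build_config([("aeros1.dev", 3000)])) and then run_word_count(build_config([("aeros1.dev", 3000)], SYSTEM_PATH, USER_PATH)). A copy of wordcount.lua has to be in the user path (or the current directory) for the client-side reduce.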

Which release of the Python client do you have?


#3

Wow, thanks a lot. I did not realize that I have to use a local Lua script for the reduce phase. My second question on this topic: is all the data moved over the network to the client? I would like to process almost a billion keys, which will be big and will consume a lot of RAM. Is there any possibility to process the data in chunks?

I use Python client version 1.0.56, compiled from source.

When I provide the Lua paths explicitly everything works great; when I don’t, the job fails.

Both paths you mentioned exist, but the second one is empty. I uploaded the script with AQL, not with Python.

In which system path should those files be? I suppose that in lua, which contains:

$ lua 
Lua 5.2.1  Copyright (C) 1994-2012 Lua.org, PUC-Rio
> print(package.path)
/usr/local/share/lua/5.2/?.lua;/usr/local/share/lua/5.2/?/init.lua;/usr/local/lib/lua/5.2/?.lua;/usr/local/lib/lua/5.2/?/init.lua;./?.lua;/usr/share/lua/5.2/?.lua;/usr/share/lua/5.2/?/init.lua;./?.lua

But none of those paths exist.


#4

The data comes over in batches, depending on the setting of query-batch-size, which by default is 100. You can raise it so that the reducer does more of the work on the server side, resulting in fewer batches shipped to the client. You’d configure it this way: asinfo -v "set-config:context=service;query-batch-size=10000". It costs more compute time on the server, but the load on the client would drop a lot. Each batch should produce one resulting data type from the reducer, so bigger batches run longer and consume more on the server side, but there’s less to reduce on the client side. The difference can be significant, so tune it as makes sense to you.

The rule for Stream UDFs is to have the query itself eliminate as many records as possible up front, so that the smallest possible set is streamed to the UDF module. You do this by placing the predicate that filters out the most records in the query. Then you want all remaining filtering conditions to exist in the filter function, further reducing the number of records that go through. The mapper should drop all the unneeded bins, and then you should consider an accumulator and reducer. As I mentioned previously, the number of records going through the reducer is defined by the query-batch-size.
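To illustrate the order of stages described above, here is a plain-Python simulation. It is not Aerospike API: the record layout and the names run_stream and count_value are invented for the example, and the lambdas only stand in for the query predicate, the filter function and the mapper.

```python
def run_stream(records, predicate, keep, mapper, accumulate):
    """Apply the stages in the order described: predicate, filter, map, aggregate."""
    stream = (r for r in records if predicate(r))   # query predicate: biggest cut first
    stream = (r for r in stream if keep(r))         # filter: remaining conditions
    stream = (mapper(r) for r in stream)            # map: drop the unneeded bins
    result = {}
    for item in stream:                             # aggregate: fold into one map
        result = accumulate(result, item)
    return result


def count_value(counts, val):
    """Accumulator: count how often each value occurs."""
    counts[val] = counts.get(val, 0) + 1
    return counts


if __name__ == "__main__":
    # Hypothetical records: 'val' is kept, 'region' is filtered on, 'payload' is dropped.
    records = [{"val": v, "region": "eu" if v % 2 else "us", "payload": "x" * 10}
               for v in range(1, 21)]
    result = run_stream(
        records,
        predicate=lambda r: r["val"] <= 10,   # stands in for the query's predicate
        keep=lambda r: r["region"] == "eu",
        mapper=lambda r: r["val"],
        accumulate=count_value,
    )
    print(result)
```

Of the 20 records, the predicate passes 10, the filter passes 5 of those, and only the val bin of each reaches the accumulator.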

The reason we need to run a final reducer on the client side is that this operation happens in parallel across all the nodes of the cluster, and they are shared-nothing. There is no move-and-reduce step, as you may be familiar with from Hadoop MR.
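A rough simulation of that flow in plain Python, under the assumption stated above that each batch on a node yields one partial result. Nothing here is Aerospike API; node_partials, merge and client_reduce are illustrative names, and the four nodes are just lists.

```python
from collections import Counter
from functools import reduce


def node_partials(values, batch_size):
    """One node: aggregate each batch into a map, giving one partial result per batch."""
    return [Counter(values[i:i + batch_size])
            for i in range(0, len(values), batch_size)]


def merge(a, b):
    """Reducer: sum the counts of keys that appear in either map."""
    return a + b


def client_reduce(partials):
    """Final reduce on the client over everything the nodes shipped."""
    return reduce(merge, partials, Counter())


if __name__ == "__main__":
    # Four shared-nothing nodes, each holding its own slice of 1000 records.
    nodes = [[n % 7 for n in range(start, 1000, 4)] for start in range(4)]
    for batch_size in (100, 10000):
        shipped = [p for values in nodes for p in node_partials(values, batch_size)]
        total = client_reduce(shipped)
        print(batch_size, len(shipped), sum(total.values()))
```

With 250 records per node, a batch size of 100 ships 12 partial maps to the client and a batch size of 10000 ships 4; the merged result is the same either way, only the amount of client-side reducing changes.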

As long as you have a system_path with the common system Lua files you should be good.

Let me know if you have any other questions.