Unable to get simple filter/map to work using Python client (AER-4251) [Released]


#1

I have the following very simple UDFs and Python code:

udfs.lua:

local getDailyFilterFn(slug, year, month, day)
  local function filter(rec)
    if rec['slug'] == slug and rec['year'] == year and rec['month'] == month and rec['day'] == day then
      return true
    else
      return false
    end
end

local function getRecordKey(rec)
  return record.digest(rec)
end

function getDailySummary(s, slug, year, month, day)
  return s:filter(getDailyFilterFn(slug, year, month, day)):map(getRecordKey)
end

stream_test.py

import aerospike
from aerospike import predicate as p

config = {
    'lua': {
        'system_path': '/home/vagrant/.virtualenvs/aerodb/aerospike/lua/',
        'user_path': '/home/vagrant/aerodb/statistics/faster/udfs/'},
    'hosts': [('192.168.100.14', 3000)]
}

client = aerospike.client(config)
connection = client.connect()
q = connection.query('test', 'user_activities')
q.where(p.equals('userid', 1))
q.apply('udfs', 'getDailySummary', ['test', 2015, 8, 20])
for item in q.results():
    print item

Whenever I run this code I am getting the following:

Traceback (most recent call last):
  File "/home/vagrant/aerodb/stream_test.py", line 16, in __main__
    for item in q.results():
UDFError: (100L, 'UDF: Execution Error 1', 'src/main/aerospike/aerospike_query.c', 704)

I have searched this forum but cannot find anything that matches my case. I have double-checked that my udfs.lua is registered with the server, and that the system_path and user_path in my client config above are correct. I am at a loss as to what is going on: the error message is cryptic and does not point to where the problem is. The server logs have the following:

Aug 17 2015 08:23:43 GMT: DETAIL (query): (aggr.c:as_aggr_istream_read:414) No More Nodes for this Lua Call
Aug 17 2015 08:23:43 GMT: DEBUG (query): (aggr.c:as_aggr__process:245)  Apply Stream with mas AccessLog_getDailySummary 0x7ff8befbec40 0x7ff8befbec00 0x7ff8befbec20 ret=0
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_query__generator:2111) All the Data finished; All tree finished 0 10
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_query__add_fin:963) Adding fin 0x7ff938800008
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_query_netio_finish_cb:1052) Finished sequence number 0x7ff938800008->1
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_qtr__release:814) Free qtr ref count is zero
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_query__update_stats:397) Total time elapsed 3365 us, 1 of 6 read operations avg latency 3365 us
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_query__transaction_done:763) Cleaned up qtr 0x7ff938800008
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_qtr__release:822) Released qtr [base/thr_query.c:1069] 0x7ff938800008 1
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_query_netio_finish_cb:1074) Finished query with retCode 0
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_query__netio:1125) Streamed Out
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_query__bb_poolrelease:598) Pushed 0x7ff933c06000 131072 131072
Aug 17 2015 08:23:43 GMT: DETAIL (query): (thr_query.c:as_qtr__release:822) Released qtr [base/thr_query.c:2124] 0x7ff938800008 1
Aug 17 2015 08:23:43 GMT: DETAIL (query): (aggr.c:as_aggr_call_init:291) not a aggregation 1
Aug 17 2015 08:23:43 GMT: DETAIL (query): (aggr.c:as_aggr_call_init:291) not a aggregation 1

But above that I see these:

Aug 17 2015 08:23:43 GMT: DEBUG (udf): (udf_rw.c:as_val_tobuf:1500) SUCCESS: VAL TYPE UNDEFINED 9

Aug 17 2015 08:23:43 GMT: WARNING (proto): (thr_query.c::869) particle to buf: could not copy data!
Aug 17 2015 08:23:43 GMT: DEBUG (udf): (udf_rw.c:as_val_tobuf:1500) SUCCESS: VAL TYPE UNDEFINED 9

Aug 17 2015 08:23:43 GMT: WARNING (proto): (proto.c::1170) particle to buf: could not copy data!

I am not sure if that’s relevant to the error I am experiencing client side. But digging into the source code for Aerospike it seems “9” is AS_BYTES and as far as documentation for stream and record UDF goes I am returning a valid type in my map function.

Does someone have any idea what I am doing wrong here and how to fix it? Thanks in advance.


#2

This maps to the following code in the server. It seems to be first complaining about this not being a valid stream UDF.

First, the file udfs.lua is not valid Lua. If you install Lua locally, you can check for obvious errors by running the file through the interpreter: lua udfs.lua. I saw a series of errors up front.
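
If you would rather run that syntax check from a Python test suite, here is a minimal sketch (written for Python 3). It assumes the standard luac compiler is on your PATH and uses its -p (parse only) option. The helper name lua_syntax_ok is hypothetical, not part of any Aerospike API:

```python
import shutil
import subprocess


def lua_syntax_ok(path):
    """Parse a Lua file with `luac -p`.

    Returns True or False for the parse result, or None when luac
    is not installed (assumption: luac is how Lua is checked locally).
    """
    luac = shutil.which("luac")
    if luac is None:
        return None
    # -p parses the file only; no bytecode output file is written.
    result = subprocess.run([luac, "-p", path], capture_output=True, text=True)
    if result.returncode != 0:
        # luac reports the offending line number on stderr.
        print(result.stderr.strip())
    return result.returncode == 0
```

Calling lua_syntax_ok('udfs.lua') before registering the module should surface parse errors locally, before they show up as a cryptic error from the server.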

Here’s a modified version of the Lua code which fixes the getDailyFilterFn closure. It also adds type checking but if you’re sure of the correctness of the data you can remove lines 4-5:

local function getDailyFilterFn(slug, year, month, day)
  return function(rec)
    if rec['slug'] and rec['year'] and rec['month'] and rec['day'] and
       type(rec['slug']) == 'string' and type(rec['year']) == 'number' and
       type(rec['month']) == 'number' and type(rec['day']) == 'number' and
       rec['slug'] == slug and rec['year'] == year and
       rec['month'] == month and rec['day'] == day then
      return true
    else
      return false
    end
  end
end

local function getRecordKey(rec)
  return record.digest(rec)
end

function getDailySummary(s, slug, year, month, day)
  return s:filter(getDailyFilterFn(slug, year, month, day)):map(getRecordKey)
end
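
To make the closure pattern concrete, here is a rough pure-Python model of the same filter/map pipeline (Python 3). It is only an illustration of the logic, not how the server runs stream UDFs. The records are plain dicts and the 'id' field is a made-up stand-in for the record digest:

```python
def make_daily_filter(slug, year, month, day):
    """Return a predicate that closes over the target values,
    mirroring what getDailyFilterFn does in Lua."""
    def matches(rec):
        return (
            isinstance(rec.get("slug"), str)
            and isinstance(rec.get("year"), int)
            and isinstance(rec.get("month"), int)
            and isinstance(rec.get("day"), int)
            and rec["slug"] == slug
            and rec["year"] == year
            and rec["month"] == month
            and rec["day"] == day
        )
    return matches


def get_record_key(rec):
    # Hypothetical stand-in for record.digest(rec).
    return rec["id"]


# Made-up sample records for illustration only.
records = [
    {"id": 1, "slug": "test", "year": 2015, "month": 8, "day": 20},
    {"id": 2, "slug": "test", "year": 2015, "month": 8, "day": 21},
    {"id": 3, "slug": "other", "year": 2015, "month": 8, "day": 20},
    {"id": 4, "slug": "test", "year": "2015", "month": 8, "day": 20},
]

daily = make_daily_filter("test", 2015, 8, 20)
matched_ids = list(map(get_record_key, filter(daily, records)))
```

The outer function only captures the arguments. The inner function is what filter() calls once per record, which is exactly what was missing from the original getDailyFilterFn.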

If you’re still seeing an error code 100 and log messages complaining about AS_BYTES, please try the following workaround: change getRecordKey to wrap the record digest in a list:

local function getRecordKey(rec)
  local l = list {record.digest(rec)}
  return l
end

On the Python side you’d extract the first element of the list:

for item in q.results():
    print item[0]
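
If the same client code may see both shapes (a bare digest, or a digest wrapped in a one-element list), a small helper can accept either. This is a sketch in Python 3 with a hypothetical function name:

```python
def unwrap_digest(item):
    """Return the digest whether it arrives bare or wrapped in a list.

    Assumption: a wrapped result is a list (or tuple) whose first
    element is the digest, as produced by list{record.digest(rec)}.
    """
    if isinstance(item, (list, tuple)):
        return item[0] if item else None
    return item
```

Inside the loop you would then call unwrap_digest(item) instead of indexing item[0] directly.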

Let me know if you needed the workaround, and I’ll open a bug for it.


#3

Thanks for the reply @rbotzer. Regarding the errors in the udfs.lua I typed in the first few lines here in the reply window and then copy/pasted the rest from my terminal window. It is actually a fragment of code from my project which I am writing a test suite for. The Lua code is recent code that has been added to the existing production Lua module and the Python code is part of the test suite for that code.

I have rewritten the udfs.lua to fix the getDailyFilterFn function and also stream_test.py to fix the import line. I can get it to work now, but I need to use the list{record.digest(rec)} otherwise I get an empty bytearray and the AS_BYTES error in the aerospike.log.

lua_test.lua:

local function getDailyFilterFn(slug, year, month, day)
  return function(rec)
    if rec['slug'] and rec['year'] and rec['month'] and rec['day'] and
       type(rec['slug']) == 'string' and type(rec['year']) == 'number' and
       type(rec['month']) == 'number' and type(rec['day']) == 'number' and
       rec['slug'] == slug and rec['year'] == year and
       rec['month'] == month and rec['day'] == day then
      return true
    else
      return false
    end
  end
end

local function getRecordKey(rec)
  return list{record.digest(rec)}
end

function getDailySummary(s, slug, year, month, day)
  return s:filter(getDailyFilterFn(slug, year, month, day)):map(getRecordKey)
end

lua_test.py:

import aerospike
import os

from aerospike import predicates as p
from aerospike.exception import UDFError

config = {
    'hosts': [('192.168.100.14', 3000)]
}


client = aerospike.client(config)
connection = client.connect()

try:
    connection.udf_remove('lua_test.lua')
except UDFError:
    pass

connection.udf_put(os.path.join(os.path.dirname(__file__),
                                'lua_test.lua'))
q = connection.query('test', 'hourly_users')
q.where(p.equals('userid', 1))
q.apply('lua_test', 'getDailySummary', ['test', 2015, 8, 20])
for item in q.results():
    print repr(item)

However, when I use the same code in my test suite and in the production Lua module of the project, I still get the 100L exception from q.results(). My test suite module is one directory deep (statistics.tests) and the production Lua module is at statistics/faster/udfs/udfs.lua; I am not sure whether that has any bearing on it not working from within the test suite. The test suite inserts the necessary records and then runs the test code, and I can confirm that the records are indeed created by using the command-line aerospike client in the vagrant shell.

In my project I have aerospike config as a Python dict with:

config = {
    'lua': {
        'system_path': '/home/vagrant/.virtualenvs/aerodb/aerospike/lua/',
        'user_path': '/home/vagrant/aerodb/statistics/faster/udfs/'},
    'hosts': [('192.168.100.14', 3000)]
}

Curiously, when I remove the lua item from the config in lua_test.py, put lua_test.lua in a subdirectory (for example subdir/lua_test.lua), and change the path in connection.udf_put() above, a copy of lua_test.lua appears in my current directory whenever I run lua_test.py. Also, removing the lua section from the config does not seem to have any effect on the results of the test: I still get the exception whether it is present or not.


#4

I have finally figured out what was causing the 100L error. As I mentioned, I can run the code in lua_test.lua and lua_test.py just fine. However, when I integrate the code from lua_test.lua into my production Lua module, the error happens. It turns out the culprit is this line in my production Lua module:

local llist_lib = require('ldt/lib_llist')

I added that same line to the lua_test.lua file and I was able to reproduce the error as it would happen when I ran my test suite. It turns out that when it is time to execute the UDF on the client side, the module is loaded but since ldt/lib_llist is unavailable the require line fails and the script does not load.

So the solution to my case was to separate my stream UDFs and record UDFs and place them into separate modules. I placed the code in lua_test.lua into stream_udfs.lua instead of merging it into my existing module which contains the require line.
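
For anyone who wants to catch this earlier, here is a rough sketch (Python 3) that lists the modules a Lua file pulls in with require, so a stream UDF module can be checked for dependencies that may not exist on the client side. The function name is hypothetical and the parsing is deliberately naive: it only handles literal string arguments and "--" line comments:

```python
import re

# Matches require('mod'), require("mod") and require "mod".
REQUIRE_RE = re.compile(r"""require\s*\(?\s*['"]([^'"]+)['"]""")


def find_requires(lua_source):
    """Return module names passed to require() in a Lua source string."""
    modules = []
    for line in lua_source.splitlines():
        # Drop Lua line comments (naive: ignores "--" inside strings).
        code = line.split("--", 1)[0]
        modules.extend(REQUIRE_RE.findall(code))
    return modules
```

Running find_requires(open('udfs.lua').read()) on my production module would have reported 'ldt/lib_llist', which is the dependency that failed to load.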

One last thing, I find I have to use the following:

local function getRecordKey(rec)
  return list{record.digest(rec)}
end

Because without the list{} it returns an empty bytearray and I get the AS_BYTES log entry in aerospike.log.

In case anyone finds this and wonders what is the point of returning only the record key, this is because (at the time of writing) it is not possible to retrieve LDTs from records in a stream UDF. So I must retrieve the keys I need and then have a second call using Python API llist() to retrieve the LDT from the appropriate bin inside the record. This approach is a bit inefficient as it requires as many roundtrips as there are records. I hope Aerospike implements LDT access in stream UDFs soon.
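
As a sketch of the first half of that second step (Python 3): the Python client accepts key tuples of the form (namespace, set, primary key, digest), so the digests returned by the stream UDF can be turned into keys with the primary key left as None. The helper name is hypothetical, and the namespace and set are whatever your query used:

```python
def keys_from_digests(namespace, set_name, items):
    """Build (namespace, set, None, digest) key tuples from query results.

    Assumption: each item is either a bare digest or a one-element
    list wrapping the digest, as with the list{} workaround above.
    """
    keys = []
    for item in items:
        digest = item[0] if isinstance(item, (list, tuple)) else item
        keys.append((namespace, set_name, None, bytearray(digest)))
    return keys
```

Each key can then be passed to whichever client call reads the LDT bin. That call is not shown here, and it still costs one round trip per record.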


#5

Okay, so first we’ve established there’s a bug in that you can’t return bytes from your mapper.

Nice find regarding the require. Yes, you definitely want to separate record UDFs from the stream UDFs.


#6

For reference, the internal JIRA number for this bug (for fixing bytes as a valid return type) is AER-4251.


#7

This is in-progress on the server side and will be part of a future 3.6.x release.


#8

@rudeb0t,

Good news! We’ve released Aerospike 3.6.1, which has several new features as well as numerous fixes, including the fix to AER-4251.

You can read more about this release in our Aerospike Server CE 3.6.1 release notes and download it here.

Please upgrade and test it without the workaround to see if this has fixed your issue.


#9

Hello @Mnemaudsyne, thanks for the update. Sorry I cannot test it anymore. I have long since rewritten the original code that used this workaround into something that does not require me to retrieve and return the record key in an effort to reduce network roundtrips. But it is good to know that this issue is fixed. Thank you.