The following is my Python code, which calls Lua for aggregation. I don't think the inner functions are being called.
from __future__ import print_function
import aerospike
from aerospike import GeoJSON
from aerospike import predicates as p
config = { 'hosts': [ ('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()
query = client.query('supply', 'outlet')
query.where(p.geo_within_radius('loc', 77.597105, 12.887629, 2000))
query.apply('apply_filter_for_python', 'apply_filter', [])
records = query.results()
client.close()
The following is my Lua script:
local function select_value(rec)
  print("222222222222")
  return rec.val
end
function apply_filter(stream, amen)
  print("00000000000000000")
  local function match_amenity(rec)
    print("1111111111111111")
    print(rec.kw)
    return rec
  end
  print("0000000000000000")
  s = stream : filter(match_amenity) : map(select_value)
  print("999999999999999")
  return s
end
I see the following output. Note that “111111111111…” is never printed:
aql> aggregate apply_filter_for_python.apply_filter() on supply.outlet where merchant_code="mc-o0166948"
00000000000000000
0000000000000000
999999999999999
I am new to Lua and Aerospike and am not sure where the mistake is. I would appreciate any help.
A small update: I have changed the Lua script, and at least the functions are getting called now. From the debug print statements, the stream size is 1 but the stream value is nil, so the filter function is never called.
Modified Lua script:
local function printTable(list, i)
  print("88888888888")
  local listString = ''
  --~ begin of the list so write the {
  if not i then
    listString = listString .. '{'
  end
  i = i or 1
  local element = list[i]
  print(element)
  --~ it may be the end of the list
  if not element then
    return listString .. '}'
  end
  --~ if the element is a list too call it recursively
  if(type(element) == 'table') then
    listString = listString .. printTable(element)
  else
    listString = listString .. element
  end
  return listString .. ', ' .. printTable(list, i + 1)
end
local function tablelength(T)
  local count = 0
  for _ in pairs(T) do count = count + 1 end
  return count
end
local function match_amenity(rec)
  print("match_amenity")
  return rec
end
function apply_filter(stream)
  print(stream)
  print(tablelength(stream))
  print(printTable(stream))
  print("00000000000000000")
  s = stream : filter(match_amenity)
  print("999999999999999")
  return s
end
The stream should not be nil: when I execute a simple select command, I see 979 records.
aql> select * from supply.outlet where CONDITION
979 rows in set (0.193 secs)
When I aggregate with the same predicate, the script is executed with the stream as nil:
aql> aggregate apply_filter_for_python.apply_filter() ON supply.outlet where CONDITION
table: 0x7f5db80150e0
1
88888888888
nil
{}
00000000000000000
999999999999999
0 rows in set (0.008 secs)
I am not sure how the stream can be nil when we call aggregate.
First, a filter function is supposed to return true or false, based on some criterion applied to the bins in the record. You are returning the record itself. If it returns true, the record is passed to the next stage; if it returns false, the record is dropped (filtered out).
Second, stream UDFs cannot return a stream of “records”. At the very least, you have to map each record into a map object and return that map object.
So it is not clear what you are trying to do. Post a few sample records from your data along with your problem statement, and it may make more sense.
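To illustrate the two points above, here is a minimal sketch in plain Python that models the filter and map stages. It is not the Aerospike API and does not run on the server; it only mimics the semantics described. The bin names `kw` and `val` come from the script in the question, while the sample records and the matching rule (the amenity must appear in `kw`) are assumptions made up for the example.

```python
# Plain-Python model of a stream UDF pipeline (NOT the Aerospike API).
# Sample records and the matching rule are hypothetical.

def match_amenity(rec, amenity):
    """Filter predicate: returns True or False, never the record itself."""
    return amenity in rec.get("kw", [])

def select_value(rec):
    """Map step: turn the record into a plain map (dict) of the bins to return."""
    return {"val": rec.get("val")}

def apply_filter(stream, amenity):
    """Keep records for which the predicate is True, then map each to a dict."""
    kept = filter(lambda rec: match_amenity(rec, amenity), stream)
    return list(map(select_value, kept))

# Hypothetical sample data standing in for records in supply.outlet.
records = [
    {"kw": ["wifi", "parking"], "val": 10},
    {"kw": ["parking"], "val": 20},
    {"kw": ["wifi"], "val": 30},
]

result = apply_filter(records, "wifi")
print(result)  # [{'val': 10}, {'val': 30}]
```

The same shape should carry over to the Lua script: the function given to filter returns a boolean, and the function given to map returns a map built from the bins you need instead of the record.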