Stream UDF is not working -- Lua functions are not even called

aggregation
udf
query

#1

Following is my Python code, which calls a Lua UDF for aggregation. I don't think the inner functions are being called:

from __future__ import print_function
import aerospike
from aerospike import GeoJSON
from aerospike import predicates as p

config = { 'hosts': [ ('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()

query = client.query('supply', 'outlet')
query.where(p.geo_within_radius('loc', 77.597105, 12.887629, 2000))
query.apply('apply_filter_for_python', 'apply_filter', [])
records = query.results()
client.close()

Following is my Lua script:

local function select_value(rec)
    print("222222222222")
  return rec.val
end

function apply_filter(stream, amen)
    print("00000000000000000")
  local function match_amenity(rec)
      print("1111111111111111")
      print(rec.kw)
    return rec
  end
  print("0000000000000000")
  s = stream : filter(match_amenity) : map(select_value)
  print("999999999999999")
  return s
end

I see the following output; notice that "1111111111111111" is never printed:

00000000000000000 0000000000000000 999999999999999

aql gives the same result:

aql> aggregate apply_filter_for_python.apply_filter() on supply.outlet where merchant_code="mc-o0166948"
00000000000000000
0000000000000000
999999999999999

I am new to Lua and Aerospike and not sure where the mistake is. I'd appreciate it if anybody can help.

Thanks and Regards Raaghu.K


#2

A little more of an update: I have changed the Lua script, and at least the functions are getting called now. From the debug/print statements I see the stream table has size 1 but its value is nil, hence the filter function is never called.

Here is the modified Lua:

local function printTable(list, i)
    print("88888888888")
    local listString = ''
--~ begin of the list so write the {
    if not i then
        listString = listString .. '{'
    end

    i = i or 1
    local element = list[i]
    print(element)
--~ it may be the end of the list
    if not element then
        return listString .. '}'
    end
--~ if the element is a list too call it recursively
    if(type(element) == 'table') then
        listString = listString .. printTable(element)
    else
        listString = listString .. element
    end

    return listString .. ', ' .. printTable(list, i + 1)

end

local function tablelength(T)
  local count = 0
  for _ in pairs(T) do count = count + 1 end
  return count
end

  local function match_amenity(rec)
      print("match_amenity")
    return rec
  end

function apply_filter(stream)
    print(stream)
    print(tablelength(stream))
    print(printTable(stream))
    print("00000000000000000")
  s = stream : filter(match_amenity)
  print("999999999999999")
  return s
end

The stream cannot be empty; when I execute a simple select with the same predicate, I see 979 records:

aql> select * from supply.outlet where CONDITION
979 rows in set (0.193 secs)

When I aggregate with the same predicate, the script is executed with the stream as nil:

aql> aggregate apply_filter_for_python.apply_filter() ON supply.outlet where CONDITION
table: 0x7f5db80150e0
1
88888888888
nil
{}
00000000000000000
999999999999999
0 rows in set (0.008 secs)

I am not sure why the stream is nil when we call aggregate.

Thanks and Regards Raaghu.K


#3

Looking at your revised UDF:

First, the filter function is supposed to return true or false, based on some criterion applied to the bins in the record, but you are returning the record itself. If it returns true, that record is passed to the next stage; if false, that record is dropped (filtered out).

Second, stream UDFs cannot return a stream of "records". At the very least, you have to map each record into a map object and return that.

So it is not clear what you are trying to do. Post a few sample records from your data along with your problem statement and it may make more sense.
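Putting both points together, the UDF from your first post could be restructured along these lines. This is only a sketch: the bin names kw and val are taken from your original script, and the "coffee" match value is a hypothetical placeholder for whatever criterion you actually need. It runs inside the Aerospike server's Lua context, where stream operators and the map() constructor are provided.

```lua
-- Filter must return a boolean, not the record.
-- 'coffee' is a hypothetical match value; substitute your own criterion.
local function match_amenity(rec)
  return rec.kw == 'coffee'
end

-- A stream UDF cannot emit raw records; convert each record
-- into a map object before returning it downstream.
local function to_map(rec)
  local m = map()
  m['kw']  = rec.kw
  m['val'] = rec.val
  return m
end

function apply_filter(stream)
  return stream : filter(match_amenity) : map(to_map)
end
```

With this shape, the client-side query.apply() call receives a stream of maps rather than records.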


#4

You are right. In the meantime I had figured it out; I was able to return maps after implementing a map function.

Thanks and Regards Raaghu.K