-- Projects a stream record onto the two bins handed back to the caller:
-- tenantId and txnid. Further bins can be added to the table below.
-- `map` is not defined in this snippet; it is expected to be supplied by
-- the Aerospike UDF runtime.
local function map_profile(record)
  local bins = {
    tenantId = record.tenantId,
    txnid = record.txnid,
  }
  return map(bins)
end
-- Stream UDF entry point: keeps only the records whose tenantId and glid
-- bins equal the supplied arguments, then projects each survivor through
-- map_profile.
--   stream   : incoming record stream
--   tenantId : value the record's tenantId bin must equal
--   glid     : value the record's glid bin must equal
-- Returns the stream with the filter and the projection attached.
function profile_filter(stream, tenantId, glid)
  local function is_match(rec)
    if rec.tenantId ~= tenantId then
      return false
    end
    return rec.glid == glid
  end

  local matching = stream:filter(is_match)
  return matching:map(map_profile)
end
no field package.preload['ConversioLogs']
no file './ConversioLogs.lua'
no file '/usr/local/share/lua/5.1/ConversioLogs.lua'
no file '/usr/local/share/lua/5.1/ConversioLogs/init.lua'
no file '/usr/local/lib/lua/5.1/ConversioLogs.lua'
no file '/usr/local/lib/lua/5.1/ConversioLogs/init.lua'
no file '/opt/aerospike/sys/udf/lua/ConversioLogs.lua'
no file '/opt/aerospike/sys/udf/lua/external/ConversioLogs.lua'
no file '/opt/aerospike/usr/udf/lua/ConversioLogs.lua'
no file './ConversioLogs.so'
no file '/usr/local/lib/lua/5.1/ConversioLogs.so'
no file '/usr/local/lib/lua/5.1/loadall.so'
no file '/opt/aerospike/sys/udf/lua/ConversioLogs.so'
no file '/opt/aerospike/sys/udf/lua/external/ConversioLogs.so'
no file '/opt/aerospike/usr/udf/lua/ConversioLogs.so'
-- Builds the result entry for one record: a map carrying only the record's
-- tenantId and txnid bins. Add more bins to the table if callers need them.
-- `map` is not defined in this snippet; it is expected to be supplied by
-- the Aerospike UDF runtime.
local function map_profile(record)
  local selected = {
    tenantId = record.tenantId,
    txnid = record.txnid,
  }
  return map(selected)
end
-- Stream UDF entry point: keeps only the records whose oid and afid bins
-- equal the supplied arguments, then projects each survivor through
-- map_profile.
--   stream : incoming record stream
--   oid    : value the record's oid bin must equal
--   afid   : value the record's afid bin must equal
--   ip     : accepted but not read anywhere in this function
-- Returns the stream with the filter and the projection attached.
function profile_filter(stream, oid, afid, ip)
  local function is_match(rec)
    if rec.oid ~= oid then
      return false
    end
    return rec.afid == afid
  end

  local matching = stream:filter(is_match)
  return matching:map(map_profile)
end
This is how I am calling it from the Java client:
public List<Map<String, Object>> getMultiByLua(String table, Map<String, Object> queryParams)
throws AerospikeException {
QueryPolicy policy = new QueryPolicy();
Statement statement = new Statement();
statement.setNamespace(namespace);
statement.setSetName(table);
if (queryParams != null && queryParams.size() > 0) {
// List<Filter> filters = new ArrayList<>();
// for (Entry<String, Object> en : queryParams.entrySet()) {
// Filter filter = Filter.equal(en.getKey(), Value.get(en.getValue()).toString());
// filters.add(filter);
// }
statement.setFilters(Filter.equal("tenantId", (String) queryParams.get("tenantId")));
}
statement.setAggregateFunction("Clicks", "profile_filter", Value.get(queryParams.get("oid")),
Value.get(queryParams.get("afid")));
// passFilter will be applied in filter_example.lua.
ResultSet recordSet = client.queryAggregate(null, statement);
System.out.println("*******************************");
System.out.println( recordSet.toString());
System.out.println("*******************************");
if (recordSet != null) {
List<Map<String, Object>> records = new ArrayList<Map<String, Object>>();
while (recordSet.next()) {
Map<String, Object> map = (Map<String, Object>) recordSet.getObject();
// Record record = (Record) map;
// DBRecord dbRecord = DBRecord.fromRecord(record);
records.add(map);
}
return records;
}
return null;
1 - In profile_filter, what is the last argument, “ip”? Your call does not pass it either. If you don’t need it, I would take it out.
2 - In your input stream of records, do you actually have records that match the filtering criteria? oid && afid?
If it is working in AQL, it is not a stream UDF issue; it has to be something in your Java implementation. You say there is an additional filter, but your Java code is not using it, so what you posted is probably not your real code. Step through your code and you should be able to figure it out.