I am using query.apply to apply my UDF. I can see in the Aerospike log that my UDF is registered, but once it starts scanning, I see a broken pipe error in the Aerospike logs.
Jan 26 2017 15:20:20 GMT: INFO (udf): (udf_cask.c:514) UDF module ‘disabled_filter.lua’ (/opt/aerospike/usr/udf/lua/disabled_filter.lua) registered
Jan 26 2017 15:20:20 GMT: INFO (scan): (scan.c:907) starting aggregation scan job 5792803311485597521 {test:users} priority 2
Jan 26 2017 15:20:20 GMT: WARNING (scan): (scan.c:339) send error - fd 71 Broken pipe
Jan 26 2017 15:20:20 GMT: INFO (scan): (scan.c:1018) finished aggregation scan job 5792803311485597521 (-1)
Can you please help me understand why this is happening?
Thanks @pgupta. I can see the script on the client side with proper permissions. I tried running python examples/client/udf_list.py, and it listed the module I registered.
[{‘content’: bytearray(b’‘), ‘type’: 0, ‘hash’: bytearray(b’b1426d8f88b597f40fee048fdb6462dae17fb986’), ‘name’: ‘disabled_filter.lua’}]
Also, I am using Node.js and did not find an equivalent UDF list call in the API docs, so I used Python to check whether the module is listed.
exports.aggregateUsersByTweetCountByRegion = function(client, callback) {
console.log("********** Aggregation Based on Tweet Count By Region **********");
// NOTE: Index creation has been included in here for convenience and to demonstrate the syntax.
// NOTE: The recommended way of creating indexes in production env is via AQL.
// Create a Secondary Index on tweetcount
createIndexOnTweetcount(client);
// Get min and max tweet counts for range filter
var questions = [
{
type: "input",
name: "min",
message: "Enter Min Tweet Count"
},
{
type: "input",
name: "max",
message: "Enter Max Tweet Count"
}
];
//Note: answers.min and answers.max are string types
inquirer.prompt( questions, function( answers ) {
// Register UDF, if successful, prepare the aggregation query and execute it.
client.udfRegister('udf/aggregationByRegion.lua', function(err1) {
if ( err1 == null ) {
// Prepare query statement - Set range Filter on tweetcount
var statement = {filters:[aerospike.filter.range('tweetcount', parseInt(answers.min), parseInt(answers.max))]};
// Create query
var query = client.query('test', 'users', statement);
//Or you can also use the construct below using 'where' to create the query object:
//var query = client.query('test', 'users');
//query.where(aerospike.filter.range('tweetcount', parseInt(answers.min), parseInt(answers.max)));
// Execute the query, invoking stream Aggregation UDF on the results of the query
// UDF returns aggregated result
query.apply('aggregationByRegion', 'sum', function(err2, result) {
if (err2 == null) {
// Display desired result: "Total Users In <region>: <#>"
console.log('Total Users In East: ', result.e);
console.log('Total Users In West: ', result.w);
console.log('Total Users In North: ', result.n);
console.log('Total Users In South: ', result.s);
callback();
}
else {
console.log('ERROR: Aggregation Based on Tweet Count By Region failed: ',err2);
callback();
}
}); //query.apply()
}
else {
// An error occurred
console.log('ERROR: aggregationByRegion UDF registeration failed: ', err1);
callback();
}
});
});
};
/**
 * Requests an integer secondary index named `tweetcount_index` on the
 * `tweetcount` bin of `test.users` and logs the outcome.
 *
 * NOTE: Index creation is done here for convenience and to demonstrate the
 * syntax; the recommended way of creating indexes in production is via AQL.
 *
 * @param {object} client - Object exposing `createIntegerIndex(options, cb)`.
 */
function createIndexOnTweetcount(client) {
  const indexSpec = {
    ns: 'test',
    set: 'users',
    bin: 'tweetcount',
    index: 'tweetcount_index'
  };

  // The result is only reported; nothing waits on it.
  const reportOutcome = (failure) => {
    if (failure != null) {
      console.log('ERROR: createIndexOnTweetcount failed:\n', failure);
      return;
    }
    console.log('INFO: createIndexOnTweetcount created!');
  };

  client.createIntegerIndex(indexSpec, reportOutcome);
}
Using the UDF in aql or via Python gives the result, but with the Node.js client I am getting an Aerospike error.
aql> AGGREGATE disabled_filter.active_filter() ON test.users WHERE region = "88955"
+--------------------------+
| active_filter |
+--------------------------+
| MAP('{"code":"88955"}') |
+--------------------------+
1 row in set (0.003 secs)
// Register the `disabled_filter` UDF module, create the region/disabled
// secondary indexes, then run the `active_filter` stream UDF over the
// `test.users` records whose `region` bin equals '88955' and print the
// `code` field of the result.
client.udfRegister('disabled_filter.lua', function (registerErr) {
  if (registerErr != null) {
    // Log the error itself; a bare "failed" message hides the cause.
    console.log("Register failed", registerErr);
    return;
  }

  // Create the indexes on region and disabled before querying.
  createIndexOnRegionAndDisabled(function (indexErr) {
    if (indexErr != null) {
      console.log("IC failed", indexErr);
      return;
    }
    console.log("All index creation succeeded");

    const statement = {filters: [Aerospike.filter.equal('region', '88955')]};
    const query = client.query('test', 'users', statement);
    query.apply('disabled_filter', 'active_filter', function (applyErr, result) {
      if (applyErr != null) {
        console.error("Error occured " + applyErr);
        return;
      }
      console.log(result.code);
    });
  });
});
/**
 * Creates string secondary indexes on the `region` and then the `disabled`
 * bins of `test.users`, one after the other, using the surrounding `client`.
 *
 * @param {Function} callback - Receives the error if creating the region
 *   index fails; otherwise it is handed to the second `createIndex` call and
 *   receives whatever that call reports.
 */
function createIndexOnRegionAndDisabled(callback) {
  const regionIndex = {
    ns: 'test',
    set: 'users',
    bin: 'region',
    index: 'region_index',
    datatype: Aerospike.indexDataType.STRING
  };

  client.createIndex(regionIndex, function (err) {
    if (err != null) {
      console.log("Region index failed " + err);
      // Propagate the failure: without this the caller never hears back
      // and the whole flow stalls silently.
      callback(err);
      return;
    }

    console.log("Created region index");
    const disabledIndex = {
      ns: 'test',
      set: 'users',
      bin: 'disabled',
      index: 'disabled_index',
      datatype: Aerospike.indexDataType.STRING
    };
    client.createIndex(disabledIndex, callback);
  });
}
I am unable to see any related logs other than the ones below.
Jan 27 2017 05:06:04 GMT: INFO (udf): (udf_cask.c:514) UDF module ‘disabled_filter.lua’ (/opt/aerospike/usr/udf/lua/disabled_filter.lua) registered
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_cask.c:503) created json object {“content64”: “ZnVuY3Rpb24gYWN0aXZlX2ZpbHRlcihzdHJlYW0pCglsb2NhbCBmdW5jdGlvbiBmaWx0ZXJfYWN0aXZlKHJlYykKCQlyZXR1cm4gcmVjLmRpc2FibGVkID09ICd0cnVlJwoJZW5kCgoJbG9jYWwgZnVuY3Rpb24gbWFwX3JlY29yZChyZWMpCgkgICAgbG9jYWwgcmVzdWx0ID0gbWFwKCkKCSAgICByZXN1bHRbJ2NvZGUnXSA9IHJlY1snY29kZSddCgkgICAgcmV0dXJuIHJlc3VsdAogICAgZW5kCgoJcmV0dXJuIHN0cmVhbSA6IGZpbHRlcihmaWx0ZXJfYWN0aXZlKSA6IG1hcChtYXBfcmVjb3JkKQplbmQK”, “type”: “LUA”, “name”: “disabled_filter.lua”}
Jan 27 2017 05:12:07 GMT: INFO (udf): (udf_cask.c:514) UDF module ‘disabled_filter.lua’ (/opt/aerospike/usr/udf/lua/disabled_filter.lua) registered
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_cask.c:626) UDF CASK accept fn : n items 1
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_cask.c:663) pushing to disabled_filter.lua, 285 bytes [function active_filter(stream)
local function filter_active(rec)
return rec.disabled == ‘true’
end
local function map_record(rec)
local result = map()
result['code'] = rec['code']
return result
end
return stream : filter(filter_active) : map(map_record)
end
]
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59
Jan 27 2017 05:12:07 GMT: WARNING (info): (thr_info.c:6530) SINDEX CREATE : Index with the same index defn already exists or bin has already been indexed.
Jan 27 2017 05:12:07 GMT: WARNING (info): (thr_info.c:6530) SINDEX CREATE : Index with the same index defn already exists or bin has already been indexed.
Jan 27 2017 05:12:07 GMT: DEBUG (query): (thr_query.c:2714) No Index Defined in the Query
Jan 27 2017 05:12:07 GMT: DEBUG (query): (thr_query.c:595) Query 0x7f50e6fc3008 Done at base/thr_query.c:2522
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:225) [ENTER] Opening record key::0xd59fb3c1ad12c6166a3fcfe6cdc414e6d9830179
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:84) [ENTER] Opening record key::0xd59fb3c1ad12c6166a3fcfe6cdc414e6d9830179
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:712) [ENTER] rec(0x7f50e3c1d040) name(disabled)
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:445) [ENTER] BinName(disabled)
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:496) [ENTER] urecord(0x7f5105fc5e20) name(0x4104e198)[disabled] dirty(0)
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:712) [ENTER] rec(0x7f50e3c1d040) name(code)
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:445) [ENTER] BinName(code)
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:496) [ENTER] urecord(0x7f5105fc5e20) name(0x4104d1a8)[code] dirty(0)
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:286) [ENTER] Closing record key::0xd59fb3c1ad12c6166a3fcfe6cdc414e6d9830179
Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:462) [ENTER] NumUpdates(2)
Jan 27 2017 05:12:08 GMT: INFO (info): (thr_info.c:3822) Aerospike Telemetry Agent: Aerospike anonymous data collection is ACTIVE. For further information, see Aerospike Telemetry and Certain Use Data in Community | Aerospike
Jan 27 2017 05:12:08 GMT: INFO (paxos): (paxos.c:3910) Paxos Succession List: bb9e9c494270008
Jan 27 2017 05:12:08 GMT: DEBUG (udf): (udf_cask.c:253) UDF CASK INFO LIST
Jan 27 2017 05:12:08 GMT: DEBUG (udf): (udf_cask.c:220) UDF metadata item[0]: module “UDF” ; key “disabled_filter.lua” ; value “{“content64”: “ZnVuY3Rpb24gYWN0aXZlX2ZpbHRlcihzdHJlYW0pCglsb2NhbCBmdW5jdGlvbiBmaWx0ZXJfYWN0aXZlKHJlYykKCQlyZXR1cm4gcmVjLmRpc2FibGVkID09ICd0cnVlJwoJZW5kCgoJbG9jYWwgZnVuY3Rpb24gbWFwX3JlY29yZChyZWMpCgkgICAgbG9jYWwgcmVzdWx0ID0gbWFwKCkKCSAgICByZXN1bHRbJ2NvZGUnXSA9IHJlY1snY29kZSddCgkgICAgcmV0dXJuIHJlc3VsdAogICAgZW5kCgoJcmV0dXJuIHN0cmVhbSA6IGZpbHRlcihmaWx0ZXJfYWN0aXZlKSA6IG1hcChtYXBfcmVjb3JkKQplbmQK”, “type”: “LUA”, “name”: “disabled_filter.lua”}” ; generation 102 ; timestamp 18871736
BTW, above is your lua code in base64. Cut and paste in base64decode.org and you will see your code in ascii text!
I think that in your filters: statement you have Aerospike instead of aerospike (check the case sensitivity).
Also, create your index once via AQL instead of doing it in your application again and again. I had posted the code just to show how you can do via the application.
I have created the aerospike variable as const Aerospike = require('aerospike'), so I don't think case sensitivity is the issue. As you suggested, I have commented out the index creation part. It is still not working; the same error is thrown.
Check your execution path. Are you reaching your filters: statement after refactoring the code to remove index creation? You just have to put in some console.log statements and debug.
The aggregation framework in Node.js does work. What are your dependencies in package.json?
Above are my dependencies. Yes, I see that the UDF registration passes and my statements start getting executed on both the client and the server side. After the UDF is registered on the server side, I don't see errors in the Aerospike server logs either. Here are the client-side logs:
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [client.cc:106] [New] - Aerospike client initialized successfully
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [cluster.cc:49] [Connect] - Connecting to Cluster: Success
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [udf_register.cc:248] [execute] - Invoking aerospike udf register
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [udf_register.cc:271] [respond] - UDF register operation : response is
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [query.cc:65] [setup_query] - Building filter on predicate type 0, index type 0, data type 0, bin name ‘region’
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [query.cc:113] [setup_query] - String equality predicate 88955
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [udf_register.cc:292] [respond] - Invoked Put callback
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [udf_register.cc:312] [respond] - Cleaned up all the structures
Jan 27 2017 06:48:54 UTC: DEBUG(16343) [query_apply.cc:88] [execute] - Sending query command with stream UDF
Jan 27 2017 06:48:54 UTC: INFO (16343) [query_apply.cc:103] [respond] - Command failed: 100 UDF: Execution Error 1
systemPath is where all the Aerospike Lua modules were installed when you installed the Node.js Aerospike client on your client machine, typically /usr/local/aerospike/lua. I suspect you may have a wrong systemPath entry.
I am using the client.execute function to run my UDF,
and I am registering the UDF file with
RegisterTask task = client.register(new Policy(), "udf/example.lua", "example.lua", Language.LUA);
But I am getting this error:
java.io.FileNotFoundException: udf/example.lua (No such file or directory)
Can someone advise me on how to fix this?
// Connect to the cluster; `config` is defined outside this snippet.
client = await Aerospike.connect(config);
// Register the UDF module; `p` is not shown here — presumably the path to
// the Lua file (TODO confirm).
let register = await client.udfRegister(p)
// Build a query over namespace "test", set "demo" with no filter.
var query = client.query("test", "demo")
// Apply function "order_by" of module "group_by" with the argument list [3].
// NOTE(review): the value returned by apply() is neither awaited nor
// captured here, and no callback is passed — confirm how the result is read.
query.apply("group_by", "order_by", [3]);
ERROR(22048) [mod_lua.c:577] [create_state] - Lua Create Error: module ‘group_by’ not found:
no field package.preload[‘group_by’]
no file ‘.\group_by.lua’
no file ‘C:\Program Files\nodejs64\lua\group_by.lua’
I am new to Aerospike; can you help me figure out what I am doing wrong?