AerospikeError: UDF: Execution Error 1


#1

I am using query.apply to apply my udf. I see in aerospike log that my udf is registered. But once it starts scanning, i see broken pipe error in aerospike logs.

Jan 26 2017 15:20:20 GMT: INFO (udf): (udf_cask.c:514) UDF module ‘disabled_filter.lua’ (/opt/aerospike/usr/udf/lua/disabled_filter.lua) registered Jan 26 2017 15:20:20 GMT: INFO (scan): (scan.c:907) starting aggregation scan job 5792803311485597521 {test:users} priority 2 Jan 26 2017 15:20:20 GMT: WARNING (scan): (scan.c:339) send error - fd 71 Broken pipe Jan 26 2017 15:20:20 GMT: INFO (scan): (scan.c:1018) finished aggregation scan job 5792803311485597521 (-1)

Can you please help me understand why this is happening?


Moving from NodeJs from 0.12 to 8.9.3 - UDF issue
#2

Please see Another UDF Error 100 debugging


#3

Thanks @pgupta. I can see the script on the client side with proper permissions. I tried running python examples/client/udf_list.py and this listed the module I registered: [{'content': bytearray(b''), 'type': 0, 'hash': bytearray(b'b1426d8f88b597f40fee048fdb6462dae17fb986'), 'name': 'disabled_filter.lua'}]. Also, I am using Node.js and did not find any equivalent UDF list call in the API docs, hence I used Python to check whether the module is listed.


#4

Here is a node.js aggregation example:

exports.aggregateUsersByTweetCountByRegion = function(client, callback)  {

  console.log("********** Aggregation Based on Tweet Count By Region **********");

    // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax.
    // NOTE: The recommended way of creating indexes in production env is via AQL.

    // Create a Secondary Index on tweetcount
   
    createIndexOnTweetcount(client);

    // Get min and max tweet counts for range filter
    var questions = [
      {
        type: "input",
        name: "min",
        message: "Enter Min Tweet Count"
      },
      {
        type: "input",
        name: "max",
        message: "Enter Max Tweet Count"
      }
    ];

    //Note: answers.min and answers.max are string types
    inquirer.prompt( questions, function( answers ) {

      
      // Register UDF, if successful, prepare the aggregation query and execute it.
 
      client.udfRegister('udf/aggregationByRegion.lua', function(err1) {
        if ( err1 == null ) {
          // Prepare query statement - Set range Filter on tweetcount
  
          var statement = {filters:[aerospike.filter.range('tweetcount', parseInt(answers.min), parseInt(answers.max))]};

          // Create query
 
          var query = client.query('test', 'users', statement);

          //Or you can also use the construct below using 'where' to create the query object:
          //var query = client.query('test', 'users');
          //query.where(aerospike.filter.range('tweetcount', parseInt(answers.min), parseInt(answers.max)));

          // Execute the query, invoking stream Aggregation UDF on the results of the query
          // UDF returns aggregated result
       
          query.apply('aggregationByRegion', 'sum', function(err2, result)  {
            if (err2 == null) {
              // Display desired result: "Total Users In <region>: <#>"
              
              console.log('Total Users In East:  ', result.e);
              console.log('Total Users In West:  ', result.w);
              console.log('Total Users In North: ', result.n);
              console.log('Total Users In South: ', result.s);
              callback();
            }
            else {
              console.log('ERROR: Aggregation Based on Tweet Count By Region failed: ',err2);
              callback();
            }
          });  //query.apply()
        }
        else {
          // An error occurred
          console.log('ERROR: aggregationByRegion UDF registeration failed: ', err1);
          callback();
        }
      });
    });
};
/**
 * Requests an integer secondary index named `tweetcount_index` on the
 * `tweetcount` bin of test.users and logs whether the request succeeded.
 *
 * NOTE: Index creation is included here for convenience and to demonstrate
 * the syntax; the recommended way of creating indexes in production is AQL.
 *
 * @param {object} client - Object exposing `createIntegerIndex(options, callback)`.
 */
function createIndexOnTweetcount(client)  {
  var indexSpec = {
    ns: 'test',
    set: 'users',
    bin: 'tweetcount',
    index: 'tweetcount_index'
  };

  // Only reports the outcome; nothing else depends on the result.
  function reportOutcome(err) {
    if (err != null) {
      // An error occurred
      console.log('ERROR: createIndexOnTweetcount failed:\n', err);
      return;
    }
    console.log('INFO: createIndexOnTweetcount created!');
  }

  client.createIntegerIndex(indexSpec, reportOutcome);
}

#5

Thanks @pgupta. I was able to successfully get the results with python. Will try out the node js code and will update.


#6

Using UDF in aql or via python gives the result. But on using node client, I am getting Aerospike error

aql> AGGREGATE disabled_filter.active_filter() ON test.users WHERE region = "88955"
+-------------------------+
| active_filter           |
+-------------------------+
| MAP('{"code":"88955"}') |
+-------------------------+
1 row in set (0.003 secs)

// Register the disabled_filter UDF module, create the region/disabled
// secondary indexes, then run the `active_filter` stream UDF over the
// test.users records whose `region` bin equals '88955'.
// Each step has its own error variable so that an inner callback never
// shadows the error of an outer one.
client.udfRegister('disabled_filter.lua', function (registerErr) {
    if (registerErr != null) {
        // Log the error itself; the bare message hides the failure cause.
        console.log("Register failed", registerErr);
        return;
    }

    // Create the indexes on region and disabled before querying.
    createIndexOnRegionAndDisabled(function (indexErr) {
        if (indexErr != null) {
            console.log("IC failed", indexErr);
            return;
        }

        console.log("All index creation succeeded");
        var statement = {filters: [Aerospike.filter.equal('region', '88955')]};
        var query = client.query('test', 'users', statement);
        query.apply('disabled_filter', 'active_filter', function (applyErr, result) {
            if (applyErr != null) {
                console.error("Error occured " + applyErr);
                return;
            }
            console.log(result.code);
        });
    });
});

/**
 * Creates string secondary indexes on the `region` and `disabled` bins of
 * test.users, one after the other.
 *
 * @param {function} callback - Invoked exactly once: with the error if the
 *   region index could not be created (the disabled index is then not
 *   attempted), otherwise with whatever `client.createIndex` reports for the
 *   disabled index.
 */
function createIndexOnRegionAndDisabled(callback) {
    var regionIndex = {
        ns:  'test',
        set: 'users',
        bin : 'region',
        index: 'region_index',
        datatype: Aerospike.indexDataType.STRING
    };
    var disabledIndex = {
        ns:  'test',
        set: 'users',
        bin : 'disabled',
        index: 'disabled_index',
        datatype: Aerospike.indexDataType.STRING
    };

    client.createIndex(regionIndex, function (err) {
        if (err != null) {
            console.log("Region index failed " + err);
            // Propagate the failure; otherwise the caller would wait forever.
            callback(err);
            return;
        }
        console.log("Created region index");
        client.createIndex(disabledIndex, callback);
    });
}

Unable to view any related logs other than the below one Jan 27 2017 05:06:04 GMT: INFO (udf): (udf_cask.c:514) UDF module ‘disabled_filter.lua’ (/opt/aerospike/usr/udf/lua/disabled_filter.lua) registered


#7

Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_cask.c:503) created json object {“content64”: “ZnVuY3Rpb24gYWN0aXZlX2ZpbHRlcihzdHJlYW0pCglsb2NhbCBmdW5jdGlvbiBmaWx0ZXJfYWN0aXZlKHJlYykKCQlyZXR1cm4gcmVjLmRpc2FibGVkID09ICd0cnVlJwoJZW5kCgoJbG9jYWwgZnVuY3Rpb24gbWFwX3JlY29yZChyZWMpCgkgICAgbG9jYWwgcmVzdWx0ID0gbWFwKCkKCSAgICByZXN1bHRbJ2NvZGUnXSA9IHJlY1snY29kZSddCgkgICAgcmV0dXJuIHJlc3VsdAogICAgZW5kCgoJcmV0dXJuIHN0cmVhbSA6IGZpbHRlcihmaWx0ZXJfYWN0aXZlKSA6IG1hcChtYXBfcmVjb3JkKQplbmQK”, “type”: “LUA”, “name”: “disabled_filter.lua”} Jan 27 2017 05:12:07 GMT: INFO (udf): (udf_cask.c:514) UDF module ‘disabled_filter.lua’ (/opt/aerospike/usr/udf/lua/disabled_filter.lua) registered Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_cask.c:626) UDF CASK accept fn : n items 1 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_cask.c:663) pushing to disabled_filter.lua, 285 bytes [function active_filter(stream) local function filter_active(rec) return rec.disabled == 'true’ end

local function map_record(rec)
    local result = map()
    result['code'] = rec['code']
    return result
end

return stream : filter(filter_active) : map(map_record)

end ] Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (src/main/mod_lua.c:604) Size of the lua state created for the file disabled_filter in KB 59 Jan 27 2017 05:12:07 GMT: WARNING (info): (thr_info.c:6530) SINDEX CREATE : Index with the same index defn already exists or bin has already been indexed. Jan 27 2017 05:12:07 GMT: WARNING (info): (thr_info.c:6530) SINDEX CREATE : Index with the same index defn already exists or bin has already been indexed. 
Jan 27 2017 05:12:07 GMT: DEBUG (query): (thr_query.c:2714) No Index Defined in the Query Jan 27 2017 05:12:07 GMT: DEBUG (query): (thr_query.c:595) Query 0x7f50e6fc3008 Done at base/thr_query.c:2522 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:225) [ENTER] Opening record key::0xd59fb3c1ad12c6166a3fcfe6cdc414e6d9830179 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:84) [ENTER] Opening record key::0xd59fb3c1ad12c6166a3fcfe6cdc414e6d9830179 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:712) [ENTER] rec(0x7f50e3c1d040) name(disabled) Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:445) [ENTER] BinName(disabled) Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:496) [ENTER] urecord(0x7f5105fc5e20) name(0x4104e198)[disabled] dirty(0) Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:712) [ENTER] rec(0x7f50e3c1d040) name(code) Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:445) [ENTER] BinName(code) Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:496) [ENTER] urecord(0x7f5105fc5e20) name(0x4104d1a8)[code] dirty(0) Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:286) [ENTER] Closing record key::0xd59fb3c1ad12c6166a3fcfe6cdc414e6d9830179 Jan 27 2017 05:12:07 GMT: DEBUG (udf): (udf_record.c:462) [ENTER] NumUpdates(2) Jan 27 2017 05:12:08 GMT: INFO (info): (thr_info.c:3822) Aerospike Telemetry Agent: Aerospike anonymous data collection is ACTIVE. 
For further information, see http://aerospike.com/aerospike-telemetry Jan 27 2017 05:12:08 GMT: INFO (paxos): (paxos.c:3910) Paxos Succession List: bb9e9c494270008 Jan 27 2017 05:12:08 GMT: DEBUG (udf): (udf_cask.c:253) UDF CASK INFO LIST Jan 27 2017 05:12:08 GMT: DEBUG (udf): (udf_cask.c:220) UDF metadata item[0]: module “UDF” ; key “disabled_filter.lua” ; value “{“content64”: “ZnVuY3Rpb24gYWN0aXZlX2ZpbHRlcihzdHJlYW0pCglsb2NhbCBmdW5jdGlvbiBmaWx0ZXJfYWN0aXZlKHJlYykKCQlyZXR1cm4gcmVjLmRpc2FibGVkID09ICd0cnVlJwoJZW5kCgoJbG9jYWwgZnVuY3Rpb24gbWFwX3JlY29yZChyZWMpCgkgICAgbG9jYWwgcmVzdWx0ID0gbWFwKCkKCSAgICByZXN1bHRbJ2NvZGUnXSA9IHJlY1snY29kZSddCgkgICAgcmV0dXJuIHJlc3VsdAogICAgZW5kCgoJcmV0dXJuIHN0cmVhbSA6IGZpbHRlcihmaWx0ZXJfYWN0aXZlKSA6IG1hcChtYXBfcmVjb3JkKQplbmQK”, “type”: “LUA”, “name”: “disabled_filter.lua”}” ; generation 102 ; timestamp 18871736


#8

BTW, above is your lua code in base64. Cut and paste in base64decode.org and you will see your code in ascii text!

I think in your filters: you have Aerospike instead of aerospike (Check case sensitivity)

Also, create your index once via AQL instead of doing it in your application again and again. I had posted the code just to show how you can do via the application.


#9

I have created the aerospike variable as const Aerospike = require('aerospike'), hence I don't think case sensitivity is the issue. As you suggested, I have commented out the index creation part. It is still not working; the same error is thrown.


#10

check your execution path. Are you reaching your filters: statement after refactoring code for removing index creation? You just have to put some console.log statements and debug.

The aggregation framework in Node.js does work. What are your dependencies in package.json?


#11

"dependencies": { "aerospike": "^2.4.4", "lodash": "^4.17.4" }

Above are my dependencies. Yes, I see that the UDF registration passes and my statements start getting executed on both the client and the server side. After the UDF is registered on the server side, I don't see any errors in the Aerospike server logs either. Here are the client-side logs:

Jan 27 2017 06:48:54 UTC: DEBUG(16343) [client.cc:106] [New] - Aerospike client initialized successfully Jan 27 2017 06:48:54 UTC: DEBUG(16343) [cluster.cc:49] [Connect] - Connecting to Cluster: Success Jan 27 2017 06:48:54 UTC: DEBUG(16343) [udf_register.cc:248] [execute] - Invoking aerospike udf register Jan 27 2017 06:48:54 UTC: DEBUG(16343) [udf_register.cc:271] [respond] - UDF register operation : response is Jan 27 2017 06:48:54 UTC: DEBUG(16343) [query.cc:65] [setup_query] - Building filter on predicate type 0, index type 0, data type 0, bin name 'region’ Jan 27 2017 06:48:54 UTC: DEBUG(16343) [query.cc:113] [setup_query] - String equality predicate 88955 Jan 27 2017 06:48:54 UTC: DEBUG(16343) [udf_register.cc:292] [respond] - Invoked Put callback Jan 27 2017 06:48:54 UTC: DEBUG(16343) [udf_register.cc:312] [respond] - Cleaned up all the structures Jan 27 2017 06:48:54 UTC: DEBUG(16343) [query_apply.cc:88] [execute] - Sending query command with stream UDF Jan 27 2017 06:48:54 UTC: INFO (16343) [query_apply.cc:103] [respond] - Command failed: 100 UDF: Execution Error 1


#12

Make sure in your connection, you specify user lua path in userPath.

var client = aerospike.client({ hosts: [ { addr: hostaddr, port: 3000 }], modlua: { systemPath: '/usr/local/aerospike/lua', userPath: '/usr/local/aerospike/usr-lua' } }).connect( function(response) { …


#13

I have my lua udf in /Users/divya/repos/aerospike-poc/lua/disabled_filter.lua and I am using the below config

modlua: {
    userPath: '/Users/divya/repos/aerospike-poc/lua/',
    systemPath: '/Users/divya/repos/aerospike-poc'
}

During registration, I use client.udfRegister('lua/disabled_filter.lua', function (err) {

Also, what is the use of systemPath in the config?


#14

systemPath is where the Aerospike system Lua modules were installed when you installed the Node.js Aerospike client on your client machine, typically /usr/local/aerospike/lua. I suspect you may have the wrong systemPath entry.


#15

Thanks a ton @pgupta. At last it worked. The problem was with the systemPath.


#16

Hello Guys,

Can somebody help us with this issue as well? It is

Truly appreciate.

Regards, Manik