How do I debug the 'reduce()' operation of a stream UDF?

Hi,

I suspect that the reduce operation of my stream UDF is not working as I intend.

Where are the logs of this operation generated? I also know that the reduce operation is executed on the client. Do I have to write some code to log the messages of the reduce function?

I can see the logs of the mapper and the aggregate functions in the Aerospike log, but not the reduce logs.

With Best Regards, Himanshu

I’m facing the same issue. How can we see the logs generated in the “reduce” stream UDF?

Yes, it is possible, but you have to understand each step carefully. I will post a Python client example. This is a long post, so bear with me!

Generally, reduce() is a simple merge function and should not require debugging. But if you must, here you go.

1 - reduce() runs on both server nodes and client nodes.
2 - reduce() runs on a server node only if it needs to. If aggregate() has already reduced the data to a single value, there is no need to run reduce() on the server node, so Aerospike will not run it there.
3 - So, typically, in a multi-node cluster each server sends its aggregation to the client and reduce() runs on the client. => If you are testing with a single-node server, quite likely reduce() will not be called because it is not needed. In that case, you won’t see your debug() printout regardless.
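The three points above can be made concrete with a small pure-Python model. This is not Aerospike code: node_aggregate() and run_query() are hypothetical names. Each simulated node folds its records into one partial result, and the simulated client merges the per-node partials with the user's reduce function while counting how often that function runs.

```python
from functools import reduce as fold


def node_aggregate(records):
    """Model of one server node: fold its records into a single partial sum."""
    total = 0
    for r in records:
        total += r
    return total


def run_query(nodes, reduce_fn):
    """Model of the client: merge one partial result per node with reduce_fn.

    Returns (result, number_of_reduce_calls).
    """
    calls = 0

    def counted(a, b):
        nonlocal calls
        calls += 1
        return reduce_fn(a, b)

    partials = [node_aggregate(records) for records in nodes]
    # As in stream_ops reduce(), the first value seeds the result, so the user
    # function runs once per *additional* value and never for a single value.
    result = fold(counted, partials) if partials else None
    return result, calls


def add(a, b):
    return a + b


if __name__ == "__main__":
    print(run_query([[1, 2, 3]], add))               # (6, 0): single node
    print(run_query([[1, 2], [3, 4], [5, 6]], add))  # (21, 2): three nodes
```

With one node there is only one partial result, so the reduce function is never invoked. That is why a single-node test may show no reduce debug output at all.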

There are two Lua module directories on each side. The first is for the Aerospike system Lua modules: the server installation puts one set on the server node, and the client installation puts a slightly different set on the client node, though their file names may be the same. (I was advised not to copy the server Lua modules to the client side.) The second is for user-written content: the user Lua modules.

When you register your Lua module with AQL, it is installed only on the server nodes; the client-side copy is not installed. So, for stream UDFs, I suggest always using the client’s API to register stream UDF modules. Note: if you modify your Lua module (e.g. add debug() statements), the API updates the module on the server, but it does not update the copy on the client side unless you first delete your user Lua file from the client-side directory. The API then writes a new copy into that directory. That is what I found in my testing.
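In the Python client the registration call is udf_put(). The delete-then-register workaround described above can be sketched as follows. remove_stale_client_copy() is a hypothetical helper based only on the behaviour observed in this thread, not on documented client behaviour, and the commented-out udf_put() call assumes an already connected client.

```python
import os
import tempfile


def remove_stale_client_copy(module_file, user_path):
    """Delete the client-side copy of a user Lua module, if one exists.

    module_file: path of the .lua file you are about to re-register.
    user_path:   the client's user Lua directory (config['lua']['user_path']).
    Returns True if a stale copy was removed, False if there was none.
    """
    stale = os.path.join(user_path, os.path.basename(module_file))
    if os.path.isfile(stale):
        os.remove(stale)
        return True
    return False


# Before re-registering a modified module (needs a connected client):
#   remove_stale_client_copy('aggregationByRegion.lua', '/usr/local/aerospike/usr-lua/')
#   client.udf_put('aggregationByRegion.lua')

if __name__ == "__main__":
    # Self-check against a throwaway directory instead of the real user_path.
    demo_dir = tempfile.mkdtemp()
    open(os.path.join(demo_dir, "mod.lua"), "w").close()
    print(remove_stale_client_copy("/src/mod.lua", demo_dir))  # True
    print(remove_stale_client_copy("/src/mod.lua", demo_dir))  # False
```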

In Python, the registration call is udf_put(). When I open the connection to the server in Python, I pass the client-side user Lua directory path and the system Lua path in my config dictionary, so the connection knows where everything is on the client side.

Here is a code snippet that opens my Python client connection to the server:

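> import aerospike
>
> host, port = '127.0.0.1', 3000   # example values; use your own seed node
> config = {'hosts': [(host, port)],
>           'lua': {'system_path': '/usr/local/aerospike/lua/',    # client-side system Lua modules
>                   'user_path': '/usr/local/aerospike/usr-lua/'}  # client-side user Lua modules
>          }
>
> client = aerospike.client(config).connect()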

On the server side, the Aerospike system Lua modules are at /opt/aerospike/sys/udf/lua:

> drwxr-xr-x 4 aerospike aerospike 4096 Feb 7 22:28 .
> drwxr-xr-x 3 aerospike aerospike 4096 Aug 11 12:21 ..
> -rw-r--r-- 1 aerospike aerospike 5623 Jul 31 2016 aerospike.lua
> -rw-r--r-- 1 aerospike aerospike 3211 Jul 31 2016 as.lua
> drwxr-xr-x 2 aerospike aerospike 4096 Aug 11 12:21 external
> drwxr-xr-x 2 aerospike aerospike 4096 Feb 7 21:35 ldt
> -rw-r--r-- 1 aerospike aerospike 8990 Feb 7 22:23 stream_ops.lua
> pgupta@ubuntu:/opt/aerospike/sys/udf/lua

and your stream UDFs go on the server at /opt/aerospike/usr/udf/lua.

So, first, keep track of where your stream UDFs and the Aerospike system Lua modules reside on an Aerospike server versus an Aerospike client.
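To keep track of the four locations, a small sketch like this checks which of them hold a given module on the machine you run it on. The paths are the ones quoted in this thread and will differ between installations; LUA_DIRS and where_is() are hypothetical names. In the single-VM test described here all four directories are on one host; on a real deployment you would run it on a server node and on the client host separately.

```python
import os

# Locations quoted in this thread; adjust them to your installation.
LUA_DIRS = {
    ("server", "system"): "/opt/aerospike/sys/udf/lua",
    ("server", "user"): "/opt/aerospike/usr/udf/lua",
    ("client", "system"): "/usr/local/aerospike/lua",      # config['lua']['system_path']
    ("client", "user"): "/usr/local/aerospike/usr-lua",    # config['lua']['user_path']
}


def where_is(module, dirs=LUA_DIRS):
    """Return {(side, kind): full_path} for every directory that holds `module`."""
    found = {}
    for key, directory in dirs.items():
        path = os.path.join(directory, module)
        if os.path.isfile(path):
            found[key] = path
    return found


if __name__ == "__main__":
    for module in ("stream_ops.lua", "aggregationByRegion.lua"):
        print(module, "->", sorted(where_is(module)) or "not found on this machine")
```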

(I tested all this inside a single VM running 3 Aerospike processes on different ports, forming a 3-node cluster, with the client also running in the same VM.)

Next, set the levels in the logging stanza of your server config file to at least the following. Order is important: if you put "context any info" after "context aggr debug", aggr changes back to info and you won’t see your debug printouts in the log:

> logging {
>     file /var/log/aerospike/a1.log {
>         context any info
>         context udf debug
>         context aggr debug
>     }
> }
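The ordering rule can be checked mechanically with a simplified model: context lines are applied top to bottom, a later line overrides an earlier one, and "any" covers every context. This only models the rule described in this post; it is not the server's real configuration parser, and effective_level() is a hypothetical name.

```python
def effective_level(stanza, context):
    """Return the level a context ends up with under the simplified rule:
    lines apply in order, later lines win, and 'any' matches every context."""
    level = None
    for raw in stanza.splitlines():
        parts = raw.split()
        if len(parts) == 3 and parts[0] == "context" and parts[1] in ("any", context):
            level = parts[2]
    return level


GOOD = """
logging {
    file /var/log/aerospike/a1.log {
        context any info
        context udf debug
        context aggr debug
    }
}
"""

# Same stanza with an extra "context any info" after the aggr line.
BAD = GOOD.replace("context aggr debug",
                   "context aggr debug\n        context any info")

if __name__ == "__main__":
    print(effective_level(GOOD, "aggr"))  # debug
    print(effective_level(BAD, "aggr"))   # info: the debug() lines would be hidden
```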

That takes care of the server side, where you have log files. On the client side there is no log file, so turn on console logging in your client code. In Python, I added:

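> import aerospike
>
> # Print every client-side log message (including Lua debug() output) to the console
> def as_logger(level, func, myfile, line, msg):
>     print("**", myfile, line, func, '::', msg, "**")
>
> aerospike.set_log_level(aerospike.LOG_LEVEL_DEBUG)
> aerospike.set_log_handler(as_logger)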
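If the console gets noisy at debug level, a variant of as_logger with the same five-argument signature can print only the messages containing the marker text used in this thread's debug() calls, and keep them for counting afterwards. reduce_only_logger, MARKER and captured are hypothetical names; you would register it with aerospike.set_log_handler(reduce_only_logger) instead of as_logger.

```python
MARKER = "In reduce"   # text used in this thread's debug() calls
captured = []          # messages kept for inspection after the query


def reduce_only_logger(level, func, myfile, line, msg):
    """Same arguments as as_logger, but ignores everything except reduce markers."""
    if MARKER in msg:
        captured.append(msg)
        print("**", myfile, line, func, "::", msg, "**")
```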

Now, in my user Lua module, I added debug() calls to print whatever I want.

Here is something else you can do (totally optional) to see whether your reduce() function is actually getting called. On both the server and the client side, the Aerospike system Lua modules include a file called stream_ops.lua, which defines reduce(). Add debug statements to both files, if you want. In that function, f(a,b) is the call to your reduce function, so the debug() printouts let you track whether it gets called. In the server-side file I added "In reduce streamops 1/2/3"; in the client-side file I added "In reduce client streamops 1/2/3". Here is the server-side version:

> function reduce( next, f )

>     -- done indicates if we exhausted the `next` stream
>     local done = false

>     -- return a closure which the caller can use to get the next value
>     return function()


>         -- we bail if we already exhausted the stream
>         if done then return nil end

>         -- get the first value
>         local a = next()
>         debug("In reduce streamops 1")
>         if a ~= nil then
>             -- get each subsequent value and reduce them
>             debug("In reduce streamops 2")
>             for b in next do
>                 debug("In reduce streamops 3")
>                 a = f(a,b)
>             end
>         end

>         -- we are done!
>         done = true

>         return a
>     end
> end
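To see the logic of the instrumented reduce() without a cluster, here is a direct Python port of the Lua function above. reduce_stream() and make_stream() are hypothetical names, and Lua's nil is modelled as None. It records the same three markers, so you can check which ones a given number of input values produces.

```python
def reduce_stream(next_value, f, trace):
    """Port of the instrumented stream_ops.lua reduce().

    next_value: callable returning the next item, or None once exhausted.
    f:          the user's reduce function, called as f(a, b).
    trace:      list that collects the same markers as the debug() calls.
    Returns a closure that yields the reduced value once, then None.
    """
    done = False

    def step():
        nonlocal done
        if done:                      # stream already exhausted
            return None
        a = next_value()              # first value seeds the result
        trace.append("In reduce streamops 1")
        if a is not None:
            trace.append("In reduce streamops 2")
            b = next_value()
            while b is not None:      # Lua: for b in next do ... end
                trace.append("In reduce streamops 3")
                a = f(a, b)
                b = next_value()
        done = True
        return a

    return step


def make_stream(values):
    """Turn a list into a next()-style callable that returns None at the end."""
    it = iter(values)
    return lambda: next(it, None)


if __name__ == "__main__":
    for values in ([42], [1, 2, 3]):
        trace = []
        result = reduce_stream(make_stream(values), lambda a, b: a + b, trace)()
        print(values, "->", result, trace)
```

A stream holding one value yields only markers 1 and 2 and never calls f; marker 3, and the call to your function, needs at least two values. That is consistent with a server node that has already aggregated down to a single value.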

The server-side debug printouts show up in the log:

> $ cat -v /var/log/aerospike/a2.log | grep "In reduce"
> debug("In reduce call")
> debug("In reduce call")
> debug("In reduce call")
> debug("In reduce call")    <== These are from the server reading my Lua source UDF.
> Feb 08 2017 07:14:30 GMT: DEBUG (aggr): (/opt/aerospike/sys/udf/lua/aerospike.lua:192) In reduce streamops 1
> Feb 08 2017 07:14:30 GMT: DEBUG (aggr): (/opt/aerospike/sys/udf/lua/aerospike.lua:192) In reduce streamops 2
> Feb 08 2017 07:14:30 GMT: DEBUG (aggr): (/opt/aerospike/sys/udf/lua/aerospike.lua:192) In reduce streamops 1
> Feb 08 2017 07:14:30 GMT: DEBUG (aggr): (/opt/aerospike/sys/udf/lua/aerospike.lua:192) In reduce streamops 2
> Feb 08 2017 07:14:30 GMT: DEBUG (aggr): (/opt/aerospike/sys/udf/lua/aerospike.lua:192) In reduce streamops 1
> Feb 08 2017 07:14:30 GMT: DEBUG (aggr): (/opt/aerospike/sys/udf/lua/aerospike.lua:192) In reduce streamops 2
> Feb 08 2017 07:14:30 GMT: DEBUG (aggr): (/opt/aerospike/sys/udf/lua/aerospike.lua:192) In reduce streamops 1
> Feb 08 2017 07:14:30 GMT: DEBUG (aggr): (/opt/aerospike/sys/udf/lua/aerospike.lua:192) In reduce streamops 2

On the server side, I never reach streamops 3, i.e. my reduce() is not called (or needed) on the server nodes. The client-side debug printouts show up in the console:

> Numeric Secondary Index on tweetcount Created
> ** src/main/mod_lua.c 610 create_state :: Size of the lua state created for the file aggregationByRegion in KB 87 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:35 - In sum call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:140 - In reduce client streamops 1 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:143 - In reduce client streamops 2 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/lua//stream_ops.lua:145 - In reduce client streamops 3 **
> ** src/main/aerospike/aerospike_query.c 113 as_query_aerospike_log :: /usr/local/aerospike/usr-lua//aggregationByRegion.lua:23 - In reduce call **
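Rather than eyeballing long logs like these, you can count the markers. This sketch (count_markers() is a hypothetical name) tallies each reduce marker in either the server log or the captured client console output, assuming one message per line, and skips server lines that merely echo the Lua source, such as debug("In reduce call").

```python
import re
from collections import Counter

# Marker text used in this thread's debug() calls, e.g. "In reduce streamops 2",
# "In reduce client streamops 3" and "In reduce call".
MARKER = re.compile(r"In reduce(?: client)? (?:streamops [123]|call)")


def count_markers(log_text):
    """Count each reduce-related marker, one log message per line."""
    counts = Counter()
    for line in log_text.splitlines():
        if 'debug("' in line:  # the server echoing the Lua source, not an execution
            continue
        counts.update(MARKER.findall(line))
    return counts
```

For example, count_markers(open('/var/log/aerospike/a2.log').read()) on a server node: if there are no "In reduce streamops 3" entries, your reduce function never ran on that node.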

I don’t know whether debugging reduce() is worth the effort, but since you asked!

If you are using Java, you can use this code:

package life.arganzheng.study.aerospike;

import java.util.Date;

import com.aerospike.client.Log;

public class MyLogCallback implements Log.Callback {

    public MyLogCallback() {
        Log.setLevel(Log.Level.INFO);
        Log.setCallback(this);
    }

    @Override
    public void log(Log.Level level, String message) {
        Date date = new Date();
        System.out.println(date.toString() + ' ' + level + ' ' + message);
    }
}

and register this callback somewhere, for example when initializing the client:

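import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.Log;
import com.aerospike.client.policy.ClientPolicy;

public class BdgAerospikeClient implements Closeable {

    private AerospikeClient client;

    private void initAerospikeClient() {
        // init Log Callback for Aerospike client logging
        // @see http://www.aerospike.com/docs/client/java/usage/logging.html
        Log.Callback mycallback = new MyLogCallback();
        Log.setCallback(mycallback);
        Log.setLevel(Log.Level.DEBUG);

        // Constants is the application's own class (not shown in this post)
        ClientPolicy policy = new ClientPolicy();
        policy.timeout = Constants.DEFAULT_TIMEOUT_MS;

        List<Host> hosts = new ArrayList<Host>();
        hosts.add(new Host("xxxxx", Constants.AEROSPIKE_DEFAULT_PORT));
        client = new AerospikeClient(policy, hosts.toArray(new Host[0]));
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}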