Failed to find 'removeRecord' in 'udf\?.lua' using QueryAggregate


#1

Hi,

I have this error trying to use {AerospikeClient}.QueryAggregate:
Error Code -1: Query Failed: Error Code 4: Failed to find ‘removeRecord’ in 'udf?.lua’

I already checked the Neo.Lua package version: 0.9.14.0. Aerospike client version 3.4.5, (but I also tried the version 3.6.6)

This is the code:

    public const string CountRecordsFileName = "countRecords"; 
    public const string CountRecordsFunctionName = "count_records";  

    var countRecordsFunction =
    $@"function {CountRecordsFunctionName}(r)
        return 100
    end";

    Client.RegisterUdfString(null, countRecordsFunction, CountRecordsFileName + ".lua", Language.LUA); // this is executed asynchronously  

(I also tried using .Wait())

        var statement = new Statement
        {
            Namespace = configuration.Namespace,
            SetName = "MySet",
            Filter = Filter.Contains("Id", IndexCollectionType.DEFAULT, "AAAAAAAAAAAAAAAAAAAAAAAAA")
        };

        using (var result = Client.QueryAggregate(null, statement, CountRecordsFileName, CountRecordsFunctionName))
        {
            result.Next();
            var data = result.Object;
            return (long)data;
        }
  1. Client.QueryAggregate result ALWAYS in a error, also when I use a valid package/function that works (when called with .Execute())
  2. Whatever I use for packageName (I mean a wrong name) in .Execute() I NEVER have an error.

Any Idea for the different behavior of .Execute and QueryAggregator ?

Thanks,

Alex


#2

Aggregation queries currently support reading lua code from a file or from a C# resource, but not from a string. The next C# client release will support reading lua code from a string.

The new client will demonstrate how to run an aggregation query with a lua code string in AerospikeDemo/QuerySum.cs. Here is a preview of that code:

using System.IO;
using Aerospike.Client;

namespace Aerospike.Demo
{
	public class QuerySum : SyncExample
	{
		public QuerySum(Console console)
			: base(console)
		{
		}

		/// <summary>
		/// Create secondary index and query on it and apply aggregation user defined function.
		/// </summary>
		public override void RunExample(AerospikeClient client, Arguments args)
		{
			if (!args.hasUdf)
			{
				console.Info("Query functions are not supported by the connected Aerospike server.");
				return;
			}
			
			string packageContents = @"
local function reducer(val1,val2)
	return val1 + val2
end

function sum_single_bin(stream,name)
	local function mapper(rec)
		return rec[name]
	end
	return stream : map(mapper) : reduce(reducer)
end
";
			string indexName = "aggindex";
			string keyPrefix = "aggkey";
			string binName = args.GetBinName("aggbin");
			int size = 10;

			Register(client, args, packageContents);
			CreateIndex(client, args, indexName, binName);
			WriteRecords(client, args, keyPrefix, binName, size);
			RunQuery(client, args, indexName, binName, packageContents);
			client.DropIndex(args.policy, args.ns, args.set, indexName);
		}

		private void Register(AerospikeClient client, Arguments args, string packageContents)
		{
			string packageName = "sum_example.lua";
			console.Info("Register: " + packageName);
			RegisterTask task = client.RegisterUdfString(null, packageContents, packageName, Language.LUA);
			task.Wait();
		}

		private void CreateIndex(AerospikeClient client, Arguments args, string indexName, string binName)
		{
			console.Info("Create index: ns={0} set={1} index={2} bin={3}",
				args.ns, args.set, indexName, binName);

			Policy policy = new Policy();
			policy.totalTimeout = 0; // Do not timeout on index create.

			try
			{
				IndexTask task = client.CreateIndex(policy, args.ns, args.set, indexName, binName, IndexType.NUMERIC);
				task.Wait();
			}
			catch (AerospikeException ae)
			{
				if (ae.Result != ResultCode.INDEX_ALREADY_EXISTS)
				{
					throw;
				}
			}
		}

		private void WriteRecords(AerospikeClient client, Arguments args, string keyPrefix, string binName, int size)
		{
			for (int i = 1; i <= size; i++)
			{
				Key key = new Key(args.ns, args.set, keyPrefix + i);
				Bin bin = new Bin(binName, i);

				console.Info("Put: namespace={0} set={1} key={2} bin={3} value={4}", 
					key.ns, key.setName, key.userKey, bin.name, bin.value);

				client.Put(args.writePolicy, key, bin);
			}
		}

		private void RunQuery(AerospikeClient client, Arguments args, string indexName, string binName, string packageContents)
		{
			int begin = 4;
			int end = 7;

			console.Info("Query for:ns={0} set={1} index={2} bin={3} >= {4} <= {5}", 
				args.ns, args.set, indexName, binName, begin, end);

			Statement stmt = new Statement();
			stmt.SetNamespace(args.ns);
			stmt.SetSetName(args.set);
			stmt.SetBinNames(binName);
			stmt.SetFilter(Filter.Range(binName, begin, end));
			stmt.SetAggregateFunction("sum_example", packageContents, "sum_single_bin", Value.Get(binName));

			ResultSet rs = client.QueryAggregate(null, stmt);

			try
			{
				int expected = 22; // 4 + 5 + 6 + 7
				int count = 0;

				while (rs.Next())
				{
					object obj = rs.Object;

					if (obj is long)
					{
						long sum = (long)rs.Object;

						if (expected == (int)sum)
						{
							console.Info("Sum matched: value=" + expected);
						}
						else
						{
							console.Error("Sum mismatch: Expected {0}. Received {1}.", expected, sum);
						}
					}
					else
					{
						console.Error("Unexpected return value: " + obj);
						continue;
					}
					count++;
				}

				if (count == 0)
				{
					console.Error("Query failed. No records returned.");
				}
			}
			finally
			{
				rs.Close();
			}
		}
	}
}

#3

Hi Brian,

I’m not sure I understood correctly this:

Aggregation queries currently support reading lua code from a file or from a C# resource, but not from a string

Does it mean that the current client (I tried 3.4.5 and 3.6.6) can use UDF with .Execute() but not with QueryAggregator() ?
I registered the UDF and it is visible from the UI and it is stored in the right folder, when you say “from a string” you meant from a registered package?


#4

The lua code is run on both the client and server for aggregation queries. For Execute(), the lua code is only run on the server. The lua code Register methods are only for stateful server registration which only needs to be performed once.

The client library is not stateful between client instantiations (there is no client database). Therefore, aggregation query lua code must be loaded into client memory on every client instantiation. The lua code load occurs dynamically when the aggregation query is run for the first time during that client instantiation.

The current client can use UDF for QueryAggregator(). You just need to use a lua code file or resource instead of a lua code string. See the current client’s “AerospikeDemo/QuerySum.cs” and “AerospikeDemo/QueryAverage.cs” example.

The next release will support a lua code string to be used with aggregation queries. The “QuerySum.cs” example will be modified to demonstrate a lua code string definition. I provided a preview of that example in my previous post.