public class AerospikeStreamExample {
public static void main(String[] args) {
AerospikeClient client = new AerospikeClient("15.206.70.97", 3000);
// Register the Lua script
String luaScript =
"local function filter_by_age(record)\n" +
" if record.age then\n" +
" if record.age < 30 then\n" +
" return true\n" +
" end\n" +
" end\n" +
" return false\n" +
"end\n" +
"\n" +
"function count_records(stream)\n" +
" local filtered_stream = stream:filter(filter_by_age)\n" +
" local count = filtered_stream:count()\n" +
" return count or 0\n" +
"end\n";
RegisterTask task = client.registerUdfString(null, luaScript, "stream_example.lua", Language.LUA);
if (task.isDone()) {
System.out.println("UDF registered successfully.");
} else {
System.err.println("Failed to register UDF.");
}
// Create a stream using a query
String namespace = "test";
String set = "User2";
Statement statement = new Statement();
statement.setNamespace(namespace);
statement.setSetName(set);
statement.setBinNames("age"); // Change "value" to "age" as per the Lua script
// Process the stream result
try {
ResultSet resultSet = client.queryAggregate(null, statement, "stream_example", "count_records");
long count = 0;
while (resultSet.next()) {
Object result = resultSet.getObject();
count = (long) result;
System.out.println("Number of records with age < 30: " + count);
}
if (count == 0) {
System.out.println("No records found with age < 30.");
}
} catch (AerospikeException e) {
e.printStackTrace();
}
client.close();
}
}
I have posted the Lua UDF script along with the Java client code for your reference.