I finally implemented ORDER BY with LIMIT using Aerospike's Stream UDFs:
------------------------------------------------------------------------------------------
-- Order By and Limit
------------------------------------------------------------------------------------------
function order_by(stream, arguments)
local function map_record(rec, fields)
-- Convert a record to a map; other record bins could be added here as well.
-- This code demonstrates different ways of accessing record bins.
local result = map()
if fields ~= nil then -- only the selected fields
for v in list.iterator(fields) do
result[v] = rec[v]
end
else -- all fields
local names = record.bin_names(rec)
for i, v in ipairs(names) do
result[v] = rec[v]
end
end
result["meta_data"] = map()
result["meta_data"]["digest"] = record.digest(rec)
result["meta_data"]["generation"] = record.gen(rec)
result["meta_data"]["set_name"] = record.setname(rec)
result["meta_data"]["expiry"] = record.ttl(rec)
return result
end
-- true if x should be placed before y under the given order
local function compare(x, y, order)
if order == "ASC" then
return x < y
else -- DESC
return y < x
end
end
local function list_truncate(l, limit)
if list.size(l) > limit then
info("list.size[%d] > limit[%d]. Truncating.", list.size(l), limit)
list.trim(l, limit + 1) -- keep only the first `limit` elements
end
end
-- Insert rec_map into sorted_list and return the insertion index,
-- so that the merge-sort reducer can resume scanning from there.
local function insert_sort(sorted_list, rec_map, sort_key, order, start_index)
local v = rec_map[sort_key]
debug("sort_key: %s, order: %s, value: %s", sort_key, order, v)
if v == nil then -- record has no sort key: skip it
return start_index or 1
end
local len = list.size(sorted_list)
for i = start_index or 1, len do
local v2 = sorted_list[i][sort_key]
if compare(v, v2, order) then
list.insert(sorted_list, i, rec_map)
return i
end
end
list.append(sorted_list, rec_map)
return len
end
local function sort_aggregator(sort_key, order, limit)
-- the aggregator inserts each mapped record into the node-local sorted list
return function(sorted_list, rec)
-- convert rec to a map (fields is nil here, so all bins are kept)
local rec_map = map_record(rec)
-- apply orderBy
insert_sort(sorted_list, rec_map, sort_key, order)
-- apply limit
list_truncate(sorted_list, limit)
return sorted_list
end
end
local function sort_reducer(sort_key, order, limit)
return function(sorted_list1, sorted_list2)
-- apply merge sort
local start_index;
for i = 1, list.size(sorted_list2) do
local rec_map = sorted_list2[i]
start_index = insert_sort(sorted_list1, rec_map, sort_key, order, start_index)
end
-- apply limit
list_truncate(sorted_list1, limit)
return sorted_list1
end
end
-- defaults: order by id ASC, limit 100
local sort_key = "id"
local order = "ASC"
local limit = 100
if arguments ~= nil then
if arguments["sorters"] ~= nil then -- only one sort key is supported right now
sort_key = arguments["sorters"][1]["sort_key"] or "id"
order = arguments["sorters"][1]["order"] or "ASC"
end
limit = arguments["limit"] or 100
end
local aggregator = sort_aggregator(sort_key, order, limit)
local reducer = sort_reducer(sort_key, order, limit)
return stream : aggregate(list(), aggregator) : reduce(reducer)
end
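Before order_by() can be invoked, the Lua module has to be registered on the cluster. Here is a minimal registration sketch with the Aerospike Java client; the local path udf/query_utils.lua and the server-side name are my assumptions, and the server-side name must match whatever the QUERY_MODULE constant below refers to:
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Language;
import com.aerospike.client.task.RegisterTask;

void registerOrderByUdf(AerospikeClient client) {
    // "udf/query_utils.lua" is a hypothetical local path; the name it is
    // registered under must match the QUERY_MODULE used in setAggregateFunction().
    RegisterTask task = client.register(null, "udf/query_utils.lua",
            "query_utils.lua", Language.LUA);
    task.waitTillComplete(); // block until the module reaches all nodes
}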
The client-side invocation code is simple; just remember that reduce() returns only ONE element in the ResultSet (a list in this example):
private KeyRecordIterator queryAggregateByLua(Statement stmt, Qualifier[] qualifiers, //
OrderList orderList, int limit) {
Map<String, Object> argument = new HashMap<>();
List<Value.MapValue> argumentSorters = new ArrayList<>();
for (OrderEntry order : orderList) {
Map<String, Object> s = new HashMap<>();
s.put("sort_key", order.getProperty());
s.put("order", order.getOrder().name());
argumentSorters.add(new Value.MapValue(s));
}
argument.put("sorters", new Value.ListValue(argumentSorters));
if (limit > 0) {
argument.put("limit", limit);
}
stmt.setAggregateFunction(this.getClass().getClassLoader(), AS_UTILITY_PATH, QUERY_MODULE, "order_by",
Value.get(argument));
ResultSet resultSet = client.queryAggregate(DEFAULT_QUERY_POLICY, stmt);
if (resultSet == null) {
return new KeyRecordIterator(stmt.getNamespace(), Collections.emptyList());
} else { // the aggregation returns a single list element here
List list = (List) resultSet.iterator().next();
return new KeyRecordIterator(stmt.getNamespace(), list);
}
}
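For completeness, a hedged usage sketch of the method above. Statement comes from the Aerospike client, while OrderList, OrderEntry, and KeyRecordIterator are this project's own types, so their exact construction here is an assumption for illustration:
// Hypothetical caller: order the "demo" set by "age" descending, at most 10 records.
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("demo");
OrderList orderList = new OrderList();
orderList.add(new OrderEntry("age", OrderEntry.Order.DESC)); // assumed API
KeyRecordIterator it = queryAggregateByLua(stmt, null, orderList, 10);
while (it.hasNext()) {
    System.out.println(it.next());
}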
NOTES
- Only one sort key is supported right now.
- Only limit is supported, not pagination (i.e. limit without offset). Because Aerospike Stream UDFs execute on every node, there is no global ordering to compute an offset against, so pagination cannot be done in a sharded environment.
See my blog post for more details: Aerospike UDF学习笔记 (Aerospike UDF study notes).