Order by option

I finally implemented ORDER BY and LIMIT using Aerospike’s Stream UDFs:

--- Order By and Limit.
-- Stream UDF that returns at most `limit` records from the query stream,
-- ordered by a single bin. The aggregate step builds a sorted, truncated
-- list per partition of the stream; the reduce step merges those lists.
--
-- @param stream the query stream passed in by the server
-- @param arguments optional map:
--   sorters = list of { sort_key = <bin name>, order = "ASC" | "DESC" }
--             (only the first sorter is used),
--   limit   = maximum number of records.
--   Defaults: sort_key "id", order "ASC", limit 100.
-- @return the stream with aggregate/reduce applied; the reduced value is a
--   single list of record maps.
function order_by(stream, arguments)

  -- Convert a record into a map of bin values plus a "meta_data" sub-map.
  -- When `fields` (a list of bin names) is given, only those bins are copied;
  -- otherwise every bin reported by record.bin_names is copied.
  local function map_record(rec, fields)
    local result = map()

    if fields ~= nil then -- selected fields
      for v in list.iterator(fields) do
        result[v] = rec[v]
      end
    else -- all fields
      local names = record.bin_names(rec)
      for _, v in ipairs(names) do
        result[v] = rec[v]
      end
    end

    local meta = map()
    meta["digest"] = record.digest(rec)
    meta["generation"] = record.gen(rec)
    meta["set_name"] = record.setname(rec)
    meta["expiry"] = record.ttl(rec)
    result["meta_data"] = meta
    return result
  end

  -- Strict ordering predicate: true when x must be placed before y.
  -- "ASC" sorts ascending; any other value is treated as descending.
  local function compare(x, y, order)
    if order == "ASC" then
      return x < y
    end
    return y < x
  end

  -- Keep only the first `limit` elements. list.trim(l, pos) removes the
  -- element at `pos` and everything after it (per the Aerospike list API).
  local function list_truncate(l, limit)
    if list.size(l) > limit then
      info("list.size[%d] > limit[%d]. Trucate it.", list.size(l), limit)
      list.trim(l, limit + 1)
    end
  end

  -- Insert rec_map into sorted_list, keeping it ordered by sort_key.
  -- Scanning starts at start_index (default 1) so a merge of already-sorted
  -- input can resume where the previous insertion landed; because compare is
  -- strict, equal keys are placed after existing ones.
  -- Returns the 1-based index where rec_map now sits. A record without a
  -- value for sort_key is skipped and the (defaulted) start index is returned
  -- unchanged, so it is always a valid position to resume from.
  local function insert_sort(sorted_list, rec_map, sort_key, order, start_index)
    local first = start_index or 1
    local v = rec_map[sort_key]
    debug("sort_key: %s, order: %s, value: %s", sort_key, order, v)
    if v == nil then
      return first
    end

    local len = list.size(sorted_list)
    for i = first, len do
      local v2 = sorted_list[i][sort_key]
      if compare(v, v2, order) then
        list.insert(sorted_list, i, rec_map)
        return i
      end
    end

    list.append(sorted_list, rec_map)
    return len + 1
  end

  -- Build the aggregate callback: fold each record (as a map) into a
  -- sorted list that never grows beyond `limit` elements.
  local function sort_aggregator(sort_key, order, limit)
    return function(sorted_list, rec)
      local rec_map = map_record(rec)
      insert_sort(sorted_list, rec_map, sort_key, order)
      list_truncate(sorted_list, limit)
      return sorted_list
    end
  end

  -- Build the reduce callback: merge two sorted lists into the first one,
  -- then re-apply the limit.
  local function sort_reducer(sort_key, order, limit)
    return function(sorted_list1, sorted_list2)
      -- sorted_list2 is sorted, so each insertion can start from the
      -- position of the previous one (merge step).
      local start_index = 1
      for i = 1, list.size(sorted_list2) do
        start_index = insert_sort(sorted_list1, sorted_list2[i], sort_key,
                                  order, start_index)
      end
      list_truncate(sorted_list1, limit)
      return sorted_list1
    end
  end

  -- Defaults: order by "id" ASC, limit 100. Only one sort key is supported.
  local sort_key = "id"
  local order = "ASC"
  local limit = 100
  if arguments ~= nil then
    local sorters = arguments["sorters"]
    local sorter = nil
    if sorters ~= nil then
      sorter = sorters[1]
    end
    if sorter ~= nil then
      sort_key = sorter["sort_key"] or sort_key
      order = sorter["order"] or order
    end
    limit = arguments["limit"] or limit
  end

  local aggregator = sort_aggregator(sort_key, order, limit)
  local reducer = sort_reducer(sort_key, order, limit)
  return stream : aggregate(list(), aggregator) : reduce(reducer)
end

The client invocation code is very simple; just remember that reduce() returns only ONE element in the ResultSet (a list in this example):

private KeyRecordIterator queryAggregateByLua(Statement stmt, Qualifier[] qualifiers, //
            OrderList orderList, int limit) {
    Map<String, Object> argument = new HashMap<>();
    List<Value.MapValue> argumentSorters = new ArrayList<>();
    for (OrderEntry order : orderList) {
        Map<String, Object> s = new HashMap<>();
        s.put("sort_key", order.getProperty());
        s.put("order", order.getOrder().name());
        argumentSorters.add(new Value.MapValue(s));
    argument.put("sorters", new Value.ListValue(argumentSorters));

    if (limit > 0) {
        argument.put("limit", limit);

    stmt.setAggregateFunction(this.getClass().getClassLoader(), AS_UTILITY_PATH, QUERY_MODULE, "order_by",
    ResultSet resultSet = client.queryAggregate(DEFAULT_QUERY_POLICY, stmt);

    if (resultSet == null) {
        return new KeyRecordIterator(stmt.getNamespace(), Collections.emptyList());
    } else { // aggregate return One list element here
        List list = (List) resultSet.iterator().next();
        return new KeyRecordIterator(stmt.getNamespace(), list);


  1. Only one sort key is supported right now.
  2. Only LIMIT is supported, not pagination (i.e. LIMIT without OFFSET). This is because Aerospike Stream UDFs execute on all nodes, so we cannot paginate in a sharded environment.

See more details in my blog post: Aerospike UDF学习笔记 ("Aerospike UDF study notes").

1 Like