Order by option

I finally implemented an orderBy + limit feature with Aerospike's Stream UDFs:

------------------------------------------------------------------------------------------
--  Order By and Limit
------------------------------------------------------------------------------------------
-- Stream UDF implementing ORDER BY <sort_key> [ASC|DESC] LIMIT <limit>.
-- `arguments` is a map: { sorters = list{ map{ sort_key, order } }, limit }.
-- Only the first sorter is used (one sort key supported). Returns the
-- stream reduced to a single sorted, truncated list of record maps.
function order_by(stream, arguments)

  -- Convert an Aerospike record into a plain map so it can travel through
  -- the stream. Copies the selected `fields`, or every bin when `fields`
  -- is nil, and attaches record metadata under "meta_data".
  local function map_record(rec, fields)
    local result = map()

    if fields ~= nil then -- selected fields only
      for v in list.iterator(fields) do
        result[v] = rec[v]
      end
    else -- all bins
      local names = record.bin_names(rec)
      for i, v in ipairs(names) do
        result[v] = rec[v]
      end
    end

    result["meta_data"] = map()
    result["meta_data"]["digest"] = record.digest(rec)
    result["meta_data"]["generation"] = record.gen(rec)
    result["meta_data"]["set_name"] = record.setname(rec)
    result["meta_data"]["expiry"] = record.ttl(rec)
    return result
  end

  -- True when x must be placed before y under the requested order.
  -- NOTE(fix): the file previously defined a three-valued compare(x, y)
  -- immediately before this one; it was dead code (silently shadowed by
  -- this definition) and has been removed.
  local function compare(x, y, order)
    if order == "ASC" then
      return x < y
    else -- DESC
      return y < x
    end
  end

  -- Drop elements beyond `limit`. list.trim(l, i) removes elements from
  -- index i onward, so trimming at limit + 1 keeps the first `limit` items.
  local function list_truncate(l, limit)
    if list.size(l) > limit then
      info("list.size[%d] > limit[%d]. Truncate it.", list.size(l), limit)
      list.trim(l, limit + 1)
    end
  end

  -- Insert rec_map into sorted_list, keeping it ordered by sort_key.
  -- Returns the insertion index so a merge pass can resume scanning there.
  -- FIXES: `len` and `v2` were accidental globals (now local). A record
  -- with a nil sort value used to `return 0`; since 0 is truthy in Lua the
  -- reducer then resumed at index 0 and read sorted_list[0] on the next
  -- insert. We now return a valid resume index instead. After appending,
  -- the appended element's actual index is len + 1, not len.
  local function insert_sort(sorted_list, rec_map, sort_key, order, start_index)
    local v = rec_map[sort_key]
    debug("sort_key: %s, order: %s, value: %s", sort_key, order, v)
    if v == nil then
      -- No sort value: skip the record but keep the resume index valid.
      return start_index or 1
    end

    local len = list.size(sorted_list)
    for i = start_index or 1, len do
      local v2 = sorted_list[i][sort_key]
      if compare(v, v2, order) then
        list.insert(sorted_list, i, rec_map)
        return i
      end
    end

    list.append(sorted_list, rec_map)
    return len + 1
  end

  -- Aggregate step: fold each record of the node-local stream into a
  -- sorted list, truncated to `limit` as we go.
  local function sort_aggregator(sort_key, order, limit)
    return function(sorted_list, rec)
      -- fields == nil: map all bins of the record
      local rec_map = map_record(rec)

      insert_sort(sorted_list, rec_map, sort_key, order)
      list_truncate(sorted_list, limit)
      return sorted_list
    end
  end

  -- Reduce step: merge two already-sorted lists. Because sorted_list2 is
  -- itself sorted, each insertion can resume from the previous insertion
  -- index instead of rescanning sorted_list1 from the start.
  local function sort_reducer(sort_key, order, limit)
    return function(sorted_list1, sorted_list2)
      local start_index
      for i = 1, list.size(sorted_list2) do
        local rec_map = sorted_list2[i]
        start_index = insert_sort(sorted_list1, rec_map, sort_key, order, start_index)
      end

      list_truncate(sorted_list1, limit)
      return sorted_list1
    end
  end

  -- Defaults: order by "id" ASC, limit 100.
  -- FIX: previously the defaults were only applied when `arguments` was
  -- non-nil, leaving sort_key/order nil otherwise, and a missing/empty
  -- "sorters" list raised an error. Only one sort key is supported.
  local sort_key = "id"
  local order = "ASC"
  local limit = 100
  if arguments ~= nil then
    local sorters = arguments["sorters"]
    if sorters ~= nil and list.size(sorters) > 0 then
      sort_key = sorters[1]["sort_key"] or sort_key
      order = sorters[1]["order"] or order
    end
    limit = arguments["limit"] or limit
  end

  local aggregator = sort_aggregator(sort_key, order, limit)
  local reducer = sort_reducer(sort_key, order, limit)
  return stream : aggregate(list(), aggregator) : reduce(reducer)
end

The client invocation code is very simple — just remember that reduce() returns only ONE element in the ResultSet (a list in this example):

/**
 * Executes the "order_by" stream UDF for the given statement and wraps the
 * single reduced list in a {@link KeyRecordIterator}.
 *
 * @param stmt       the query statement; the aggregate function is attached to it
 * @param qualifiers query qualifiers (currently unused here)
 * @param orderList  sort entries; each becomes a {sort_key, order} map for the UDF
 * @param limit      maximum number of results; only passed through when positive
 * @return iterator over the merged, sorted, truncated result list (empty when
 *         the server returns no ResultSet)
 */
private KeyRecordIterator queryAggregateByLua(Statement stmt, Qualifier[] qualifiers,
            OrderList orderList, int limit) {
    // Build the "sorters" list: one {sort_key, order} map per order entry.
    List<Value.MapValue> sorters = new ArrayList<>();
    for (OrderEntry order : orderList) {
        Map<String, Object> sorter = new HashMap<>();
        sorter.put("sort_key", order.getProperty());
        sorter.put("order", order.getOrder().name());
        sorters.add(new Value.MapValue(sorter));
    }

    Map<String, Object> argument = new HashMap<>();
    argument.put("sorters", new Value.ListValue(sorters));
    if (limit > 0) {
        argument.put("limit", limit);
    }

    stmt.setAggregateFunction(this.getClass().getClassLoader(), AS_UTILITY_PATH, QUERY_MODULE, "order_by",
            Value.get(argument));

    ResultSet resultSet = client.queryAggregate(DEFAULT_QUERY_POLICY, stmt);
    if (resultSet == null) {
        return new KeyRecordIterator(stmt.getNamespace(), Collections.emptyList());
    }

    // reduce() yields exactly one element: the merged sorted list.
    List merged = (List) resultSet.iterator().next();
    return new KeyRecordIterator(stmt.getNamespace(), merged);
}

NOTES

  1. Only one sort key is supported right now.
  2. Only limit is supported, not pagination (i.e. limit without offset). This is because Aerospike Stream UDFs execute on all nodes, so we cannot do pagination in a sharded environment.

See more detail in my blog: Aerospike UDF学习笔记

1 Like