Order by option

Is there any option to do an order by on a bin?

Hi,

Aerospike does not support an “order by” command or filter, but it is possible to programmatically process the output stream of a query using a Stream UDF. A clever Stream UDF can filter, map, aggregate and reduce the results of a query.

I hope this helps

Thanks. Is there an example of doing an order by using a Stream UDF?

Hi,

There is no order-by example available, but the way you would do it is an insertion sort in the Aggregate function and an ordered merge in the Reduce function.
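In skeleton form it could look something like this (a rough, untested sketch; the entry point sorted_values and the bin name mybin are placeholders):

-- aggregate: insertion sort each record's value into a per-node sorted list
local function insert_sorted(sorted, rec)
  local v = rec["mybin"] -- placeholder sort bin
  for i = 1, list.size(sorted) do
    if v < sorted[i] then
      list.insert(sorted, i, v)
      return sorted
    end
  end
  list.append(sorted, v)
  return sorted
end

-- reduce: ordered merge of two already-sorted partial lists
local function merge_sorted(a, b)
  local merged = list()
  local i, j = 1, 1
  while i <= list.size(a) and j <= list.size(b) do
    if a[i] <= b[j] then
      list.append(merged, a[i])
      i = i + 1
    else
      list.append(merged, b[j])
      j = j + 1
    end
  end
  while i <= list.size(a) do
    list.append(merged, a[i])
    i = i + 1
  end
  while j <= list.size(b) do
    list.append(merged, b[j])
    j = j + 1
  end
  return merged
end

function sorted_values(stream)
  return stream : aggregate(list(), insert_sorted) : reduce(merge_sorted)
end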

But beware, you could end up with some very large result sets that could exceed the heap space of your application. What is the expected size of your result set?

Do you need to return all the records to the application, or do you really want to do some aggregation or analytics on those records? If the latter, why not do it in the Stream UDF functions?

I will have over 100,000 records in the set. I have filtered the results with a UDF; now I need to sort them. I was thinking of doing it with the Java Comparator interface. Is that OK? I am looking to do the sorting with a UDF.

We implemented a sorting feature in a Lua UDF, but it was unpredictable and unstable because Aerospike is a distributed server. So we use a Comparator on the client side instead.
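For example, something like this on the Java side (a minimal sketch; the numeric id bin and the ClientSideSort class name are just examples):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Record;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;

public class ClientSideSort {
    // run the query, pull the records back, then sort deterministically on the client
    public static List<Record> querySortedById(AerospikeClient client, Statement stmt) {
        List<Record> records = new ArrayList<>();
        RecordSet rs = client.query(null, stmt);
        try {
            while (rs.next()) {
                records.add(rs.getRecord());
            }
        } finally {
            rs.close();
        }
        // a client-side sort is stable across runs, unlike sorting inside
        // a distributed stream UDF
        records.sort(Comparator.comparingLong((Record r) -> r.getLong("id")));
        return records;
    }
}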

Hi

I have worked up an example of group by and order by using a UDF. You can find the code here.

I hope this helps

Peter

Hi @helipilot50, it seems that the code example is a groupBy rather than an orderBy example.

I finally implemented an orderBy with limit using Aerospike’s Stream UDFs:

------------------------------------------------------------------------------------------
--  Order By and Limit
------------------------------------------------------------------------------------------
function order_by(stream, arguments)

  local function map_record(rec, fields)
    -- Could add other record bins here as well.
    -- This code shows different ways to access record bins.
    local result = map()

    if fields ~= nil then -- selected fields only
      for v in list.iterator(fields) do
        result[v] = rec[v]
      end
    else -- all fields
      local names = record.bin_names(rec)
      for i, v in ipairs(names) do
        result[v] = rec[v]
      end
    end

    result["meta_data"] = map()
    result["meta_data"]["digest"] = record.digest(rec)
    result["meta_data"]["generation"] = record.gen(rec)
    result["meta_data"]["set_name"] = record.setname(rec)
    result["meta_data"]["expiry"] = record.ttl(rec)
    return result
  end

  -- returns true when x should be placed before y for the given sort order
  local function compare(x, y, order)
    if order == "ASC" then
      return x < y
    else -- DESC
      return y < x
    end
  end

  local function list_truncate(l, limit)
    if list.size(l) > limit then
      info("list.size[%d] > limit[%d]. Trucate it.", list.size(l), limit)
      list.trim(l, limit + 1)
    end
  end

  -- insert a rec into a sorted list, return the insertion index for merge sort
  local function insert_sort(sorted_list, rec_map, sort_key, order, start_index)
    local v = rec_map[sort_key]
    debug("sort_key: %s, order: %s, value: %s", sort_key, order, v)
    if v == nil then
      -- no sort key on this record: skip it and keep the scan position valid
      return start_index or 1
    end

    local len = list.size(sorted_list)
    for i = start_index or 1, len do
      local v2 = sorted_list[i][sort_key]
      if compare(v, v2, order) then
        list.insert(sorted_list, i, rec_map)
        return i
      end
    end

    list.append(sorted_list, rec_map)
    return len + 1
  end

  local function sort_aggregator(sort_key, order, limit)
    -- inserting a rec into the sorted list is straightforward
    return function(sorted_list, rec)
      -- convert rec to a map (nil selects all bins)
      local rec_map = map_record(rec, nil)

      -- apply orderBy
      insert_sort(sorted_list, rec_map, sort_key, order)

      -- apply limit
      list_truncate(sorted_list, limit)

      return sorted_list
    end
  end

  local function sort_reducer(sort_key, order, limit)
    return function(sorted_list1, sorted_list2)
      -- apply merge sort
      local start_index;
      for i = 1, list.size(sorted_list2) do
        local rec_map = sorted_list2[i]
        start_index = insert_sort(sorted_list1, rec_map, sort_key, order, start_index)
      end

      -- apply limit
      list_truncate(sorted_list1, limit)
      return sorted_list1
    end
  end

  -- defaults: order by "id" ASC, limit 100
  local sort_key = "id"
  local order = "ASC"
  local limit = 100
  if arguments ~= nil then -- only one sort key is supported right now
    sort_key = arguments["sorters"][1]["sort_key"] or "id"
    order = arguments["sorters"][1]["order"] or "ASC"
    limit = arguments["limit"] or 100
  end
  local aggregator = sort_aggregator(sort_key, order, limit)
  local reducer = sort_reducer(sort_key, order, limit)
  return stream : aggregate(list(), aggregator) : reduce(reducer)
end

The client invocation code is very simple; just remember that reduce() only returns ONE element in the ResultSet (a list in this example):

private KeyRecordIterator queryAggregateByLua(Statement stmt, Qualifier[] qualifiers, //
            OrderList orderList, int limit) {
    Map<String, Object> argument = new HashMap<>();
    List<Value.MapValue> argumentSorters = new ArrayList<>();
    for (OrderEntry order : orderList) {
        Map<String, Object> s = new HashMap<>();
        s.put("sort_key", order.getProperty());
        s.put("order", order.getOrder().name());
        argumentSorters.add(new Value.MapValue(s));
    }
    argument.put("sorters", new Value.ListValue(argumentSorters));

    if (limit > 0) {
        argument.put("limit", limit);
    }

    stmt.setAggregateFunction(this.getClass().getClassLoader(), AS_UTILITY_PATH, QUERY_MODULE, "order_by",
            Value.get(argument));
    ResultSet resultSet = client.queryAggregate(DEFAULT_QUERY_POLICY, stmt);

    if (resultSet == null) {
        return new KeyRecordIterator(stmt.getNamespace(), Collections.emptyList());
    } else { // aggregate returns one list element here
        List list = (List) resultSet.iterator().next();
        return new KeyRecordIterator(stmt.getNamespace(), list);
    }
}

NOTES

  1. Only one sort key is supported right now.
  2. Only limit is supported, not pagination (i.e. limit without offset). This is because Aerospike Stream UDFs execute on all nodes, so we cannot do pagination in a sharded environment.

See more detail in my blog post: Aerospike UDF Study Notes (Aerospike UDF学习笔记)


I also need to add a group by to this. Can you help?

I have tried sorting using UDFs, but the results are unstable for each run. Did you test the stability of this implementation? What were your findings?

Take a look at Piyush’s developer blog post

It uses post-processing on results fetched by a query with a PredExp filter. As of Aerospike Database 5.2.0, Expressions are a much more expressive replacement. The technique Piyush talks about still holds; you can just filter better with Expressions.
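For instance, a PredExp-style filter translates to something like this in the Java client (a small sketch; the status and age bins are made-up examples):

import com.aerospike.client.exp.Exp;
import com.aerospike.client.policy.QueryPolicy;

QueryPolicy policy = new QueryPolicy();
// stream back only records whose "status" bin is "active" and whose "age" bin is > 30
policy.filterExp = Exp.build(
        Exp.and(
                Exp.eq(Exp.stringBin("status"), Exp.val("active")),
                Exp.gt(Exp.intBin("age"), Exp.val(30))));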