Is there any option to do order by on a bin?
Hi,
Aerospike does not support an “order by” command or filter, but it is possible to programmatically process the output stream of a query using a stream UDF. A clever stream UDF can filter, map, aggregate and reduce the results of a query.
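For instance, here is a minimal sketch of such a pipeline (the bin names status and amount and the function name sum_active_amounts are made-up examples, not part of any Aerospike API):

local function keep_active(rec)
    return rec['status'] == 'active'  -- filter: drop non-matching records
end

local function to_amount(rec)
    return rec['amount'] or 0         -- map: record -> a single bin value
end

local function add(a, b)
    return a + b                      -- reduce: merge partial results from all nodes
end

function sum_active_amounts(stream)
    return stream : filter(keep_active) : map(to_amount) : reduce(add)
end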
I hope this helps
Thanks. Is there any example of doing order by using a stream UDF?
Hi,
There is no orderBy example available, but the way you would do it is an insertion sort in the aggregate function and an ordered merge in the reduce function.
But beware: you could end up with some very large result sets that could exceed the heap space of your application. What is the expected size of your result set?
Do you need to return all the records to the application, or do you really want to do some aggregation or analytics on those records? If the latter, why not do it in the stream UDF functions?
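To make that shape concrete, here is a minimal sketch that sorts ascending on an assumed id bin (every record is assumed to have one); a full, configurable implementation appears later in this thread:

function sort_by_id(stream)
    -- insert one map into an already-sorted list (insertion sort)
    local function insert_sorted(sorted, m)
        for i = 1, list.size(sorted) do
            if m['id'] < sorted[i]['id'] then
                list.insert(sorted, i, m)
                return sorted
            end
        end
        list.append(sorted, m)
        return sorted
    end
    -- aggregate: runs on each node, builds a per-node sorted list
    local function collect(sorted, rec)
        return insert_sorted(sorted, map{ id = rec['id'] })
    end
    -- reduce: ordered merge of per-node sorted lists
    local function merge(l1, l2)
        for m in list.iterator(l2) do
            insert_sorted(l1, m)
        end
        return l1
    end
    return stream : aggregate(list(), collect) : reduce(merge)
end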
I will have over 100,000 records in the set. I have put a filter on the results with a UDF; now I need to sort them. I was thinking of doing it with the Java Comparator interface. Is that OK? I am looking for a way to do the sorting with a UDF.
We implemented a sorting feature in a Lua UDF, but the results were unexpected and unstable because Aerospike is a distributed server. So we use a Comparator on the client side.
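For reference, a sketch of that client-side approach with the Java client (the namespace, set, and age bin names are illustrative):

private List<Record> querySortedByAge(AerospikeClient client, String ns, String set) {
    Statement stmt = new Statement();
    stmt.setNamespace(ns);
    stmt.setSetName(set);
    // Pull all matching records to the client first.
    List<Record> records = new ArrayList<>();
    try (RecordSet rs = client.query(null, stmt)) {
        while (rs.next()) {
            records.add(rs.getRecord());
        }
    }
    // Sort on the "age" bin; integer bins come back as Long in the Java client.
    records.sort(Comparator.comparing((Record r) -> (Long) r.getValue("age"),
            Comparator.nullsLast(Comparator.<Long>naturalOrder())));
    return records;
}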
Hi
I have worked up an example of groupBy and orderBy using a UDF. You can find the code here.
I hope this helps
Peter
Hi @helipilot50, it seems that the code example is a groupBy rather than an orderBy example.
I finally implemented an orderBy with limit using Aerospike’s Stream UDFs:
------------------------------------------------------------------------------------------
-- Order By and Limit
------------------------------------------------------------------------------------------
function order_by(stream, arguments)
    local function map_record(rec, fields)
        -- Convert a record into a map so it can travel through the stream.
        -- This code shows different ways to access record bins.
        local result = map()
        if fields ~= nil then -- selected fields only
            for v in list.iterator(fields) do
                result[v] = rec[v]
            end
        else -- all fields
            local names = record.bin_names(rec)
            for i, v in ipairs(names) do
                result[v] = rec[v]
            end
        end
        result["meta_data"] = map()
        result["meta_data"]["digest"] = record.digest(rec)
        result["meta_data"]["generation"] = record.gen(rec)
        result["meta_data"]["set_name"] = record.setname(rec)
        result["meta_data"]["expiry"] = record.ttl(rec)
        return result
    end

    local function compare(x, y, order)
        if order == "ASC" then
            return x < y
        else -- DESC
            return y < x
        end
    end

    local function list_truncate(l, limit)
        if list.size(l) > limit then
            info("list.size[%d] > limit[%d]. Truncating it.", list.size(l), limit)
            list.trim(l, limit + 1)
        end
    end

    -- Insert a rec_map into a sorted list; return the insertion index so an
    -- ordered merge can resume scanning from there.
    local function insert_sort(sorted_list, rec_map, sort_key, order, start_index)
        local v = rec_map[sort_key]
        debug("sort_key: %s, order: %s, value: %s", sort_key, order, v)
        if v == nil then
            -- Record has no sort key: skip it and keep the current merge position.
            return start_index
        end
        local len = list.size(sorted_list)
        for i = start_index or 1, len do
            local v2 = sorted_list[i][sort_key]
            if compare(v, v2, order) then
                list.insert(sorted_list, i, rec_map)
                return i
            end
        end
        list.append(sorted_list, rec_map)
        return len
    end

    local function sort_aggregator(sort_key, order, limit)
        -- Inserting a rec into an already-sorted list is quite easy.
        return function(sorted_list, rec)
            -- convert rec to map (all bins, since fields is nil)
            local rec_map = map_record(rec)
            -- apply orderBy
            insert_sort(sorted_list, rec_map, sort_key, order)
            -- apply limit
            list_truncate(sorted_list, limit)
            return sorted_list
        end
    end

    local function sort_reducer(sort_key, order, limit)
        return function(sorted_list1, sorted_list2)
            -- apply merge sort: both input lists are already sorted
            local start_index
            for i = 1, list.size(sorted_list2) do
                local rec_map = sorted_list2[i]
                start_index = insert_sort(sorted_list1, rec_map, sort_key, order, start_index)
            end
            -- apply limit
            list_truncate(sorted_list1, limit)
            return sorted_list1
        end
    end

    -- defaults: order by id ASC, limit 100
    local sort_key = "id"
    local order = "ASC"
    local limit = 100
    if arguments ~= nil then -- only one sort key is supported right now
        sort_key = arguments["sorters"][1]["sort_key"] or sort_key
        order = arguments["sorters"][1]["order"] or order
        limit = arguments["limit"] or limit
    end

    local aggregator = sort_aggregator(sort_key, order, limit)
    local reducer = sort_reducer(sort_key, order, limit)

    return stream : aggregate(list(), aggregator) : reduce(reducer)
end
The client invocation code is very simple; just remember that reduce() returns only ONE element in the ResultSet (a list in this example):
private KeyRecordIterator queryAggregateByLua(Statement stmt, Qualifier[] qualifiers, //
        OrderList orderList, int limit) {
    Map<String, Object> argument = new HashMap<>();
    List<Value.MapValue> argumentSorters = new ArrayList<>();
    for (OrderEntry order : orderList) {
        Map<String, Object> s = new HashMap<>();
        s.put("sort_key", order.getProperty());
        s.put("order", order.getOrder().name());
        argumentSorters.add(new Value.MapValue(s));
    }
    argument.put("sorters", new Value.ListValue(argumentSorters));
    if (limit > 0) {
        argument.put("limit", limit);
    }
    stmt.setAggregateFunction(this.getClass().getClassLoader(), AS_UTILITY_PATH, QUERY_MODULE, "order_by",
            Value.get(argument));
    ResultSet resultSet = client.queryAggregate(DEFAULT_QUERY_POLICY, stmt);
    if (resultSet == null) {
        return new KeyRecordIterator(stmt.getNamespace(), Collections.emptyList());
    } else { // the aggregation returns one list element here
        List list = (List) resultSet.iterator().next();
        return new KeyRecordIterator(stmt.getNamespace(), list);
    }
}
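Note that this assumes the Lua module has already been registered with the cluster. A minimal sketch of that one-time step, reusing the same AS_UTILITY_PATH and QUERY_MODULE placeholders from above (the exact server path is an assumption):

// Register the stream UDF module once, e.g. at application startup.
RegisterTask task = client.register(null, this.getClass().getClassLoader(),
        AS_UTILITY_PATH, QUERY_MODULE + ".lua", Language.LUA);
task.waitTillComplete();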
NOTES
- only one sort key is supported right now
- only limit is supported, not pagination, i.e. limit without offset. Because Aerospike Stream UDFs execute on all nodes, we cannot do pagination in a sharded environment.
See more detail in my blog post: Aerospike UDF 学习笔记 (Aerospike UDF study notes).
I also need to add group by to this. Can you help?
I have tried sorting using UDFs, but the results are unstable from run to run. Did you test the stability of this implementation? What were your findings?
Take a look at Piyush’s developer blog post.
It uses post-processing on results fetched by a query with a PredExp filter. As of Aerospike Database 5.2.0, Expressions are a much more expressive replacement. The technique Piyush describes still holds; you can just filter better with Expressions.
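For example, a sketch of attaching an Expression filter to a query with the Java client (the age bin, threshold, and namespace/set names are illustrative):

// Only records whose "age" bin is greater than 30 reach the client;
// any sorting can then be done client-side on the smaller result set.
QueryPolicy policy = new QueryPolicy();
policy.filterExp = Exp.build(Exp.gt(Exp.intBin("age"), Exp.val(30)));

Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("users");

try (RecordSet rs = client.query(policy, stmt)) {
    while (rs.next()) {
        // post-process the matching records here
    }
}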