Hello, we have been using Aerospike in production for a long time as a key-value cache.
Now we are starting to use it as a data store, and we found out that stream UDFs are not supported yet (see Support for Stream UDF · Issue #29 · aerospike/aerospike-client-go · GitHub).
So we tried other ways to use it:
- We tried ScanAll, but it fetches records in only a few goroutines (one per node in the cluster), which was too slow for us (a sketch of what we tried is below this list).
- We tried saving all keys in the application and doing 4 BatchGets (we have 24 CPU cores on our servers), but BatchGet doesn't support a filter statement. That means we would fetch all the data, not only the data that passes the filter (see BatchGet - Do filter bins on server · Issue #60 · aerospike/aerospike-client-go · GitHub).
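Roughly, the ScanAll attempt looked like this (a simplified sketch; client, namespace, and set come from our setup):

spolicy := NewScanPolicy()
spolicy.ConcurrentNodes = true // the client scans with one goroutine per node
recordset, err := client.ScanAll(spolicy, namespace, set)
if err != nil {
    panic(err)
}
for rec := range recordset.Records {
    // All filtering has to happen here on the client, after the full
    // record has already crossed the network.
    _ = rec.Bins
}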
So I'm looking for an efficient pattern that will help us do filtering (not only by secondary index), and maybe aggregation (if possible), with the Go client. Can you help me with it, please?
From what I understand, you want to run a UDF on selected records?
In that case, you can use the ExecuteUDF() method and run your UDF on a selection of records.
I can help more if you describe your use case in detail.
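For reference, a minimal sketch of that call (the module name, function name, and argument here are placeholders, not from your code):

stm := NewStatement(namespace, set)
// ExecuteUDF applies a record UDF to every record the statement selects,
// running as a background job on the server.
task, err := client.ExecuteUDF(nil, stm, "myModule", "myFunction", NewValue("someArg"))
panicOnErr(err)
// Wait until the job completes on all nodes.
err = <-task.OnComplete()
panicOnErr(err)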
Thanks. Yes, let's review one case.
We have records in the format [{id:1, status:active, price:10, stock:1}, {id:2, status:active, price:20, stock:0}, {id:3, status:inactive, price:30, stock:1}].
We need to get all records with status:active and stock > 0.
Posted on GitHub, posting here for reference.
You can do it in Go as well. Example:
const udfFilter = `
local function map_record(record)
  -- Add bin4 and bin5 to the returned map.
  -- Could add other record bins here as well.
  -- This code shows two different ways to access record bins.
  return map {bin4=record.bin1, bin5=record["bin2"]}
end

function filter_records(stream, name)
  local function filter_name(record)
    -- Here is your compound where clause.
    return (record.bin1 == -1) and (record.bin2 == name)
  end
  return stream : filter(filter_name) : map(map_record)
end
`

regTask, err := client.RegisterUDF(nil, []byte(udfFilter), "udfFilter.lua", LUA)
panicOnErr(err)
// Wait until the UDF is registered on all nodes.
err = <-regTask.OnComplete()
panicOnErr(err)

// Set up the statement.
stm := NewStatement(namespace, set)
// Set the predicate; index the field with the best selectivity.
stm.Addfilter(NewRangeFilter(bin3.Name, 0, math.MaxInt16/2))
// This is where the UDF is set as the filter.
stm.SetAggregateFunction("udfFilter", "filter_records", []Value{NewValue("Aerospike")}, true)

recordset, err := client.Query(policy, stm)
panicOnErr(err)
for rec := range recordset.Records {
    fmt.Println(rec.Bins)
}
This would return rec.Bins as:
map[SUCCESS:map[bin4:constValue bin5:1]]
map[SUCCESS:map[bin4:constValue bin5:19]]
map[SUCCESS:map[bin4:constValue bin5:3]]
map[SUCCESS:map[bin4:constValue bin5:7]]
map[SUCCESS:map[bin4:constValue bin5:162]]
map[SUCCESS:map[bin4:constValue bin5:215]]
map[SUCCESS:map[bin4:constValue bin5:122]]
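For your status/stock case, the same pattern would look roughly like this (a sketch; the bin names match the records you described, and active_in_stock is just a name I made up):

local function project(record)
  -- Return only the bins we care about.
  return map {id=record.id, price=record.price}
end

function active_in_stock(stream)
  local function flt(record)
    return (record.status == 'active') and (record.stock ~= nil) and (record.stock > 0)
  end
  return stream : filter(flt) : map(project)
end

If you have a secondary index on status, you can also push that predicate into the statement filter with NewEqualFilter("status", "active") and keep only the stock check in Lua.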
Excellent! I suggest adding it to the examples or documentation.
Can I ask one more question here:
Does it make sense to use the primary key together with a secondary index?
Example:
I have a secondary index on bin1, and I also use manually created digests …001, …002.
How can I properly do this request: bin1=1 AND digest IN […001, …004, …008]?
You can set the filter statement for the query on bin1, then filter on the digest in Lua and return your record.
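A sketch of that Lua side, assuming you pass the wanted digests in as a list argument and that comparing their string forms is valid for your digest representation (worth verifying):

function filter_digests(stream, wanted)
  local function in_wanted(rec)
    -- record.digest(rec) returns the record's digest; the string
    -- comparison below is an assumption about its representation.
    local d = tostring(record.digest(rec))
    for v in list.iterator(wanted) do
      if v == d then
        return true
      end
    end
    return false
  end
  return stream : filter(in_wanted)
end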
Yes, it works. Thanks for the help. Now I'm trying to tune Aerospike to speed up the filtering in Lua (3 Aerospike nodes are 10x slower than 3 Elasticsearch nodes on 1M records), but I will ask about that in a separate topic (How to increase threads used by UDFs?).