How to do multiple filtering

Hello, we have been using Aerospike in production for a long time as a key-value cache. Now we are starting to use it as a data storage, and we found out that stream UDFs are not supported yet (see Support for Stream UDF · Issue #29 · aerospike/aerospike-client-go · GitHub).

So we tried other ways to use it:

  1. We tried ScanAll, but it fetches records in only a few goroutines (equal to the number of nodes in the cluster). It was too slow for us.

  2. We tried saving all keys in the application and doing 4 BatchGet's (we have 24 CPU cores on our servers), but BatchGet doesn't support a filter statement. This means we would fetch all the data, not only the records that match the filter (see BatchGet - Do filter bins on server · Issue #60 · aerospike/aerospike-client-go · GitHub).
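For context on option 2: the batching there amounts to splitting the saved key list into N chunks and issuing one BatchGet per chunk. The splitting itself is plain Go; here is a minimal sketch (the `chunkKeys` helper is hypothetical, not part of the Aerospike client):

```go
package main

import "fmt"

// chunkKeys splits a slice of keys into at most n roughly equal chunks,
// one per concurrent BatchGet worker.
// (Hypothetical helper, not part of the Aerospike client.)
func chunkKeys(keys []string, n int) [][]string {
	if n <= 0 || len(keys) == 0 {
		return nil
	}
	if n > len(keys) {
		n = len(keys)
	}
	size := (len(keys) + n - 1) / n // ceiling division
	chunks := make([][]string, 0, n)
	for i := 0; i < len(keys); i += size {
		end := i + size
		if end > len(keys) {
			end = len(keys)
		}
		chunks = append(chunks, keys[i:end])
	}
	return chunks
}

func main() {
	keys := []string{"k1", "k2", "k3", "k4", "k5", "k6", "k7", "k8", "k9"}
	for _, c := range chunkKeys(keys, 4) {
		fmt.Println(c) // each chunk would go to one BatchGet call
	}
}
```

Each chunk would then be passed to its own BatchGet call in a separate goroutine, but as noted above, every record still comes back unfiltered.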

So, I'm looking for an efficient pattern that will help us do filtering (not only by secondary index), and maybe aggregation (if possible), with the Go client. Can you help me with it, please?

From what I understand, you want to run a UDF on selected records?

In that case, you can use the ExecuteUDF() method to run your UDF on a selection of records.

I can help more if you describe your use case in detail.

Thanks. Yes, let’s review one case.

We have records in the format `[{id:1, status:active, price:10, stock:1}, {id:2, status:active, price:20, stock:0}, {id:3, status:inactive, price:30, stock:1}]`

We need to get all records with status:active and stock > 0.
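For reference, a stream UDF for this exact case might look like the following sketch. It assumes bins named `status` and `stock`, matching the record layout above:

```lua
-- Sketch: keep only active records that are in stock.
-- Assumes bins named "status" and "stock" as in the example records.
local function active_in_stock(rec)
  return (rec.status == 'active') and (rec.stock ~= nil) and (rec.stock > 0)
end

local function to_map(rec)
  -- Return only the bins we care about.
  return map {id = rec.id, price = rec.price}
end

function filter_active(stream)
  return stream : filter(active_in_stock) : map(to_map)
end
```

This is only an illustration of the filter/map pipeline; the registration and query code in Go follows below in this thread.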

Posted on Github, posting here for reference.

You can do it in Go as well if you don’t mind. Example:

const udfFilter = `
    local function map_record(record)
     -- Return selected bins in a map under new names.
     -- Other record bins could be added here as well.
     -- This code shows two different ways to access record bins.
     return map {bin4=record.bin1, bin5=record["bin2"]}
    end

    function filter_records(stream, name)
     local function filter_name(record)
       -- Here is your compound where clause
       return (record.bin1 == -1) and (record.bin2 == name)
     end

     return stream : filter(filter_name) : map(map_record)
    end
`

regTask, err := client.RegisterUDF(nil, []byte(udfFilter), "udfFilter.lua", LUA)
panicOnErr(err)

// wait until UDF is created
err = <-regTask.OnComplete()
panicOnErr(err)

// setup statement
stm := NewStatement(namespace, set)
// Set the predicate on an indexed bin; pick the bin with the best selectivity.
stm.Addfilter(NewRangeFilter(bin3.Name, 0, math.MaxInt16/2))
// This is where UDF is set for filter
stm.SetAggregateFunction("udfFilter", "filter_records", []Value{NewValue("Aerospike")}, true)

recordset, err := client.Query(policy, stm)
panicOnErr(err)

for rec := range recordset.Records {
  fmt.Println(rec.Bins)
}

This would return rec.Bins as:

map[SUCCESS:map[bin4:constValue bin5:1]]
map[SUCCESS:map[bin4:constValue bin5:19]]
map[SUCCESS:map[bin4:constValue bin5:3]]
map[SUCCESS:map[bin4:constValue bin5:7]]
map[SUCCESS:map[bin4:constValue bin5:162]]
map[SUCCESS:map[bin4:constValue bin5:215]]
map[SUCCESS:map[bin4:constValue bin5:122]]

Excellent! I suggest adding it to the examples or documentation.

Can I ask one more question here:

Does it make sense to use the primary key together with a secondary index?

Example:

I have a secondary index on bin1, and I also use manually created digests …001, …002.

How can I properly do the request: bin1=1 and digest IN […001, …004, …008]?

You can set the query's filter statement on bin1, then filter by digest in Lua, and return your records.
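A sketch of that Lua side, assuming the wanted digests are passed from the client as an extra UDF argument and read via the `record.digest()` accessor from Aerospike's Lua record module (the `tostring` comparison is illustrative; you may need to normalize the byte representations on both sides):

```lua
-- Sketch: filter records whose digest is in a supplied list.
-- `digests` is passed from the client as an extra UDF argument.
function filter_by_digest(stream, digests)
  local wanted = {}
  for d in list.iterator(digests) do
    wanted[tostring(d)] = true
  end

  local function match_digest(rec)
    -- record.digest(rec) returns the record's digest bytes.
    return wanted[tostring(record.digest(rec))] == true
  end

  local function to_map(rec)
    return map {bin1 = rec.bin1}
  end

  return stream : filter(match_digest) : map(to_map)
end
```

On the Go side this would be registered and attached with SetAggregateFunction, the same way as the example above, with the digest list passed in the Value arguments.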

Yes, it works. Thanks for the help. Now I'm trying to tune Aerospike to speed up the filtering in Lua (3 nodes of Aerospike are 10× slower than 3 nodes of Elasticsearch on 1M records), but I will ask about that in a separate topic (How to increase threads used by UDFs?).
