Composite Key

Dear Team,

Can Aerospike support composite keys like Apache Cassandra does?

Thanks, Sumit Thakur

Not as a key made out of two values in two separate columns. You have to concatenate the two into one string yourself and then use this composite string as the key for the record.

For example, state:id as "CA:351", "CA:452", "NY:455", etc.
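A trivial sketch of building such composite string keys (plain Python for illustration; the helper name is made up):

```python
def composite_key(state: str, record_id: int) -> str:
    """Build the single string key Aerospike will actually see (made-up helper)."""
    return f"{state}:{record_id}"

# Aerospike stores each record under the joined string, not under two parts.
keys = [composite_key("CA", 351), composite_key("CA", 452), composite_key("NY", 455)]
```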

Thanks, Piyush,

Can I make a read query with "CA:*" to fetch all records whose key starts with "CA:", like ("CA:351", "CA:452")?

I don’t think so, unless you store the id as a separate bin in the record and then build a secondary index on it.

    | Key    | id  | ... |
    |--------|-----|-----|
    | CA:452 | 452 | ... |
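To make that concrete, here is a toy in-memory sketch (plain Python, not the Aerospike client; the bin layout is made up) of storing each part of the composite key as its own bin, so a secondary-index-style equality query can match it:

```python
# Toy model of a set: primary key -> bins. The composite key "CA:452" is
# opaque to Aerospike, but storing its parts (state, id) as separate bins
# lets a secondary index match them directly.
records = {
    "CA:351": {"state": "CA", "id": 351},
    "CA:452": {"state": "CA", "id": 452},
    "NY:455": {"state": "NY", "id": 455},
}

def query_by_bin(bin_name, value):
    """Stand-in for a secondary-index equality query on one bin."""
    return [key for key, bins in records.items() if bins.get(bin_name) == value]
```

Here `query_by_bin("state", "CA")` plays the role of the "CA:*" lookup, returning `["CA:351", "CA:452"]`.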

Thanks, Piyush, for the reply.

This won’t help in my case. For example, if I wanted to select a ticker (a given security) for the 14 most recent entries with Spark and Scala, I would need to do:

    val dfShort = dfBatchRead.filter(col("ticker") === ticker).sort(col("timeissued").desc).limit(14)

So I would need a composite index on ticker + timeissued, which concatenation won’t help with.

    aql> create index tickerTimeissuedS on trading.MARKETDATAAEROSPIKEBATCH (ticker:timeissued) STRING
    Error: (4) Bin Name too long

Or maybe you mean creating an additional bin for the composite index?

There is no such thing as a “composite index” in Aerospike. You have the primary index of a record and the key of the record. I was talking about making your key a composite string in order to split large records into a number of smaller ones. The problem you pose is quite different.

My only concern is whether Aerospike can return data to Spark streaming for a given security and its 14 most recent entries, as per the logic below:

       val dfShort = dfBatchRead.filter(col("ticker") === ticker).sort(col("timeissued").desc).limit(14).cache

dfBatchRead is the DataFrame (in Spark) created on the Aerospike set as below:

    val dfBatchRead = sqlContext.read.
        format("com.aerospike.spark.sql").
        option("aerospike.set", dbSetRead).
        load

For Aerospike I have created two secondary indexes, on ticker and timeissued respectively. As long as Aerospike can take advantage of those two indexes to return data to Spark, the rest is fine and Spark will take care of it.

The ordering and sorting will be done in Spark in memory as shown below:

    InMemoryTableScan
    number of output rows: 14
    scan time total (min, med, max): 0 ms (0 ms, 0 ms, 0 ms)

The logical plan shows that:

    == Parsed Logical Plan ==
    'Project [avg('price) AS avg(price)#176267, stddev_samp('price) AS stddev_samp(price)#176276]
    +- AnalysisBarrier
       +- GlobalLimit 14
          +- LocalLimit 14
             +- Sort [timeissued#7 DESC NULLS LAST], true
                +- Filter (ticker#8 = MKS)
                   +- Relation[__key#0,__digest#1,__expiry#2,__generation#3,__ttl#4,price#5,rowkey#6,timeissued#7,ticker#8] com.aerospike.spark.sql.AerospikeRelation@6252094d

There doesn’t seem to be an issue retrieving the data from Aerospike. Spark will do the sorting itself in memory. My point was how I can see whether these two indexes are used. But again, that is a moot point.
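The filter/sort/limit that Spark performs in memory amounts to the following (a plain-Python sketch of the same transformation, with made-up sample rows):

```python
# Plain-Python equivalent of:
#   dfBatchRead.filter(col("ticker") === ticker).sort(col("timeissued").desc).limit(14)
def latest_n(rows, ticker, n=14):
    matching = [r for r in rows if r["ticker"] == ticker]
    # ISO-8601 timestamps sort lexicographically, so a string sort is chronological.
    return sorted(matching, key=lambda r: r["timeissued"], reverse=True)[:n]

rows = [  # made-up sample rows in the shape of the Aerospike set
    {"ticker": "MKS", "price": 492.1,  "timeissued": "2019-07-02T13:25:25"},
    {"ticker": "MKS", "price": 491.3,  "timeissued": "2019-07-02T13:24:25"},
    {"ticker": "IBM", "price": 113.04, "timeissued": "2019-07-02T13:25:25"},
]
top = latest_n(rows, "MKS")  # the MKS rows, newest first
```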

Thanks

I am not a Spark expert, so I can’t comment on your Spark execution. However, Aerospike cannot return results from two secondary index queries simultaneously. You can build and keep up to 256 secondary indexes, but only one secondary index at a time can be used to reduce the full record-set to a subset (result-set). However, you do have versatile predicate filters that you can then apply to the result-set generated by the secondary index query, to further reduce it to the matching criteria. Look at the predicate filtering page here.

Let me understand this, if I may, please.

I have two indexes on set trading.MARKETDATAAEROSPIKEBATCH, on bins ticker and timeissued.

So you can query on one index at a time. For example: give me all the records where ticker = ASPK. Then you can add a predicate filter to that query: from these records, give me those where timeissued > x, whatever. Having an SI on timeissued does not help you there. You can query your data using one of the indexes, plus additional predicate filtering.
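The one-index-plus-predicate-filter flow described above can be sketched as a toy in-memory model (plain Python, not the Aerospike client; the rows and the time threshold are made up):

```python
# Stage 1: one secondary-index (SI) query narrows the record-set;
# Stage 2: predicate filters narrow the SI result-set further.
rows = [
    {"ticker": "ASPK", "timeissued": "2019-07-02T13:25:25"},
    {"ticker": "ASPK", "timeissued": "2019-07-02T12:55:00"},
    {"ticker": "IBM",  "timeissued": "2019-07-02T13:25:25"},
]

def si_query(records, bin_name, value):
    """Stage 1: equality query on a single secondary index."""
    return [r for r in records if r[bin_name] == value]

def predicate_filter(result_set, pred):
    """Stage 2: apply a predicate filter to each record of the SI result-set."""
    return [r for r in result_set if pred(r)]

hits = predicate_filter(si_query(rows, "ticker", "ASPK"),
                        lambda r: r["timeissued"] > "2019-07-02T13:00:00")
# hits -> the single ASPK row issued after 13:00
```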

To follow on: I have two indexes, on bins ticker and timeissued.

From AQL, if I query the set with a bin that is not indexed, it won’t work:

    aql> select * from trading.MARKETDATAAEROSPIKEBATCH WHERE price = 22
    0 rows in set (0.000 secs)
    Error: (201) AEROSPIKE_ERR_INDEX_NOT_FOUND

OK, there is no index on price. Let us try a search with an indexed bin, in this case timeissued:


    aql> select ticker, price, timeissued from trading.MARKETDATAAEROSPIKEBATCH WHERE timeissued = "2019-07-02T13:25:25"
    +---------+--------+-----------------------+
    | ticker  | price  | timeissued            |
    +---------+--------+-----------------------+
    | "MRW"   | 284.51 | "2019-07-02T13:25:25" |
    | "TSCO"  | 465.75 | "2019-07-02T13:25:25" |
    | "VOD"   | 311.11 | "2019-07-02T13:25:25" |
    | "MSFT"  | 25.83  | "2019-07-02T13:25:25" |
    | "SBRY"  | 265.85 | "2019-07-02T13:25:25" |
    | "BP"    | 436.71 | "2019-07-02T13:25:25" |
    | "IBM"   | 113.04 | "2019-07-02T13:25:25" |
    | "ORCL"  | 19.24  | "2019-07-02T13:25:25" |
    | "SAP"   | 63.96  | "2019-07-02T13:25:25" |
    | "MKS"   | 492.1  | "2019-07-02T13:25:25" |
    +---------+--------+-----------------------+
    10 rows in set (0.001 secs)

All good. It returns 10 rows of prices, as expected.

Now let me narrow down the query by adding the predicate ticker = "IBM":

    aql> select ticker, price, timeissued from trading.MARKETDATAAEROSPIKEBATCH WHERE timeissued = "2019-07-02T13:25:25" and ticker = "IBM"
    +---------+--------+-----------------------+
    | ticker  | price  | timeissued            |
    +---------+--------+-----------------------+
    | "MRW"   | 284.51 | "2019-07-02T13:25:25" |
    | "TSCO"  | 465.75 | "2019-07-02T13:25:25" |
    | "VOD"   | 311.11 | "2019-07-02T13:25:25" |
    | "MSFT"  | 25.83  | "2019-07-02T13:25:25" |
    | "SBRY"  | 265.85 | "2019-07-02T13:25:25" |
    | "BP"    | 436.71 | "2019-07-02T13:25:25" |
    | "IBM"   | 113.04 | "2019-07-02T13:25:25" |
    | "ORCL"  | 19.24  | "2019-07-02T13:25:25" |
    | "SAP"   | 63.96  | "2019-07-02T13:25:25" |
    | "MKS"   | 492.1  | "2019-07-02T13:25:25" |
    +---------+--------+-----------------------+
    10 rows in set (0.001 secs)

These results are incorrect. The query should only return one record (the IBM row). Regardless of whether the index is used or not, the query returns the wrong result set.
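Since AQL applied only the timeissued index and silently ignored the second condition, the narrowing to IBM has to happen client-side. A minimal sketch of that post-filtering step (plain Python; rows abbreviated to ticker and price):

```python
# The 10 (ticker, price) pairs AQL returned for timeissued = "2019-07-02T13:25:25";
# the `ticker = "IBM"` condition was never applied server-side.
index_result = [
    ("MRW", 284.51), ("TSCO", 465.75), ("VOD", 311.11), ("MSFT", 25.83),
    ("SBRY", 265.85), ("BP", 436.71), ("IBM", 113.04), ("ORCL", 19.24),
    ("SAP", 63.96), ("MKS", 492.1),
]

# Client-side predicate: keep only the IBM row the query was meant to return.
ibm_only = [row for row in index_result if row[0] == "IBM"]
```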

Let us try this through Spark SQL, creating a temporary view to run SQL on:

 spark.conf.set("aerospike.seedhost", dbHost)
 spark.conf.set("aerospike.port", dbPort)
 spark.conf.set("aerospike.namespace",namespace)
 spark.conf.set("aerospike.set", dbSet)
 spark.conf.set("aerospike.keyPath", "/etc/aerospike/features.conf")
 spark.conf.set("aerospike.user", dbConnection)
 spark.conf.set("aerospike.password", dbPassword)

    // create dataframe on top of Aerospike set
    val dfBatchRead = sqlContext.read.
        format("com.aerospike.spark.sql").
        option("aerospike.batchMax", 10000).
        load

    // Create a SQL view on top of the dataframe
    dfBatchRead.createOrReplaceTempView("tmp")

    // Do the predicate selection (transformation) and collection here
    spark.sql("""select ticker, price, timeissued from tmp
                 WHERE timeissued = '2019-07-02T13:25:25' and ticker = 'IBM'""").collect.foreach(println)

And this is the output

    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@d7b38cf
    dfBatchRead: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
    [IBM,113.04,2019-07-02T13:25:25]

With one record returned, as expected. The advantage of an API like Spark is that, regardless of the underlying storage (it could be Aerospike, MongoDB, HBase, etc.), the same ANSI SQL applies. It should be noted that we still rely on the Aerospike-Spark connector API, “com.aerospike.spark.sql”, to fetch data from the underlying DB.
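The “same SQL regardless of storage” point can be illustrated with any SQL engine. Here is the equivalent predicate against an in-memory SQLite table, a stand-in for the connector-backed view (table and sample rows are made up):

```python
import sqlite3

# In-memory stand-in for the "tmp" view backed by the Aerospike connector.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tmp (ticker TEXT, price REAL, timeissued TEXT)")
conn.executemany("INSERT INTO tmp VALUES (?, ?, ?)", [
    ("IBM", 113.04, "2019-07-02T13:25:25"),
    ("MKS", 492.1,  "2019-07-02T13:25:25"),
    ("IBM", 112.90, "2019-07-02T13:24:25"),
])

# The same ANSI-SQL predicate as in the Spark SQL example.
result = conn.execute(
    "SELECT ticker, price, timeissued FROM tmp "
    "WHERE timeissued = '2019-07-02T13:25:25' AND ticker = 'IBM'"
).fetchall()
```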

HTH

AQL is a utility tool with basic functionality. It is not the full Aerospike API, nor does it support ANSI SQL. It is a lightweight application written on top of the C client, and it has not been updated to support predicate filtering. More about AQL here: https://www.aerospike.com/docs/tools/aql/index.html

You might want to write a small application in Java using the Java client if you want to play with predicate filters. See the Java client code here: GitHub - aerospike/aerospike-client-java: Aerospike Java Client Library, and look at aerospike-client-java/tree/master/examples/src/com/aerospike/examples/QueryPredExp.java

Making a key of composite type will distribute the data across different nodes. How can I ensure that all records for a user go to the same node?

Why should it matter?

Performance. Making sure that a user’s data is on a single node will consume less network bandwidth when the client fetches the data.

And why should a user’s data be distributed? It should be present on a specific node.

In Redis there is a construct of hash tags that makes sure a record lands in the same slot while the key remains composite, e.g. {uidx}:card_type1, {uidx}:card_type2.

This ensures that all of uidx’s data is present on the same node.

Is there a similar construct in Aerospike?

Aerospike does not offer composite keys based on bin data, which is what you are looking for. See more here: Composite keys for Aerospike - #3 by pgupta
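For background on why co-location by key prefix is not available: Aerospike hashes the full (set, key) pair into one of 4096 fixed partitions, and partitions are spread across the nodes, so keys sharing a "{uidx}" prefix still hash independently. A rough sketch of the idea (SHA-256 stands in for Aerospike's actual RIPEMD-160 digest, and the set/key names are made up):

```python
import hashlib

N_PARTITIONS = 4096  # Aerospike's fixed partition count

def partition_of(set_name: str, user_key: str) -> int:
    # SHA-256 here is only a stand-in for the real RIPEMD-160 digest of (set, key).
    digest = hashlib.sha256(f"{set_name}\x00{user_key}".encode()).digest()
    return int.from_bytes(digest[:2], "little") % N_PARTITIONS

# Keys with the same "{uidx}" prefix hash independently, so they generally
# land in different partitions (and therefore, usually, on different nodes).
p1 = partition_of("cards", "{u42}:card_type1")
p2 = partition_of("cards", "{u42}:card_type2")
```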

Also, this discussion thread may help: Using Aerospike to process data where uniqueness is defined by combination of keys - Stack Overflow