Dear Team,
Can aerospike support composite key like Apache Cassandra?
Thanks Sumit Thakur
Aerospike does not support a key made out of two values in two separate bins. You have to concatenate the two into one string yourself and then use this composite string as the key for the record.
For example, state:id as: "CA:351", "CA:452", "NY:455", etc.
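A minimal sketch of that concatenation approach in Java (the helper names are mine, not part of any Aerospike API; Aerospike just sees the final string as an ordinary record key):

```java
// Build and parse composite string keys of the form "state:id".
public class CompositeKey {
    public static String make(String state, int id) {
        return state + ":" + id;
    }

    // Split back into its two parts; limit 2 keeps any further ':'
    // characters inside the id portion intact.
    public static String[] parse(String key) {
        return key.split(":", 2);
    }

    public static void main(String[] args) {
        String key = make("CA", 351);
        System.out.println(key);            // CA:351
        System.out.println(parse(key)[0]);  // CA
    }
}
```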
Thanks, Piyush,
Can I make a read query with "CA:*" to fetch all records whose key starts with "CA:", like ("CA:351", "CA:452")?
I don’t think so, unless you store the id as a separate bin in the record and then build a secondary index on id.

| Key    | id  | ... |
| CA:452 | 452 | ... |
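In AQL, that secondary index on the id bin could be created like this (index, namespace, and set names here are placeholders, not from the thread):

```sql
CREATE INDEX idIdx ON <namespace>.<set> (id) NUMERIC
```

Range or equality queries on the id bin would then go through this index, while the composite string remains the primary record key.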
Thanks, Piyush for the reply.
This won’t do anything. For example, if I wanted to select a ticker (a given security) for the 14 most recent entries with Spark and Scala, I would need to do:
val dfShort = dfBatchRead.filter(col("ticker") === ticker).sort(col("timeissued").desc).limit(14)
So I would need a composite index on ticker + timeissued, which concatenation won’t help with.
aql> create index tickerTimeissuedS on trading.MARKETDATAAEROSPIKEBATCH (ticker:timeissued) STRING
Error: (4) Bin Name too long
Or maybe you mean creating an additional bin holding the composite value and indexing that?
There is nothing like a "composite index" in Aerospike. You have the primary index of a record and your record key. I was talking about making your key a composite string in order to split large records into a number of smaller ones. The problem you pose is quite different.
My only concern is whether Aerospike can return data to Spark Streaming for a given security, limited to the 14 most recent entries, as per the logic below:
val dfShort = dfBatchRead.filter(col("ticker") === ticker).sort(col("timeissued").desc).limit(14).cache
dfBatchRead is the DataFrame (in Spark) created on Aerospike set as below:
val dfBatchRead = sqlContext.read.
  format("com.aerospike.spark.sql").
  option("aerospike.set", dbSetRead).
  load
For Aerospike I have created two secondary indexes, on ticker and timeissued respectively. As long as Aerospike can take advantage of those two indexes to return data to Spark, the rest is fine and Spark will take care of it.
The ordering and sorting will be done in Spark in memory as shown below:
InMemoryTableScan
  number of output rows: 14
  scan time total (min, med, max): 0 ms (0 ms, 0 ms, 0 ms)
The logical plan shows that:
== Parsed Logical Plan ==
'Project [avg('price) AS avg(price)#176267, stddev_samp('price) AS stddev_samp(price)#176276]
+- AnalysisBarrier
   +- GlobalLimit 14
      +- LocalLimit 14
         +- Sort [timeissued#7 DESC NULLS LAST], true
            +- Filter (ticker#8 = MKS)
               +- Relation[__key#0,__digest#1,__expiry#2,__generation#3,__ttl#4,price#5,rowkey#6,timeissued#7,ticker#8] com.aerospike.spark.sql.AerospikeRelation@6252094d
There doesn’t seem to be an issue retrieving the data from Aerospike. Spark will do the sorting itself in memory. My point was how I can see whether those two indexes are actually used. But again, that is a moot point.
Thanks
I am not a spark expert, so can’t comment on your spark execution. However, Aerospike cannot return results from two secondary index queries simultaneously. You can build and keep up to 256 secondary indexes but you can reduce the full record-set to a subset (result-set) by employing only one secondary index at a time. However, you do have versatile predicate filters that you can then apply to the secondary index query generated result-set to further reduce the result-set to matching criteria. Look at the predicate filtering page here.
Let me understand this if I may please
I have two indexes on set trading.MARKETDATAAEROSPIKEBATCH on bins ticker and timeissued
So you can query on one index at a time. For example: give me all the records where ticker = ASPK; then you can add a predicate filter to that query: from these records, give me those where timeissued > x, or whatever. Having an SI on timeissued does not help you there. You can query your data using one of the indexes plus additional predicate filtering.
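That two-stage flow can be mimicked in plain Java to make the stages visible. This is an in-memory simulation of the concept, not the Aerospike client API; the record layout and values are made up:

```java
import java.util.List;
import java.util.stream.Collectors;

// Simulation of Aerospike's "one secondary-index query, then
// predicate filters on the result-set" model.
public class OneIndexThenFilter {
    public record Rec(String ticker, String timeissued, double price) {}

    // Stage 1: equality query on a single indexed bin (ticker).
    public static List<Rec> siQuery(List<Rec> set, String ticker) {
        return set.stream()
                  .filter(r -> r.ticker().equals(ticker))
                  .collect(Collectors.toList());
    }

    // Stage 2: predicate filter on the result-set (timeissued > cutoff);
    // ISO-8601 timestamps compare correctly as strings.
    public static List<Rec> predFilter(List<Rec> rs, String cutoff) {
        return rs.stream()
                 .filter(r -> r.timeissued().compareTo(cutoff) > 0)
                 .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Rec> set = List.of(
            new Rec("ASPK", "2019-07-02T13:25:25", 10.0),
            new Rec("ASPK", "2019-07-02T13:25:26", 11.0),
            new Rec("IBM",  "2019-07-02T13:25:25", 113.04));
        List<Rec> out = predFilter(siQuery(set, "ASPK"), "2019-07-02T13:25:25");
        System.out.println(out.size()); // prints 1
    }
}
```

In a real deployment, stage 1 runs against the secondary index on the server and stage 2 is pushed down as a predicate expression, so only matching records cross the network.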
Following on, I have two indexes on bins ticker and timeissued.
From AQL: if I query the set on a bin that is not indexed, it won’t work.
aql> select * from trading.MARKETDATAAEROSPIKEBATCH WHERE price = 22
0 rows in set (0.000 secs)
Error: (201) AEROSPIKE_ERR_INDEX_NOT_FOUND
OK, there is no index on price. Let us try a search with an indexed bin, in this case timeissued.
aql> select ticker, price, timeissued from trading.MARKETDATAAEROSPIKEBATCH WHERE timeissued = "2019-07-02T13:25:25"
+--------+--------+-----------------------+
| ticker | price  | timeissued            |
+--------+--------+-----------------------+
| "MRW"  | 284.51 | "2019-07-02T13:25:25" |
| "TSCO" | 465.75 | "2019-07-02T13:25:25" |
| "VOD"  | 311.11 | "2019-07-02T13:25:25" |
| "MSFT" | 25.83  | "2019-07-02T13:25:25" |
| "SBRY" | 265.85 | "2019-07-02T13:25:25" |
| "BP"   | 436.71 | "2019-07-02T13:25:25" |
| "IBM"  | 113.04 | "2019-07-02T13:25:25" |
| "ORCL" | 19.24  | "2019-07-02T13:25:25" |
| "SAP"  | 63.96  | "2019-07-02T13:25:25" |
| "MKS"  | 492.1  | "2019-07-02T13:25:25" |
+--------+--------+-----------------------+
10 rows in set (0.001 secs)

All good. It returns 10 rows of prices as expected.
Now let me narrow down the query and add the predicate ticker = "IBM":
aql> select ticker, price, timeissued from trading.MARKETDATAAEROSPIKEBATCH WHERE timeissued = "2019-07-02T13:25:25" and ticker = "IBM"
+--------+--------+-----------------------+
| ticker | price  | timeissued            |
+--------+--------+-----------------------+
| "MRW"  | 284.51 | "2019-07-02T13:25:25" |
| "TSCO" | 465.75 | "2019-07-02T13:25:25" |
| "VOD"  | 311.11 | "2019-07-02T13:25:25" |
| "MSFT" | 25.83  | "2019-07-02T13:25:25" |
| "SBRY" | 265.85 | "2019-07-02T13:25:25" |
| "BP"   | 436.71 | "2019-07-02T13:25:25" |
| "IBM"  | 113.04 | "2019-07-02T13:25:25" |
| "ORCL" | 19.24  | "2019-07-02T13:25:25" |
| "SAP"  | 63.96  | "2019-07-02T13:25:25" |
| "MKS"  | 492.1  | "2019-07-02T13:25:25" |
+--------+--------+-----------------------+
10 rows in set (0.001 secs)

These results are incorrect. It should only return one record (the IBM row). Regardless of whether the index is used or not, the query returns the wrong result set.
Let us try this through Spark SQL and just create a temporary view to run SQL on
spark.conf.set("aerospike.seedhost", dbHost)
spark.conf.set("aerospike.port", dbPort)
spark.conf.set("aerospike.namespace",namespace)
spark.conf.set("aerospike.set", dbSet)
spark.conf.set("aerospike.keyPath", "/etc/aerospike/features.conf")
spark.conf.set("aerospike.user", dbConnection)
spark.conf.set("aerospike.password", dbPassword)
// Create a DataFrame on top of the Aerospike set
val dfBatchRead = sqlContext.read.
  format("com.aerospike.spark.sql").
  option("aerospike.batchMax", 10000).
  load
// Create a SQL view on top of the DataFrame
dfBatchRead.createOrReplaceTempView("tmp")
// Do the predicate selection (transformation) and collection here
spark.sql("""select ticker, price, timeissued from tmp WHERE timeissued = "2019-07-02T13:25:25" """).collect.foreach(println)
And this is the output
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@d7b38cf
dfBatchRead: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]

[IBM,113.04,2019-07-02T13:25:25]
One record is returned, as expected. The advantage of an API like Spark is that regardless of the underlying storage (it could be Aerospike, MongoDB, HBase, etc.), ANSI SQL can be applied. It should be noted that we still rely on the Aerospike Spark connector API ("com.aerospike.spark.sql") to fetch data from the underlying DB.
HTH
AQL is a utility tool with basic functionality - not the full Aerospike API, nor does it support ANSI SQL. It’s a lightweight application written on top of the C client, and it has not been updated to support predicate filtering. More about AQL here: https://www.aerospike.com/docs/tools/aql/index.html
You might want to write a small application in Java using the Java client if you want to play with predicate filters. See the Java client code here: GitHub - aerospike/aerospike-client-java: Aerospike Java Client Library, and look at examples/src/com/aerospike/examples/QueryPredExp.java
Making a key of composite type will distribute the data across different nodes. How do I ensure that the records for one user go to the same node?
why should it matter?
Performance. Making sure that a user’s data is on a single node will consume less network bandwidth when the client fetches it. And why should a user’s data be distributed at all? It should be present on one specific node.
In Redis there is a construct of hash tags that makes sure a record lands in the same slot even when the key is of composite type, e.g. {uidx}:card_type1, {uidx}:card_type2. This ensures that all uidx data is present on the same node.
Is there a similar construct in Aerospike?
Aerospike does not offer composite keys based on bin data - which is what you are looking for. See more here: Composite keys for Aerospike - #3 by pgupta
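For background on why colocation by key prefix is not possible: Aerospike hashes each record key (together with its set name) to a RIPEMD-160 digest and uses 12 bits of the digest to pick one of 4096 fixed partitions, which the cluster spreads evenly across nodes. A toy sketch of that idea follows; SHA-256 stands in for RIPEMD-160 (which is not in the stock JDK), so the partition numbers are illustrative only:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Toy illustration of digest-based partitioning. Two composite keys
// that share a prefix still hash to unrelated partitions, so their
// records can end up on different nodes.
public class PartitionSketch {
    static final int PARTITIONS = 4096; // Aerospike's fixed partition count

    public static int partition(String key) throws Exception {
        byte[] d = MessageDigest.getInstance("SHA-256")
                                .digest(key.getBytes(StandardCharsets.UTF_8));
        // Take 12 bits of the digest to select a partition (0..4095).
        return ((d[0] & 0xFF) << 4) | ((d[1] & 0xF0) >>> 4);
    }

    public static void main(String[] args) throws Exception {
        // Same {uidx} prefix, (almost certainly) different partitions.
        System.out.println(partition("uidx1:card_type1"));
        System.out.println(partition("uidx1:card_type2"));
    }
}
```

This even spread is deliberate: it is what gives Aerospike its uniform load balancing, which is why there is no hash-tag style escape hatch as in Redis Cluster.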
This discussion thread may also help: Using Aerospike to process data where uniqueness is defined by combination of keys - Stack Overflow