Spark Streaming join with Aerospark RDD

I just came across the Aerospike Spark Connector ("Aerospark") and I am considering using it in our current project for real-time stream joins with Spark RDDs. My question: can I join a streaming RDD from Kafka with an Aerospike RDD? If so, does the Aerospike RDD internally call the multi-get API to fetch all the keys for a streaming batch at once? I want to understand the internals of the Aerospike RDD while performing a join. My requirement is that the streaming RDD (from Kafka) will have a small number of records (e.g. 30,000), which I want to join with data in Aerospike that may contain millions of records. So while performing the join, will the Aerospike Spark Connector load only the 30,000 keys from Aerospike (via multi-get) that I get from the Kafka stream, or will it first fetch all records present in Aerospike and then perform the join? If it uses multi-get to fetch only the 30k records, it will make sense for me to use it in my current project. Let me know in case any other details are required.
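To make the intent concrete, here is a rough sketch of the pattern I have in mind, written by hand against the Aerospike Java client rather than the connector (the host, namespace, and set names are placeholders): for each Kafka micro-batch, collect the keys in each partition and issue a single batch read (multi-get) for them, instead of scanning the whole set.

import com.aerospike.client.{AerospikeClient, Key, Record}
import com.aerospike.client.policy.BatchPolicy
import org.apache.spark.streaming.dstream.DStream

// Rough sketch only: join each Kafka micro-batch against Aerospike
// using one batch read (multi-get) per partition, so only the batch's
// ~30k keys are fetched rather than the whole set.
def joinWithAerospike(stream: DStream[String]): DStream[(String, Record)] =
  stream.transform { rdd =>
    rdd.mapPartitions { keyIter =>
      // Placeholder host/port; a real job would reuse a pooled client.
      val client = new AerospikeClient("aerospike-host", 3000)
      val userKeys = keyIter.toArray
      val keys = userKeys.map(k => new Key("test", "people", k))
      // Single multi-get covering just this partition's keys.
      val records = client.get(new BatchPolicy(), keys)
      client.close()
      // Keep only keys that actually exist in Aerospike.
      userKeys.iterator.zip(records.iterator).filter(_._2 != null)
    }
  }

If the connector already does something equivalent internally during a join, that is exactly what I am hoping for.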

The current Aerospike Spark Connector does not support streaming joins using batch reads, but we are in the process of retooling this connector and this is certainly one of the use cases that we plan to support. We will update this topic when the new version of the connector is available.

A new version of the Spark connector has been released, which includes the aeroJoin function. Have a look at the updated tutorial documentation, and let us know if you have any issues or questions.

The tutorial link is broken. Does this connector support Spark 2 streaming?

Updated link: https://www.aerospike.com/docs/connectors/enterprise/spark/tutorial.html

Hi,

The connector throws an error with Spark 2.4.3 and Aerospike 4.5.2.1.

I got the error below on the peopleSSN join when following the example:

spark.conf.set("aerospike.namespace", "test")

case class Person(name: String, age: Long, sex: String, ssn: String)

val people = Seq(
  Person("jimmy", 20, "M", "555-22-3333"),
  Person("johnny", 32, "M", "555-33-2222"),
  Person("linda", 28, "F", "555-23-2323")).toDS.save("people", "ssn")

// Read back from the test.people set
val dfReadPeople = sqlContext.read.
  format("com.aerospike.spark.sql").
  option("aerospike.namespace", "test").
  option("aerospike.set", "people").
  option("aerospike.batchMax", 10000).
  load

dfReadPeople.select('name, 'age, 'sex, 'ssn).show

case class PersonSSN(ssn: String)

val peopleSSN = Seq(
  PersonSSN("555-22-3333"),
  PersonSSN("555-33-2222")
).toDS

val peopleDS = peopleSSN.aeroJoin[Person]("ssn", "people")

and running it produces the error below:

defined class Person
people: Unit = ()
dfReadPeople: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
+------+---+---+-----------+
|  name|age|sex|        ssn|
+------+---+---+-----------+
| linda| 28|  F|555-23-2323|
|johnny| 32|  M|555-33-2222|
| jimmy| 20|  M|555-22-3333|
+------+---+---+-----------+

defined class PersonSSN
peopleSSN: org.apache.spark.sql.Dataset[PersonSSN] = [ssn: string]
<console>:49: error: Cannot prove that Person <:< com.aerospike.spark.AeroKV.
       val peopleDS = peopleSSN.aeroJoin[Person]("ssn", "people")
                                                ^

Cheers,

Mich

Hi Mich, just getting to this now. It looks like the type parameter passed to aeroJoin now has to extend AeroKV (which is what the "Cannot prove that Person <:< com.aerospike.spark.AeroKV" error is saying), so the case class needs to be changed to:

case class Person(__key: Any, name: String, age: Long, sex: String, ssn: String) extends AeroKV

We will be going through the documentation in these next few days.
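For anyone following along, here is a sketch of the full corrected snippet under that change (AeroKV's package is taken from the error message above; exact signatures may vary across connector versions):

import com.aerospike.spark.AeroKV

// Person now carries the connector's __key field and extends AeroKV,
// which provides the Person <:< AeroKV evidence that aeroJoin requires.
case class Person(__key: Any, name: String, age: Long, sex: String, ssn: String) extends AeroKV

case class PersonSSN(ssn: String)

val peopleSSN = Seq(
  PersonSSN("555-22-3333"),
  PersonSSN("555-33-2222")
).toDS

// Batch-read join on the ssn bin of the test.people set.
val peopleDS = peopleSSN.aeroJoin[Person]("ssn", "people")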

Great.

Please let me know when the doc is updated.

Thanks

The example code in the docs has been updated, and there is an example project that has it working.

OK, thanks, I will try it.