Writing to Aerospike from Spark with bulk write with user authentication in Scala fails



Aerospike Enterprise version

As far as I know, one can create a client as follows, and it works:

    import com.aerospike.spark.sql._
    import com.aerospike.client.AerospikeClient
    import com.aerospike.client.Bin
    import com.aerospike.client.Key
    import com.aerospike.client.Value
    import com.aerospike.client.Host
    import com.aerospike.client.policy.ClientPolicy

    val hosts = new Host("rhes75", 3000)

    val policy = new ClientPolicy()
    policy.user = dbConnection
    policy.password = dbPassword
    val client = new AerospikeClient(policy, hosts)
    val TEST_COUNT = 100
    for (i <- 1 to TEST_COUNT) {
       val key = new Key(namespace, "spark-test", "spark-test-"+i)
       client.put(null, key,
         new Bin("one", i),
         new Bin("two", "two:"+i),
         new Bin("three", i.toDouble))
    }
However, I have no way of passing user credentials when I do a bulk write using a DataFrame:

          option("aerospike.namespace", namespace).
          option("aerospike.set", dbSet).
          option("aerospike.updateByKey", "id").
          option("aerospike.keyColumn", "__id").
          option("aerospike.batchMax", 5000).
          option("aerospike.keyPath", "/etc/aerospike/features.conf").

When I turn off security in the conf file, this works fine. Otherwise I get the following error:

Caused by: com.aerospike.client.AerospikeException$Connection: Error -8: Failed to connect to host(s): rhes75 3000 Error 80: not authenticated

Any ideas how this can be done with user authentication?




Just as an add-on, I see this in the Aerospike log:

Apr 21 2019 17:52:24 GMT: INFO (security): (security.c:5483) permitted | client: | authenticated user: mich | action: login | detail: user=mich
**Apr 21 2019 17:52:25 GMT: INFO (security): (security.c:5483) not authenticated | client: | authenticated user: <none> | action: info request | detail: <none>**

So the first one with user=mich is authenticated.

However, the second one, corresponding to the bulk write, is missing the authenticated user. So there must be a way of passing the username?



Thanks for highlighting. Yes, you’re right - an oversight now addressed for future users - for reference

    spark.conf.set("aerospike.user", dbUser)
    spark.conf.set("aerospike.password", dbPassword)
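For future readers, a minimal sketch of how these credential settings fit together with the rest of the connector configuration before a bulk write. Names such as `dbHost`, `dbUser`, `dbPassword`, `namespace`, and `dbSet` are placeholders, and the format string reflects the connector version discussed here, so adjust to your setup:

    // assumed placeholders: dbHost, dbUser, dbPassword, namespace, dbSet
    spark.conf.set("aerospike.seedhost", dbHost)
    spark.conf.set("aerospike.port", "3000")
    spark.conf.set("aerospike.user", dbUser)         // credentials now picked up
    spark.conf.set("aerospike.password", dbPassword) // by the connector

    df.write.
      format("com.aerospike.spark.sql").
      option("aerospike.namespace", namespace).
      option("aerospike.set", dbSet).
      option("aerospike.updateByKey", "id").
      save()

The point is that the username and password go into `spark.conf` once, rather than being passed per write.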


Hi Ken,

Many thanks. I have tested the interim solution of the JAR file you provided. It works fine for the batch load of data from Spark into Aerospike when security is turned on and the connector uses the username and password as additional parameters to spark.conf:

     val sqlContext = spark.sqlContext

     spark.conf.set("aerospike.seedhost", dbHost)
     spark.conf.set("aerospike.port", dbPort)
     spark.conf.set("aerospike.set", dbSet)
     spark.conf.set("aerospike.keyPath", "/etc/aerospike/features.conf")
     spark.conf.set("aerospike.user", dbConnection)
     spark.conf.set("aerospike.password", dbPassword)
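With that configuration in place, the bulk write itself needs no further credential handling. A sketch of the write step, under the assumption that `df` is the DataFrame being loaded and that the question's `aerospike.updateByKey`/`aerospike.batchMax` options apply unchanged:

    import org.apache.spark.sql.SaveMode

    // df is an assumed placeholder for the DataFrame to be written
    df.write.
      mode(SaveMode.Overwrite).
      format("com.aerospike.spark.sql").
      option("aerospike.updateByKey", "id").
      option("aerospike.batchMax", 5000).
      save()

The connector reads `aerospike.user` and `aerospike.password` from `spark.conf`, so the authenticated user should now appear in the Aerospike security log for these batch requests as well.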