Migrating MongoDB collection documents to an Aerospike set using Spark as an ETL tool

This is an example of migrating the documents of a MongoDB collection to an Aerospike set, using Spark as the ETL tool. You will need the following jar files (versions as at the time of writing):

aerospike-spark-assembly-1.1.2.jar
mongo-spark-connector_2.11-2.3.0.jar
mongo-java-driver-3.8.1.jar 
In addition, you will need the following MongoDB package:
org.mongodb.spark:mongo-spark-connector_2.11:2.3.0

Spark’s shell provides a simple tool to analyze data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following:

$SPARK_HOME/bin/spark-shell

However, if you choose this way you will need to pass the jar files and packages on the command line. An example is shown below.

The full Scala code is in my GitHub

$SPARK_HOME/bin/spark-shell --jars /home/hduser/jars/aerospike-spark-assembly-1.1.2.jar,/home/hduser/jars/mongo-spark-connector_2.11-2.3.0.jar,/home/hduser/jars/mongo-java-driver-3.8.1.jar --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0

Also, as there is a bug in Mongo, if you choose this approach you will need to pass the MongoDB credentials to spark-shell as well:

$SPARK_HOME/bin/spark-shell --jars /home/hduser/jars/aerospike-spark-assembly-1.1.2.jar,/home/hduser/jars/mongo-spark-connector_2.11-2.3.0.jar,/home/hduser/jars/mongo-java-driver-3.8.1.jar --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 --conf "spark.mongodb.input.uri=mongodb://<DB_USER>:<DB_PASSWORD>@<HOST_NAME>:<MONGO_PORT>/<DB_NAME>.<COLLECTION_NAME>"

If you use SBT to create an uber (fat) jar file, you will not need this. Note that the example here uses Aerospike with user authentication. Both the Aerospike-Spark connector and user authentication are part of Aerospike Enterprise Edition and are licensed products.

import org.apache.spark.sql.{ SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf

import org.apache.log4j.Logger
import org.apache.log4j.Level
// Switch off log4j output for the "org" and "akka" logger hierarchies, then
// restrict the SparkContext's own logging to errors only.
Seq("org", "akka").foreach { loggerName => Logger.getLogger(loggerName).setLevel(Level.OFF) }
sc.setLogLevel("ERROR")

import com.mongodb.spark._
//
//aerospike stuff
//
import com.aerospike.spark.sql._
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Bin
import com.aerospike.client.Key
import com.aerospike.client.Value
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Host
import com.aerospike.client.policy.ClientPolicy

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column

/** The `operation` sub-document of a price document: an operation type code and its time (kept as a string). */
case class operationStruct(
  op_type: Int,
  op_time: String
)

/** Ticker classification data with a list of quotes. Not referenced by the rest of this script. */
case class tradeStruct(
  tickerType: String,
  tickerClass: String,
  tickerStatus: String,
  tickerQuotes: Array[Double]
)

/** The `priceInfo` sub-document: one price for one ticker at the time it was issued. */
case class priceStruct(
  key: String,
  ticker: String,
  timeissued: String,
  price: Double,
  currency: String
)

/** A complete document as stored in the MongoDB collection: price details plus operation metadata. */
case class priceDocument(
  priceInfo: priceStruct,
  operation: operationStruct
)

/**
 * Percent-encodes a user name or password so that reserved characters such as
 * ':', '/', '?' or '@' cannot corrupt the MongoDB connection URI it is embedded in.
 *
 * @param raw the credential exactly as issued (not already encoded)
 * @return the encoded credential; plain alphanumeric values are returned unchanged
 */
def encodeCredential(raw: String): String = java.net.URLEncoder.encode(raw, "UTF-8")

// Spark application name used when building the session.
val sparkAppName = "mongoAerospike"

// MongoDB server location.
val mongodbHost = "rhes75"
val mongodbPort = "60100"

// ZooKeeper location; not referenced anywhere else in this script.
val zookeeperHost = "rhes75"
val zooKeeperClientPort = "2181"

// URI scheme, database, credentials and source collection for MongoDB.
val dbConnection = "mongodb"
val dbDatabase = "trading"
val dbUsername = "trading_user_RW"
val dbPassword = "xxxx"
val collectionName = "MARKETDATAMONGODBSPEED"

// mongodb://<user>:<password>@<host>:<port>/<database>.<collection>
val connectionString =
  s"${dbConnection}://${encodeCredential(dbUsername)}:${encodeCredential(dbPassword)}@${mongodbHost}:${mongodbPort}/${dbDatabase}.${collectionName}"
// No HiveContext is created here: it is deprecated, and its only use in this script was
// the "Started at" query below, which the SparkSession runs directly (as the
// "Finished at" query at the end of the script already does).

// Obtain the SparkSession. Since Spark 2.0 it encapsulates SparkConf, SparkContext and
// SQLContext, so none of them has to be created explicitly.
val spark = SparkSession.
  builder().
  appName(sparkAppName).
  config("spark.driver.allowMultipleContexts", "true").
  config("spark.hadoop.validateOutputSpecs", "false").
  getOrCreate()

// Publish the MongoDB URI for both reads and writes on the session's runtime configuration.
// NOTE(review): these are session-level settings; confirm that the MongoDB connector call
// used for loading actually reads them rather than the SparkContext's SparkConf.
spark.conf.set("spark.mongodb.input.uri", connectionString)
spark.conf.set("spark.mongodb.output.uri", connectionString)

// SQLContext view of the same session, used later for the Aerospike read-back.
val sqlContext = spark.sqlContext

// Print the wall-clock start time. unix_timestamp() has one-second resolution, so the
// pattern stops at seconds (the former "ss.ss" simply printed the seconds twice).
println("\nStarted at")
spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss') ").collect.foreach(println)
// Load the source collection as an RDD of BSON documents. The URI (including database and
// collection) is handed to the connector explicitly through a ReadConfig, so the load does
// not depend on spark.mongodb.input.uri having been passed to spark-shell with --conf.
// All other read settings take the connector's defaults.
val rddMongoDB = MongoSpark.load(sc, com.mongodb.spark.config.ReadConfig(Map("uri" -> connectionString)))

// Convert to a DataFrame and print the resulting schema.
val dfrddMongoDB = rddMongoDB.toDF
dfrddMongoDB.printSchema()
/*
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- operation: struct (nullable = true)
 |    |-- op_type: integer (nullable = true)
 |    |-- op_time: string (nullable = true)
 |-- priceInfo: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- ticker: string (nullable = true)
 |    |-- timeissued: string (nullable = true)
 |    |-- price: double (nullable = true)
 |    |-- currency: string (nullable = true)

// one example of mongo document from mongo collection
{
    "_id" : ObjectId("5cae4fa25d8b5279db785b43"),
    "priceInfo" : {
        "key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
        "ticker" : "ORCL",
        "timeissued" : "2019-04-10T21:20:57",
        "price" : 41.13,
        "currency" : "GBP"
    },
    "operation" : {
        "op_type" : NumberInt(1),
        "op_time" : "1554927506012"
    }
}
*/
// Flatten the structs
// Promote every nested field of the priceInfo and operation structs to a top-level column
// that carries the field's own name; the _id struct is not selected.
val df = {
  val priceInfoColumns = Seq("key", "ticker", "timeissued", "price", "currency").map { field => col("priceInfo").getItem(field).as(field) }
  val operationColumns = Seq("op_type", "op_time").map { field => col("operation").getItem(field).as(field) }
  dfrddMongoDB.select(priceInfoColumns ++ operationColumns: _*)
}
//df.show(5)
// Aerospike connection settings. dbConnection and dbPassword deliberately re-declare
// (shadow) the MongoDB values of the same names: from here on they hold the Aerospike
// user name and password.
val dbHost = "rhes75"
val dbPort = "3000"
val dbConnection = "test_RW"
val namespace = "test"
val dbPassword = "xxxx"
val dbSet = "mongoDBtoaerospike"

// Aerospike client Host built from the settings above rather than from a second,
// hard-coded copy of the same host name and port.
val hosts = new Host(dbHost, dbPort.toInt)

// Hand the settings to the Aerospike Spark connector through the session configuration.
spark.conf.set("aerospike.seedhost", dbHost)
spark.conf.set("aerospike.port", dbPort)
spark.conf.set("aerospike.namespace", namespace)
spark.conf.set("aerospike.set", dbSet)
// presumably the location of the Aerospike feature-key file -- confirm against the connector docs
spark.conf.set("aerospike.keyPath", "/etc/aerospike/features.conf")
spark.conf.set("aerospike.user", dbConnection)
spark.conf.set("aerospike.password", dbPassword)

// Write 100 records only to Aerospike set
  // Save the first 100 flattened rows through the Aerospike data source in Overwrite mode.
  // NOTE(review): "aerospike.updateByKey" -> "key" presumably makes the `key` column the
  // record's primary key -- confirm against the connector documentation.
  df.limit(100).
    write.
    format("com.aerospike.spark.sql").
    mode(SaveMode.Overwrite).
    options(Map(
      "aerospike.updateByKey" -> "key",
      "aerospike.keyColumn" -> "__key"
    )).
    save()

   // Read the Aerospike set configured on the session back into a DataFrame.
   val dfRead = sqlContext.read.
     format("com.aerospike.spark.sql").
     option("aerospike.batchMax", 10000).
     load()

// Print five of the records that were read back, showing only the migrated columns,
// as evidence that the write succeeded.
dfRead.select("key", "ticker", "timeissued", "price", "currency", "op_type", "op_time").take(5).foreach(println)
//
// Print the wall-clock finish time. unix_timestamp() has one-second resolution, so the
// pattern stops at seconds (the former "ss.ss" simply printed the seconds twice).
println("\nFinished at")
spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss') ").collect.foreach(println)
// Terminate the spark-shell session once the script has run.
sys.exit()

And the output is as follows:

UI is on 55555
Ivy Default Cache set to: /home/hduser/.ivy2/cache
The jars for the packages stored in: /home/hduser/.ivy2/jars
:: loading settings :: url = jar:file:/d4T/hduser/spark-2.3.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found org.mongodb.spark#mongo-spark-connector_2.11;2.3.0 in central
        found org.mongodb#mongo-java-driver;3.8.0 in central
:: resolution report :: resolve 125ms :: artifacts dl 3ms
        :: modules in use:
        org.mongodb#mongo-java-driver;3.8.0 from central in [default]
        org.mongodb.spark#mongo-spark-connector_2.11;2.3.0 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/3ms)
19/07/15 18:36:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/07/15 18:36:22 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
Spark context Web UI available at http://rhes75:55555
Spark context available as 'sc' (master = yarn, app id = application_1563096560657_0039).
Spark session available as 'spark'.
Loading import_from_MongoDB_into_aerospike.scala...
import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import com.mongodb.spark._
import com.aerospike.spark.sql._
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Bin
import com.aerospike.client.Key
import com.aerospike.client.Value
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Host
import com.aerospike.client.policy.ClientPolicy
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
defined class operationStruct
defined class tradeStruct
defined class priceStruct
defined class priceDocument
sparkAppName: String = mongoAerospike
mongodbHost: String = rhes75
mongodbPort: String = 60100
zookeeperHost: String = rhes75
zooKeeperClientPort: String = 2181
dbConnection: String = mongodb
dbDatabase: String = trading
dbUsername: String = trading_user_RW
dbPassword: String = mongodb
collectionName: String = MARKETDATAMONGODBSPEED
connectionString: String = mongodb://trading_user_RW:mongodb@rhes75:60100/trading.MARKETDATAMONGODBSPEED
warning: there was one deprecation warning; re-run with -deprecation for details
HiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@ab5e63
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4ab8c3c0
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@370e5bb9

Started at
[15/07/2019 18:36:31.31]
rddMongoDB: com.mongodb.spark.rdd.MongoRDD[org.bson.Document] = MongoRDD[4] at RDD at MongoRDD.scala:53
dfrddMongoDB: org.apache.spark.sql.DataFrame = [_id: struct<oid: string>, operation: struct<op_type: int, op_time: string> ... 1 more field]
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- operation: struct (nullable = true)
 |    |-- op_type: integer (nullable = true)
 |    |-- op_time: string (nullable = true)
 |-- priceInfo: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- ticker: string (nullable = true)
 |    |-- timeissued: string (nullable = true)
 |    |-- price: double (nullable = true)
 |    |-- currency: string (nullable = true)

df: org.apache.spark.sql.DataFrame = [key: string, ticker: string ... 5 more fields]
hosts: com.aerospike.client.Host = rhes75 3000
dbHost: String = rhes75
dbPort: String = 3000
dbConnection: String = mich
namespace: String = test
dbPassword: String = aerospike
dbSet: String = mongoDBtoaerospike
dfRead: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 10 more fields]
[2d6ea161-52b4-4959-ac6a-099246fcb5b0,TSCO,2019-04-10T21:27:03,174.76,GBP,1,1554927872008]
[7b16afbf-9695-4dc0-9eaf-5a613efa589d,MSFT,2019-04-10T21:23:32,49.8,GBP,1,1554927660010]
[f6b0ef96-8ed4-4fa3-b9ae-b368fe98bcb5,SAP,2019-04-10T21:22:06,176.36,GBP,1,1554927576124]
[798a529a-7717-4332-bf87-cb7860b19107,ORCL,2019-04-10T21:22:55,41.64,GBP,1,1554927624008]
[d2c64193-a998-43af-8605-93acfbf91274,TSCO,2019-04-10T21:24:13,193.57,GBP,1,1554927702009]

Finished at
[15/07/2019 18:36:39.39]
© 2015 Copyright Aerospike, Inc. | All rights reserved. Creators of the Aerospike Database.