Is there any tool to migrate data from MySQL (or MongoDB) to Aerospike?

From this question on Stackoverflow:

" I would like to migrate data from MySQL (or mongodb) to Aerospike, anyone knows if exists any tool to do that? "

Aerospike provides a CSV loader, aerospike-loader.

So you can take a mysqldump of the data, process the dumped file into a .csv in the format accepted by aerospike-loader, and then load the data into Aerospike.
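As a minimal sketch of that route (run from spark-shell, with a hypothetical MySQL host, database and table, and the MySQL JDBC driver jar on the classpath), Spark itself can do the dump-to-CSV step; the resulting file still has to match the column-to-bin mapping you define in the aerospike-loader config file:

// read the source table over JDBC (host, database and table are hypothetical)
val mysqlDF = spark.read.format("jdbc").
  option("url", "jdbc:mysql://mysqlhost:3306/testdb").
  option("dbtable", "dummy").
  option("user", "scratchpad").
  option("password", "xxxxx").
  load()

// write a single CSV file with a header row for aerospike-loader to pick up
mysqlDF.coalesce(1).
  write.
  mode("overwrite").
  option("header", "true").
  csv("/tmp/dummy_csv")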

However, these are not industrial-strength tools. For example, binary-type columns normally cannot be extracted from, say, Oracle into a CSV file with a SELECT-based extract script like the one below:

./flat_csv.sh
usage:          flat un/pw [tables|views]

example:        flat scott/tiger emp dept

description:    Select over standard out all rows of table or view with
                columns delimited by comma.

The alternative is to use an ETL tool such as Spark to get the data out of the Oracle table and put it into an Aerospike set. Spark can use a JDBC connection to the Oracle instance to read the data and the aerospike-spark connector to load it into Aerospike. You need the JDBC driver jar for the source database plus an aerospike-spark connector licence; the connector itself is again supplied as a jar file. Getting data out of MongoDB documents into Aerospike is equally straightforward with Spark, since practically every NoSQL vendor has a Spark connector module.
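As a rough sketch of the MongoDB route (the Mongo URI, collection and key column below are hypothetical, and the option names may differ between versions of the MongoDB Spark connector), the read side mirrors a JDBC read and the write side is the same aerospike-spark call used in the Oracle listing further down:

import org.apache.spark.sql.SaveMode

// read a MongoDB collection into a DataFrame (connector 2.x style options)
val mongoDF = spark.read.
  format("com.mongodb.spark.sql.DefaultSource").
  option("uri", "mongodb://mongohost:27017/testdb/customers").
  load()

// point the aerospike-spark connector at the target cluster, namespace and set
spark.conf.set("aerospike.seedhost", "rhes75")
spark.conf.set("aerospike.port", "3000")
spark.conf.set("aerospike.namespace", "test")
spark.conf.set("aerospike.set", "mongotoaerospike")

// write the documents as records, keyed on a (hypothetical) customerId field
mongoDF.write.
  mode(SaveMode.Overwrite).
  format("com.aerospike.spark.sql").
  option("aerospike.updateByKey", "customerId").
  save()

As with the Oracle case, the aerospike-spark connector jar and its feature key file (aerospike.keyPath) need to be available to the Spark job.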

The Oracle-to-Aerospike code below is in my GitHub:

import java.sql.DriverManager
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
import java.sql.SQLException
import java.util.ArrayList

import org.apache.spark.sql.{ SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf

import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
sc.setLogLevel("ERROR")

//
//aerospike stuff
//
import com.aerospike.spark.sql._
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Bin
import com.aerospike.client.Key
import com.aerospike.client.Value
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Host
import com.aerospike.client.policy.ClientPolicy

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
// parse a dd/MM/yyyy string column (named by the caller) into a date
def changeToDate (TRANSACTIONDATE : String) : org.apache.spark.sql.Column =  {
    to_date(unix_timestamp(col(TRANSACTIONDATE),"dd/MM/yyyy").cast("timestamp"))
}

// helper UDFs for cleansing strings/maps and converting column types
def emptyToNullString(c: Column) = when(length(trim(c)) > 0, c).otherwise("---")

def remove_string: String => String = _.replaceAll("[,_#]", "")
def remove_string_udf = udf(remove_string)
def remove_map: Map[String, Int] => Map[String, Int] = _.map{ case (k, v) => k.replaceAll("[,_#]", "|") -> v }
def remove_map_udf = udf(remove_map)
val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )
val days_since_nearest_holidays = udf(
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val driverName = "oracle.jdbc.OracleDriver"
var url= "jdbc:oracle:thin:@rhes564:1521:mydb12"
var _username = "scratchpad"
var _password = "xxxxx"
var _dbschema = "SCRATCHPAD"
var _dbtable = "DUMMY"
var e:SQLException = null
var connection:Connection = null
var metadata:DatabaseMetaData = null

// Check Oracle is accessible

try {
      connection = DriverManager.getConnection(url, _username, _password)
} catch {
  case e: SQLException =>
    // no connection was obtained, so report the error and stop
    e.printStackTrace
    sys.exit(1)
}

metadata = connection.getMetaData()
// Check table exists
var rs:ResultSet = metadata.getTables(null,_dbschema,_dbtable, null)

if (rs.next()) {
   println("Table " + _dbschema+"."+_dbtable + " exists")
} else {
   println("Table " + _dbschema+"."+_dbtable + " does not exist, quitting!")
   connection.close()
   sys.exit(1)
}

// Read Oracle table through JDBC

val df = HiveContext.read.format("jdbc").options(
       Map("url" -> url,
       "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy where ROWNUM <= 10000)",
       "user" -> _username,
       "password" -> _password)).load

//
// convert columns to correct types
val df1 = df.
          withColumn("ID", toDouble(df("ID"))).
          withColumn("CLUSTERED", toDouble(df("CLUSTERED"))).
          withColumn("SCATTERED", toDouble(df("SCATTERED"))).
          withColumn("RANDOMISED", toDouble(df("RANDOMISED"))).
          select('ID,'CLUSTERED,'SCATTERED,'RANDOMISED,'RANDOM_STRING,'SMALL_VC,'PADDING).orderBy("ID")
df1.printSchema
df1.take(5).foreach(println)

var dbHost = "rhes75"
var dbPort = "3000"
var dbConnection = "test_rw"
var namespace = "xxxx"
var dbPassword = "aerospike"
var dbSet = "oracletoaerospike"

val sqlContext = spark.sqlContext

// aerospike-spark connector settings: seed host, credentials and target namespace/set
spark.conf.set("aerospike.seedhost", dbHost)
spark.conf.set("aerospike.port", dbPort)
spark.conf.set("aerospike.namespace",namespace)
spark.conf.set("aerospike.set", dbSet)
spark.conf.set("aerospike.keyPath", "/etc/aerospike/features.conf")
spark.conf.set("aerospike.user", dbConnection)
spark.conf.set("aerospike.password", dbPassword)


// write the DataFrame into the Aerospike set, keyed by the ID column
df1.write.
    mode(SaveMode.Overwrite).
    format("com.aerospike.spark.sql").
    option("aerospike.updateByKey", "ID").
    option("aerospike.keyColumn", "__ID").
    save()

// read data back into Spark from aerospike set

   val dfRead  = sqlContext.read.
      format("com.aerospike.spark.sql").
      option("aerospike.batchMax", 10000).
      load
dfRead.select('ID,'CLUSTERED,'SCATTERED,'RANDOMISED,'RANDOM_STRING,'SMALL_VC,'PADDING).orderBy("ID").take(5).foreach(println)
//
println ("\nFinished at"); spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit()

And this is the output:

import java.sql.DriverManager
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
import java.sql.SQLException
import java.util.ArrayList
import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import com.aerospike.spark.sql._
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Bin
import com.aerospike.client.Key
import com.aerospike.client.Value
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Host
import com.aerospike.client.policy.ClientPolicy
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
changeToDate: (TRANSACTIONDATE: String)org.apache.spark.sql.Column
emptyToNullString: (c: org.apache.spark.sql.Column)org.apache.spark.sql.Column
remove_string: String => String
remove_string_udf: org.apache.spark.sql.expressions.UserDefinedFunction
remove_map: Map[String,Int] => Map[String,Int]
remove_map_udf: org.apache.spark.sql.expressions.UserDefinedFunction
toInt: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))
toDouble: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(StringType)))
toHour: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))
days_since_nearest_holidays: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function3>,IntegerType,Some(List(StringType, StringType, StringType)))
warning: there was one deprecation warning; re-run with -deprecation for details
HiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@54ea832e
driverName: String = oracle.jdbc.OracleDriver
url: String = jdbc:oracle:thin:@rhes564:1521:mydb12
_username: String = scratchpad
_password: String = xxxx
_dbschema: String = SCRATCHPAD
_dbtable: String = DUMMY
e: java.sql.SQLException = null
connection: java.sql.Connection = null
metadata: java.sql.DatabaseMetaData = null
metadata: java.sql.DatabaseMetaData = oracle.jdbc.driver.OracleDatabaseMetaData@195368e2
rs: java.sql.ResultSet = oracle.jdbc.driver.OracleResultSetImpl@1f40fddd
Table SCRATCHPAD.DUMMY exists
df: org.apache.spark.sql.DataFrame = [ID: decimal(38,10), CLUSTERED: decimal(38,10) ... 5 more fields]
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: double, CLUSTERED: double ... 5 more fields]
root
 |-- ID: double (nullable = false)
 |-- CLUSTERED: double (nullable = false)
 |-- SCATTERED: double (nullable = false)
 |-- RANDOMISED: double (nullable = false)
 |-- RANDOM_STRING: string (nullable = true)
 |-- SMALL_VC: string (nullable = true)
 |-- PADDING: string (nullable = true)

[1.0,0.0,0.0,63.0,rMLTDXxxqXOZnqYRJwInlGfGBTxNkAszBGEUGELqTSRnFjRGbi,         1,xxxxxxxxxx]
[2.0,0.0,1.0,926.0,UEDJsfIgoYqwreSuuvjIcPZarpxMdCthpDCsgPlJfvIiylLiBS,         2,xxxxxxxxxx]
[3.0,0.0,2.0,504.0,jrIWMuAWViCOdbspYtDDZrsqaBlItIXvUKQujEYOPRzmGoxGrp,         3,xxxxxxxxxx]
[4.0,0.0,3.0,294.0,HsLQJstfFDYSqtSIMfRDoLDpgfpxSqAQmToFMEwzmcqEIkHzmG,         4,xxxxxxxxxx]
[5.0,0.0,4.0,31.0,vDsFoYAOcitwrWNXCxPHzIIIxwKpTlrsVjFFKUDivytqJqOHGA,         5,xxxxxxxxxx]
hosts: com.aerospike.client.Host = rhes75 3000
dbHost: String = rhes75
dbPort: String = 3000
dbConnection: String = test_RW
namespace: String = test
dbPassword: String = xxxx
dbSet: String = oracletoaerospike
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@933a839
dfRead: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 10 more fields]
[1.0,0.0,0.0,63.0,rMLTDXxxqXOZnqYRJwInlGfGBTxNkAszBGEUGELqTSRnFjRGbi,         1,xxxxxxxxxx]
[2.0,0.0,1.0,926.0,UEDJsfIgoYqwreSuuvjIcPZarpxMdCthpDCsgPlJfvIiylLiBS,         2,xxxxxxxxxx]
[3.0,0.0,2.0,504.0,jrIWMuAWViCOdbspYtDDZrsqaBlItIXvUKQujEYOPRzmGoxGrp,         3,xxxxxxxxxx]
[4.0,0.0,3.0,294.0,HsLQJstfFDYSqtSIMfRDoLDpgfpxSqAQmToFMEwzmcqEIkHzmG,         4,xxxxxxxxxx]
[5.0,0.0,4.0,31.0,vDsFoYAOcitwrWNXCxPHzIIIxwKpTlrsVjFFKUDivytqJqOHGA,         5,xxxxxxxxxx]

Finished at
[11/07/2019 22:42:18.18]
