Using the aerospike-kafka connector in a distributed environment

Assume a Kafka cluster with 9 brokers and a topic replication factor of 9 for failover:

**${KAFKA_HOME}/bin/kafka-topics.sh --describe --zookeeper rhes75:2181,rhes564:2181,rhes76:2181 --topic md**
Topic:md        PartitionCount:9        ReplicationFactor:9     Configs:
        Topic: md       Partition: 0    Leader: 8       Replicas: 8,9,1,2,3,4,5,6,7     Isr: 8,9,1,2,3,4,5,6,7
        Topic: md       Partition: 1    Leader: 9       Replicas: 9,1,2,3,4,5,6,7,8     Isr: 9,1,2,3,4,5,6,7,8
        Topic: md       Partition: 2    Leader: 1       Replicas: 1,2,3,4,5,6,7,8,9     Isr: 1,2,3,4,5,6,7,8,9
        Topic: md       Partition: 3    Leader: 2       Replicas: 2,3,4,5,6,7,8,9,1     Isr: 2,3,4,5,6,7,8,9,1
        Topic: md       Partition: 4    Leader: 3       Replicas: 3,4,5,6,7,8,9,1,2     Isr: 3,4,5,6,7,8,9,1,2
        Topic: md       Partition: 5    Leader: 4       Replicas: 4,5,6,7,8,9,1,2,3     Isr: 4,5,6,7,8,9,1,2,3
        Topic: md       Partition: 6    Leader: 5       Replicas: 5,6,7,8,9,1,2,3,4     Isr: 5,6,7,8,9,1,2,3,4
        Topic: md       Partition: 7    Leader: 6       Replicas: 6,7,8,9,1,2,3,4,5     Isr: 6,7,8,9,1,2,3,4,5
        Topic: md       Partition: 8    Leader: 7       Replicas: 7,8,9,1,2,3,4,5,6     Isr: 7,8,9,1,2,3,4,5,6

This layout is shown in the attached diagram.
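
For reference, a topic with this layout could have been created with something like the command below (the partition and replication settings match the output above; adjust the ZooKeeper quorum to your own hosts):

${KAFKA_HOME}/bin/kafka-topics.sh --create \
    --zookeeper rhes75:2181,rhes564:2181,rhes76:2181 \
    --topic md \
    --partitions 9 \
    --replication-factor 9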

Now I have two instances of Aerospike Enterprise Edition (AEE) that form a cluster, joined in mesh mode. Each AEE instance runs on its own host, and each host has aerospike-Kafka connect installed as well.

The documentation says that to run aerospike-Kafka connect in standalone mode, you start a process as follows:

$KAFKA_HOME/bin/connect-standalone.sh \
    /opt/aerospike-kafka-connect-sink/etc/connect-standalone.properties \
    /opt/aerospike-kafka-connect-sink/etc/aerospike-sink.properties

So basically you need two property files: the first configures the Kafka Connect worker (Kafka as the source), and the second configures Aerospike as the sink.

connect-standalone.properties is as follows:

bootstrap.servers=rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false

internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/opt/aerospike-kafka-connect-sink/share/kafka
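
Since value.converter.schemas.enable=false, the sink expects plain (schemaless) JSON values on the topic. As a quick sanity check you can publish a hand-crafted record with the console producer; the field names below match the topic.bins setting in the second file, and the values are made up for illustration:

echo '{"rowkey":"0001","ticker":"IBM","timeissued":"2019-10-15T20:02:24","price":123.45}' | \
    $KAFKA_HOME/bin/kafka-console-producer.sh \
        --broker-list rhes75:9092,rhes564:9092,rhes76:9092 \
        --topic md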

The second file, aerospike-sink.properties, is as follows:

name=aerospike-sink
connector.class=com.aerospike.kafka.connect.sink.AerospikeSinkConnector
tasks.max=25
topics=md
cluster.hosts=rhes75:3000,rhes76:3000
policy.record_exists_action=replace
topic.namespace=trading
topic.set=MARKETDATAAEROSPIKEBATCH
feature_key.path=/etc/aerospike/features.conf
topic.key_field=rowkey
topic.bins=rowkey,ticker,timeissued,price
aerospike.username=trading_user_RW
aerospike.password=xxxxxxxxx
policy.max_connections_per_node=1200
policy.max_commands_in_queue=500

Note the last two parameters, which are recent additions to the aerospike-kafka connector and get rid of the annoying 25-connection limit.

So in summary, I run the script for each instance of Aerospike. In other words, each subscriber gets the records, with the same primary keys, as produced through Kafka. There will be no duplication because the aerospike-sink.properties file contains the following:

policy.record_exists_action=replace

which effectively eliminates duplicates: a write with an existing key simply replaces the stored record rather than creating a second copy. It also means that if one aerospike-Kafka connect instance becomes unavailable, there is no single point of failure and Aerospike carries on receiving the records from Kafka.
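
On each AEE host I simply launch the standalone worker and leave it running in the background, along these lines (the nohup wrapper and log path are illustrative):

nohup $KAFKA_HOME/bin/connect-standalone.sh \
    /opt/aerospike-kafka-connect-sink/etc/connect-standalone.properties \
    /opt/aerospike-kafka-connect-sink/etc/aerospike-sink.properties \
    > /var/log/aerospike-kafka-connect.log 2>&1 &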

The log file shows:

[2019-10-15 20:02:24,899] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:bc2542cc-90b9-40f4-b231-a40b030a0e67:27b516a6b316267a956ad6b8eb8ea8d739aa07b6 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:24,900] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:dc7cf364-74db-4527-a603-2a6a3dc244ee:87ec2422378895e9da5fa013b0f105af2d9c4a31 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:24,900] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:e3990da9-a0cc-44b7-99f6-a8838f86cf1f:b74b188e6fd88b57c4684f5f016e7127bde73eba (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:24,901] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:68f5f2af-65ab-4a77-b209-c2f80953a080:a6938092fd1f5419ebd12a184bc24afd9818bdd4 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:24,901] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:09191b49-9d61-4cc0-989f-23973e177611:b59cec1b2cd443c5783ca800a47618cdbca7e47a (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:24,902] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:c2638319-57c8-41ee-be69-c7046642193d:8e7b0cdea1434a30e610fab2ab2e0b530eb7fe21 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:24,903] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:c6f246f4-99aa-4935-a2f8-692dc69cb554:c6896601e76d6825d243a25900605e564c80df2c (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:28,408] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:4c0df0a2-8080-446d-be07-93c1c8b7d6f5:c7bb19ceb891939ae4f8a8873486720462d943d0 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:28,408] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:237088cc-8219-4e13-b2c3-9bf8fa280cc4:f7c828fd1d758d7bbe0297fde89dd00656f6a655 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:28,408] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:4dab0f85-6e48-42b2-99fb-bd5cc28a9d1a:260de00a8e22ab651c9099375bcd2de7014d67e1 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:28,409] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:ed19ff83-6f68-4b68-bcb5-989c2ec39eea:88025c639921ea9b050f437137a003b83959e51d (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:28,409] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:61f4733f-fb42-4498-8369-887ee6f817c6:e1af0d114c12595cc9123e03cad936d6e2a308a9 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:28,410] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:2e94fd9b-56d5-485a-8d3e-f96a87216897:cff04085fa2983a31eb85fd3ac08df387167247c (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:28,410] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:804271ef-8ca3-413c-a1f6-eac257a88a5b:7c74e461031c03d1a0aa7f69a5a36c61d56f1952 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
[2019-10-15 20:02:28,411] INFO Writing record asynchronously: trading:MARKETDATAAEROSPIKEBATCH:5019ddba-2a0e-4d5b-97c3-fef923646d85:21d873d65d1cb13f77c23500e08243425cbe95a5 (com.aerospike.kafka.connect.sink.AsyncWriter:85)
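
Each line identifies the record key as namespace:set:user key:digest, which confirms that the records are going to the trading namespace and the MARKETDATAAEROSPIKEBATCH set. To watch the writes in real time you can tail the worker log (path as in the illustrative launcher above):

tail -f /var/log/aerospike-kafka-connect.log | grep "Writing record asynchronously"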

I trust this makes sense. Otherwise I will be happy to hear alternatives.