Kafka Outbound Connector - using map data type with Kafka Avro format

Hi. Am trying to setup data in Kafka Outbound connector with Aerospike. When trying to use Kafka Avro format for messages, I noticed the following error:

aerospike-kafka_connector-1  | 2022-12-22 20:27:17.607 GMT INFO  metrics-ticker - requests-total: rate(per second) mean=8.05313731102431, m1=9.48698171570335, m5=2.480116641993411, m15=0.8667674157832074
aerospike-kafka_connector-1  | 2022-12-22 20:27:17.613 GMT INFO  metrics-ticker - requests-total: duration(ms) min=1.441459, max=3101.488585, mean=15.582822432504553, stddev=149.48409869767494, median=4.713083, p75=7.851875, p95=17.496458, p98=28.421125, p99=85.418959, p999=3090.952252
aerospike-kafka_connector-1  | 2022-12-22 20:27:17.624 GMT ERROR metrics-ticker - **java.lang.Exception - Map type not allowed, has to be record type**: count=184

Is is true that map data type is not allowed by the connector? I could not find any supporting docs regarding the same.

Avro schema:

{
    "name": "mydata",
    "type": "record",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "name": "com.aerospike.metadata",
                "type": "record",
                "fields": [
                    {
                        "name": "namespace",
                        "type": "string"
                    },
                    {
                        "name": "set",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    },
                    {
                        "name": "userKey",
                        "type": [
                            "null",
                            "long",
                            "double",
                            "bytes",
                            "string"
                        ],
                        "default": null
                    },
                    {
                        "name": "digest",
                        "type": "bytes"
                    },
                    {
                        "name": "msg",
                        "type": "string"
                    },
                    {
                        "name": "durable",
                        "type": [
                            "null",
                            "boolean"
                        ],
                        "default": null
                    },
                    {
                        "name": "gen",
                        "type": [
                            "null",
                            "int"
                        ],
                        "default": null
                    },
                    {
                        "name": "exp",
                        "type": [
                            "null",
                            "int"
                        ],
                        "default": null
                    },
                    {
                        "name": "lut",
                        "type": [
                            "null",
                            "long"
                        ],
                        "default": null
                    }
                ]
            }
        },
        {
            "name": "test",
            "type": "string"
        },
        {
            "name": "testmap",
            "type": {
                "type": "map",
                "values": "string",
                "default": {}
            }
        }
    ]
}

I am not an expert on the Kafka Connector, but it seems in “testmap” field the type cannot be “map”. Since this is an “enterprise” feature that comes with support, I would suggest opening a support ticket to obtain a solution (eg, the correct syntax).

Looks like the connector doesn’t support map when using KafkaAvro format.

Debug logs:

aerospike-kafka_connector-1  | 2023-01-06 18:41:31.098 GMT ERROR ErrorRegistry - Error stack trace
aerospike-kafka_connector-1  | java.lang.Exception: Map type not allowed, has to be record type
aerospike-kafka_connector-1  | 	at com.aerospike.connect.kafka.outbound.parser.KafkaAvroOutboundRecordGenerator$Companion.assertOnlyValidTypes(KafkaAvroOutboundRecordGenerator.kt:155)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.kafka.outbound.parser.KafkaAvroOutboundRecordGenerator$Companion.assertOnlyValidTypes(KafkaAvroOutboundRecordGenerator.kt:164)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.kafka.outbound.parser.KafkaAvroOutboundRecordGenerator$Companion.assertSchemaValid(KafkaAvroOutboundRecordGenerator.kt:101)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.kafka.outbound.parser.KafkaAvroOutboundRecordGenerator.<init>(KafkaAvroOutboundRecordGenerator.kt:180)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.kafka.outbound.inject.KafkaOutboundGuiceModule.getKafkaAvroStreamingRecordParser(KafkaOutboundGuiceModule.kt:59)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.kafka.outbound.inject.KafkaOutboundGuiceModule.access$getKafkaAvroStreamingRecordParser(KafkaOutboundGuiceModule.kt:29)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.kafka.outbound.inject.KafkaOutboundGuiceModule$bindKafkaAvroParserFactory$1.get(KafkaOutboundGuiceModule.kt:48)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.outbound.converter.XdrExchangeConverter.getInbuiltRecordFormatter(XdrExchangeConverter.kt:422)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.outbound.converter.XdrExchangeConverter.access$getInbuiltRecordFormatter(XdrExchangeConverter.kt:75)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.outbound.converter.XdrExchangeConverter$RouterAndInbuiltFormatter.<init>(XdrExchangeConverter.kt:285)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.outbound.converter.XdrExchangeConverter.processXdrRecord(XdrExchangeConverter.kt:192)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.outbound.converter.XdrExchangeConverter.parse(XdrExchangeConverter.kt:134)
aerospike-kafka_connector-1  | 	at com.aerospike.connect.outbound.OutboundBridge$processAsync$1.invokeSuspend(OutboundBridge.kt:182)
aerospike-kafka_connector-1  | 	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
aerospike-kafka_connector-1  | 	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
aerospike-kafka_connector-1  | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
aerospike-kafka_connector-1  | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
aerospike-kafka_connector-1  | 	at java.base/java.lang.Thread.run(Thread.java:829)

This should have been mentioned in the docs.