Root level value error when using Aerospike Inbound Kafka Connector with librdkafka

The Aerospike Knowledge Base has moved to https://support.aerospike.com. Content on https://discuss.aerospike.com is being migrated to either https://support.aerospike.com or https://docs.aerospike.com. Maintenance on articles stored in this repository ceased on December 31st 2022 and this article may be stale. If you have any questions, please do not hesitate to raise a case via https://support.aerospike.com.

Problem Description

When the Aerospike Inbound Kafka Connector is used with Kafka Connect and messages are produced by a librdkafka-based C client, the connector returns an error. A message sent by such a client looks as follows:

4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4: {"binname": {"_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4", "id1": 298,"uidtype": 0,"prop":[{"fid": [ ] } ], "xid": 199}}

In the example above, binname is the name of the bin and the following JSON contains the values. 4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4 is intended to be interpreted as the key.

The following error is returned:

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('D' (code 68)): Expected space separating root-level values
at [Source: (byte[])"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4: {"binname": {"_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4", "id1": 298,"uidtype": 0,"prop":[{"fid": [ ] } ], "xid": 199}}"; line: 1, column: 6]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('D' (code 68)): Expected space separating root-level values
at [Source: (byte[])"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4: {"binname": {"_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4", "id1": 298,"uidtype": 0,"prop":[{"fid": [ ] } ], "xid": 199}}"; line: 1, column: 6]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)

Explanation

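The error occurs because the producer (librdkafka, or kafka-console-producer.sh without parse.key=true) does not split the key from the message at the key separator. The key is therefore sent as part of the message value, and the JSON parser tries to read it as a root-level value. In the example above, the parser reads the leading digits 4475 as a number and then fails on the character 'D'. The properties key.separator and parse.key belong to kafka-console-producer.sh and are not supported by librdkafka.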

The following lines show the message value produced by two different types of producer: one that passes key.separator=: with parse.key=true, and one based on librdkafka.

OUTPUT:

{"bin1": "value1", "bin2": "value2"} # Output with parsed message key.
testrecordkey:{"bin1": "value1", "bin2": "value2"} # Using librdkafka based C Client.

The connector fails when the key is left in the message value, because the value is then not a valid JSON object. This is equivalent to running kafka-console-producer.sh with --property "parse.key=false".

Solution

To fix this, the librdkafka-based client application must split the key from the message at a key delimiter itself. This may involve extra work since properties such as key.separator and parse.key are not defined for librdkafka.

If the originating client has no properties analogous to key.separator and parse.key, parse each message in the application first to separate the key from the message value, then pass the two to rd_kafka_produce() as separate arguments. The behavior is illustrated below using kafkacat, a librdkafka-based command line tool that can produce messages to and consume messages from Kafka. The -K option of kafkacat sets a key delimiter, which is analogous to key.separator with parse.key=true, so the key and the value reach the Aerospike Kafka Connector separately and are then written to Aerospike.

$ kafkacat -b localhost:9092 -t test -P -K :
testrecordkey1:{"bin1": "value1", "bin2": "value2"}
testrecordkey2:{"bin1": "value1", "bin2": "value7"}    
"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4": {"binname": {"_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4", "id1": 298,"uidtype": 0,"prop":[{"fid": [ ] } ], "xid": 199}}

==
OR
==

$ bin/kafka-console-producer.sh --broker-list localhost:9092  --property "key.separator=:" --property "parse.key=true" --topic test

==========
AQL OUTPUT
==========

aql> select * from test.testset
+----------+----------+----------------------------------------------------------------------------------------------------------------+
| bin1     | bin2     | binname                                                                                                       |
+----------+----------+----------------------------------------------------------------------------------------------------------------+
| "value1" | "value7" |                                                                                                                |
| "value1" | "value2" |                                                                                                                |
|          |          | MAP('{"xid":199, "prop":[{"fid":[]}], "id1":298, "uidtype":0, "_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4"}') |
+----------+----------+----------------------------------------------------------------------------------------------------------------+
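
For a librdkafka-based C producer, the split described above could be sketched roughly as follows. This is a minimal illustration, not the connector's or librdkafka's own code: the type kv_split_t and the function split_key_value() are hypothetical names introduced here, and the sketch assumes the key itself never contains the separator character. The rd_kafka_produce() call is shown only in a comment and assumes an already created topic handle named rkt.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper type: views into one input line, no copies made. */
typedef struct {
    const char *key;    /* start of the key */
    size_t key_len;     /* key length in bytes */
    const char *value;  /* start of the JSON value */
    size_t value_len;   /* value length in bytes */
} kv_split_t;

/*
 * Split `line` at the FIRST occurrence of `sep`. The first occurrence is
 * used because the JSON value itself contains ':' characters.
 * Assumption: the key does not contain `sep`.
 * Returns 0 on success, -1 if no separator was found.
 */
int split_key_value(const char *line, size_t len, char sep, kv_split_t *out)
{
    const char *p = memchr(line, sep, len);
    if (p == NULL)
        return -1;

    out->key = line;
    out->key_len = (size_t)(p - line);
    out->value = p + 1;
    out->value_len = len - out->key_len - 1;

    /* Skip blanks between the separator and the JSON value. */
    while (out->value_len > 0 && (*out->value == ' ' || *out->value == '\t')) {
        out->value++;
        out->value_len--;
    }
    return 0;
}

/*
 * With librdkafka, the two parts would then be passed separately, e.g.
 * (rkt is an assumed, already created rd_kafka_topic_t handle):
 *
 *   rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
 *                    (void *)kv.value, kv.value_len,
 *                    kv.key, kv.key_len, NULL);
 */
```

Calling split_key_value() on the line testrecordkey:{"bin1": "value1", "bin2": "value2"} with ':' as the separator yields testrecordkey as the key and the JSON object alone as the value, which is the shape the connector expects.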

Notes

  • The issue described above is not specific to librdkafka. It can occur with any producer that sends the key as part of the message value (including kafka-console-producer.sh).

  • With the Aerospike Inbound Connector for Kafka, the Kafka message value must not include the Kafka key. The key and the message must be sent separately, for example:

Key: testrecordkey, Message: {"bin1": "value1", "bin2": "value2"}
  • kafka-console-producer.sh provides the properties parse.key=true and key.separator=: to achieve this. If another client is used, it must either offer similar properties or split the key from the message itself.

Keywords

KAFKA INBOUND SERIALIZATIONEXCEPTION JSONPARSEEXCEPTION

Timestamp

November 2019