The Aerospike Knowledge Base has moved to https://support.aerospike.com. Content on https://discuss.aerospike.com is being migrated to either https://support.aerospike.com or https://docs.aerospike.com. Maintenance on articles stored in this repository ceased on December 31st 2022 and this article may be stale. If you have any questions, please do not hesitate to raise a case via https://support.aerospike.com.
Root level value error when using Aerospike Inbound Kafka Connector with librdkafka
Problem Description
When using the Aerospike Inbound Kafka Connector and Kafka Connect along with messages generated using a librdkafka
based C client, an error is returned. The message looks as follows:
4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4: {"binname": {"_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4", "id1": 298,"uidtype": 0,"prop":[{"fid": [ ] } ], "xid": 199}}
In the example above, binname
is the name of the bin and the following JSON contains the values. 4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4
is intended to be interpreted as the key.
The following error is returned:
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('D' (code 68)): Expected space separating root-level values
at [Source: (byte[])"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4: {"binname": {"_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4", "id1": 298,"uidtype": 0,"prop":[{"fid": [ ] } ], "xid": 199}}"; line: 1, column: 6]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('D' (code 68)): Expected space separating root-level values
at [Source: (byte[])"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4: {"binname": {"_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4", "id1": 298,"uidtype": 0,"prop":[{"fid": [ ] } ], "xid": 199}}"; line: 1, column: 6]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
Explanation
The error is caused as librdkafka
and kafka-console-producer.sh
are not sending the key-separator
and thus the key is being parsed as a root level value, which is incorrect. The parameters key.separator
and parse.key
are not supported by librdkafka
.
The following lines show examples of output from two different types of producer, one that passes key.separator=:
with parse.key=true
and one originating from librdkafka
.
OUTPUT:
{"bin1": "value1", "bin2": "value2"} # Output with parsed message key.
testrecordkey:{"bin1": "value1", "bin2": "value2"} # Using librdkafka based C Client.
The connector will fail when sending the data without the key separator since it produces an INVALID JSON object. This is equivalent to running the kafka-console-producer.sh
with --property "parse.key=false"
.
Solution
To fix this, a key delimiter must be used in the librdkafka
based client application. This may involve extra work since properties such as key.separator
and parse.key
are not defined for librdkafka
.
To resolve this when the originating client does not pass properties analogous to key.separator
and parse.key
, parse the message first in the application to separate the key and actual message as two different objects and pass it to rd_kafka_produce()
. The implementation is shown below using kafkacat
which can be used to display messages going into or coming out of Kafka on the command line. As kafkacat
does support key.separator
and parse.key
this can then be sent via rd_kafka_produce()
to the Aerospike Kafka Connector and from there to Aerospike.
$ kafkacat -b localhost:9092 -t test -P -K :
testrecordkey1:{"bin1": "value1", "bin2": "value2"}
testrecordkey2:{"bin1": "value1", "bin2": "value7"}
"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4": {"binname": {"_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4", "id1": 298,"uidtype": 0,"prop":[{"fid": [ ] } ], "xid": 199}}
==
OR
==
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --property "key.separator=:" --property "parse.key=true" --topic test
==========
AQL OUTPUT
==========
aql> select * from test.testset
+----------+----------+----------------------------------------------------------------------------------------------------------------+
| bin1 | bin2 | binname |
+----------+----------+----------------------------------------------------------------------------------------------------------------+
| "value1" | "value7" | |
| "value1" | "value2" | |
| | | MAP('{"xid":199, "prop":[{"fid":[]}], "id1":298, "uidtype":0, "_id":"4475DEA4-CB25-11E9-A32F-2A2AE2DBCCE4"}') |
+----------+----------+----------------------------------------------------------------------------------------------------------------+
Notes
-
The issue described above is not specific to using
librdkafka
. It can be caused with any producer which can send data to Kafka with key included in the message (even withkafka-console-producer.sh
). -
With the Aerospike Inbound Connector for Kafka, the Kafka message should not include the Kafka key and should be separate such as:
Key: testrecordkey, Message: {"bin1": "value1", "bin2": "value2"}
- The
kafka-console-producer.sh
has the propertiesparse.key=true
andkey.separator=:
to achieve this, however, if another client is used, similar properties must exist to avoid this issue.
Keywords
KAFKA INBOUND SERIALIZATIONEXCEPTION JSONPARSEEXCEPTION
Timestamp
November 2019