Introduction
Confluent provides on-prem and hosted offerings of the Apache Kafka ecosystem. As part of this, it also provides a custom serializer/deserializer for writing messages with a schema to a topic and reading them back.
The KafkaAvroSerializer provides an efficient mechanism for serializing messages to a Kafka topic. Its wire format is described in Confluent's Schema Registry documentation: a magic byte (0), followed by the 4-byte schema id in big-endian order, followed by the Avro binary encoding of the record.
Sometimes, the producer of the message is not using these libraries, but still writes messages that adhere to the Confluent wire format. The point of doing this is to keep the messages compatible with the Confluent tooling, so that consumers can use the KafkaAvroDeserializer. For example, you might want to write those messages to S3 using the kafka-connect-s3 sink connector.
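To make that concrete, here is a minimal sketch of the framing such a hand-rolled producer has to perform, assuming the record has already been Avro-encoded and the schema already registered (schemaId and avroPayload are hypothetical placeholders):

import java.nio.ByteBuffer;

// Minimal sketch: frame an already Avro-encoded payload in the Confluent
// wire format. schemaId and avroPayload are hypothetical placeholders.
public class ConfluentFraming {
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put((byte) 0);      // magic byte
        buf.putInt(schemaId);   // 4-byte schema id; ByteBuffer writes big-endian by default
        buf.put(avroPayload);   // Avro binary encoding of the record
        return buf.array();
    }
}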
When the producer does custom serialization but makes an error in the encoding, it can result in serialization errors down the line. These kinds of errors are difficult to debug. In this blog post, I will describe how you can find the root cause of such errors.
Serialization Errors
Here is an example of an error we noticed in the Kafka Connect S3 sink connector in our production instance:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 15930

Troubleshooting
Start off by downloading the schema that the error message refers to. The schema for id 15930 can be fetched from the schema registry as follows (the registry returns it as an escaped JSON string in a "schema" field, which is why the jq filter pipes it through fromjson):

curl -s https://schema-registry.mydomain.local/schemas/ids/15930 | jq '.schema|fromjson' > schema.avsc
Next, you need to download the failed message from the topic. The topic name, partition, and offset are given in the exception message:
Error encountered in task crm-tag-update-event-prod-s3-sink-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='crm-tag-update-event-prod-topic', partition=0, offset=52510041, timestamp=1713396004969, timestampType=CreateTime}.
If you don't see the partition/offset, you can configure your sink connector to log that information with the following settings:

errors.log.enable: true
errors.log.include.messages: true
Now that we have the topic/partition/offset, let's download the offending message from the Kafka topic:
$ kafka-console-consumer \
    --bootstrap-server kafka-cluster.mydomain.local:6030 \
    --command-config security.properties \
    --topic crm-tag-update-event-prod-topic \
    --partition 0 \
    --offset 52510041 \
    --max-messages 1 \
    > message_raw.bin
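A caveat: the console consumer writes a line separator after each record, so the dump may pick up a stray trailing byte. If that interferes with decoding later, a small throwaway consumer that writes the raw value bytes directly is more reliable. A minimal sketch reusing the coordinates from the error message (the security settings from security.properties are omitted for brevity):

import java.io.FileOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Fetches the single record at the given partition/offset and writes its raw
// value bytes to message_raw.bin, with no extra bytes appended.
public class DumpRawMessage {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-cluster.mydomain.local:6030");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("crm-tag-update-event-prod-topic", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 52510041L);
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                if (record.offset() == 52510041L) {
                    try (FileOutputStream out = new FileOutputStream("message_raw.bin")) {
                        out.write(record.value());
                    }
                    return;
                }
            }
        }
    }
}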
So we have the schema and the offending message. How do we decode them? For that we use avro-tools, which you can download from the Sonatype index. First, strip the 5-byte wire-format header (the magic byte plus the 4-byte schema id) so that only the Avro payload remains:

$ dd if=message_raw.bin of=message.bin bs=1 skip=5
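While you are at it, it is worth reading back the schema id that is actually embedded in the message header and confirming it matches the id from the error message. A minimal sketch that prints the magic byte and schema id from the raw dump:

import java.io.DataInputStream;
import java.io.FileInputStream;

// Reads the 5-byte Confluent wire-format header from the raw message dump
// and prints the embedded magic byte and schema id.
public class PrintSchemaId {
    public static void main(String[] args) throws Exception {
        try (DataInputStream in = new DataInputStream(new FileInputStream("message_raw.bin"))) {
            int magic = in.readUnsignedByte(); // expected to be 0
            int schemaId = in.readInt();       // readInt() is big-endian, matching the wire format
            System.out.println("magic=" + magic + " schemaId=" + schemaId);
        }
    }
}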
Now that we have the actual payload and the schema, we can use the fragtojson tool to decode the message. We expect this to fail, since the payload evidently does not match the schema registered under that id:
$ java -jar /tmp/avro-tools-1.11.1.jar fragtojson --schema-file schema.avsc message.bin
Root Cause
To get to the root cause, we need to look at the producer. There are two ways its custom serialization can go wrong:
1) The schema id is correct for that topic, but the serialized payload does not match that schema.
2) The schema id is wrong, but the serialized payload itself is correct.
A programmatic check to tell the two apart is sketched below.
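A quick way to distinguish the two cases is to decode the stripped payload against other candidate schemas, for example earlier versions of the subject fetched from the registry the same way as above. Here is a minimal sketch using the Avro Java library, assuming the schema.avsc and message.bin files produced earlier; if decoding succeeds against some schema other than the one for id 15930, the producer framed the payload with the wrong schema id:

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

// Tries to decode the stripped Avro payload (message.bin) against a candidate
// schema (schema.avsc) and prints the decoded record on success.
public class TryDecode {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
        byte[] payload = Files.readAllBytes(Paths.get("message.bin"));
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        GenericRecord record = reader.read(null,
                DecoderFactory.get().binaryDecoder(payload, null));
        System.out.println(record);
    }
}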