Wednesday, April 17, 2024

Textual description of firstImageUrl

Troubleshooting serialization errors when using Confluent KafkaAvroSerializer

Introduction

Confluent provides on-prem and hosted services of the Apache Kafka ecosystem. As part of this they also provide a custom serializer/deserializer to enable writing and reading messages with schema from a topic.

The KafkaAvroSerializer provides an efficient mechanism for serializing messages to a kafka topic.

The wire format of the KafkaAvroSerializer is described here


Sometimes, the producer of the message is not using these libraries, but still writing messages that adhere to the confluent wire format. The reason for doing this is to make the messages compatible with Confluent so that you can use the KafkaAvroDeserializer. For eg, you might want to write those messages to S3 using the kafka-connect-s3 sink connector.

When the producer is doing custom serialization, but makes an error in encoding, it can result in serialization errors down the line. These kind of errors are difficult to debug. In this blog post, I will describe how you can root cause those errors.

Serialization Errors

Here is an example of an error we noticed in the kafka connect s3 sink connector in our production instance.


Troubleshooting


Start off by downloading the schema that the error message is referring to:


Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 15930

I download the schema for that schemaid as follows:


curl -s https://schema-registry.mydomain.local/schemas/ids/15930 | jq '.schema|fromjson' > schema.avsc

Next, you need to download the failed message from the topic. The topicname/partition/offset are given in the exception message.


  Error encountered in task crm-tag-update-event-prod-s3-sink-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', 
  where consumed record is {topic='crm-tag-update-event-prod-topic', partition=0, offset=52510041, timestamp=1713396004969, timestampType=CreateTime}.


If you dont see the partition/offset, you can enable your sink connector to log that info with the following configs:

errors.log.enabled: true
errors.log.include.messages: true


Now that we have the topic/partition/offset, lets download the offending message from the kafka topic

$ kafka-console-consumer \
--bootstrap-server kafka-cluster.mydomain.local:6030 \
--command-config security.properties \
--topic crm-tag-update-event-prod-topic \
--partition 0 \
--offset 52510041 \
> message_raw.bin


So we have the schema and the offending message. How do we decode them? For that we use avro-tools. You can download avro-tools from the Sonatype Index 

 For this demo, I am using 1.11.1 version. 

 But wait.. the message that we downloaded from the topic is the raw message. This message is encoded with a magic byte and schema-id in the first 5 bytes ( refer to the confluent wire format) So we first need to extract the actual message. 

This should be easy, since we need to remove the 5 byte header in the message. 

$ dd if=message_raw.bin of=message.bin bs=1 skip=5


Now that we have the actual message and the schema, we can use jsontofrag to decode the message. We expect this to fail, since the message schema-id is not matching the message schema. 

$ java -jar /tmp/avro-tools-1.11.1.jar fragtojson --schema-file schema.avsc message.bin


Root Cause 

To get to the root cause, we need to look at the producer.


One of the following is the cause of this:

1) The schema-id is correct for that topic, but the serialized message does not match.

2) The schema-id is wrong, but the serialized message is correct.


To find out which one of these it is, we need to understand the producer.

In our case, what happened was that an error in a Gitlab pipeline caused the wrong job to be deployed to our environment. That job was using the correct schema subject, but was compiled against POJOs that did not match the subject. So it serialized messages using the wrong pojo but the correct schema-id.

For #1, you need to find the actual schema that was used in serialization. Then download that schema and use it to decode the message using avro-tools.

#2 is more difficult since we dont know the correct schema-id. This will require looking at the producer code to figure out what is the correct schema subject/version or schema-id it should have used for serialization.

Conclusion

In this blog post I showed you how to :

1) Read avro messages from a kafka topic.
2) Extract the embedded message from the raw message.
3) Decode it using avro-tools.
4) Troubleshoot serialization errors.

If you liked this blog post, you might also enjoy the following: