Introduction
Apache Kafka has become the platform of choice for large-scale messaging. It supports a variety of serialization formats for messages and integrates with external systems through a wide variety of connectors.
Apache Kafka allows messages to be stored in a variety of formats in the topic:
- Opaque bytes
- Avro
- JSON
- Protocol Buffers
In this post, we take a deep dive into how messages are stored in a Kafka topic when the serialization format is Avro.
Avro Serialization
Apache Avro is an open source project for specifying a schema for a record; schema files conventionally use the .avsc extension. Avro's serialization format is very space efficient: field names and types are not stored with each value, since the schema describing the record travels alongside (or is referenced by) the serialized data. Avro also supports schema evolution.
Avro schemas are written in JSON, whose syntax is defined in RFC 8259 (which obsoletes RFC 4627); the schema language itself is defined by the Avro specification. An example Avro schema looks as follows:
{
  "type": "record",
  "namespace": "com.example",
  "name": "FullName",
  "fields": [
    { "name": "first", "type": "string" },
    { "name": "last", "type": "string" }
  ]
}
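Since an Avro schema is itself a JSON document, any JSON parser can load it. A quick Python sketch that parses the schema above and lists its fields:

```python
import json

# The example schema from above, embedded as a string.
schema_text = """
{
  "type": "record",
  "namespace": "com.example",
  "name": "FullName",
  "fields": [
    { "name": "first", "type": "string" },
    { "name": "last", "type": "string" }
  ]
}
"""

schema = json.loads(schema_text)
print(schema["name"])                         # FullName
print([f["name"] for f in schema["fields"]])  # ['first', 'last']
```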
Let us experiment with the above schema. I stored it in a file called test.avsc on my computer. Next, I downloaded avro-tools v1.10.0 from the Maven repository.
We will author an example JSON message that adheres to the above schema, then use the *avro-tools* jar we just downloaded to convert to and from Avro.
Generate Java POJO
To generate a Java POJO (Plain Old Java Object) we can use the avro-tools jar as follows:
java -jar avro-tools-1.10.0.jar compile -string schema test.avsc .
You can see the generated Java POJO in the "com/example" subdirectory:
$ ls com/example/
FullName.java
Here is a partial gist of the POJO.
For now, just note that the Java POJO has the schema embedded in it.
Convert Json to Avro
We will now write a simple JSON message and convert it to Avro.
Since the schema only has two fields, I created a simple message as follows:
$ cat test.json
{ "first": "john", "last": "doe" }
Now, let's use avro-tools to convert this to Avro format:
java -jar avro-tools-1.10.0.jar fromjson test.json --schema-file test.avsc > test.avro
Now, let's see what has been generated:
$ ls test.avro
test.avro
$ file test.avro
test.avro: Apache Avro version 1
Let's look at the file contents:
$ od -c test.avro
0000000 O b j 001 004 026 a v r o . s c h e m
0000020 a 222 002 { " t y p e " : " r e c o
0000040 r d " , " n a m e " : " F u l l
0000060 N a m e " , " n a m e s p a c e
0000100 " : " c o m . e x a m p l e " ,
0000120 " f i e l d s " : [ { " n a m e
0000140 " : " f i r s t " , " t y p e "
0000160 : " s t r i n g " } , { " n a m
0000200 e " : " l a s t " , " t y p e "
0000220 : " s t r i n g " } ] } 024 a v r
0000240 o . c o d e c \b n u l l \0 330 315 053
0000260 + Z 213 ' 371 \0 S 250 225 202 277 374 \ 002 022 \b
0000300 j o h n 006 d o e 330 315 307 + Z 213 ' 371
0000320 \0 S 250 225 202 277 374 \
0000330
As you can see, the binary representation has the string version of the schema specification embedded in it; the schema string is stored up to offset 0x22b. This is a problem when you write multiple messages with the same schema into a stream: using the default Avro formatter, the schema is embedded in each message, so a large fraction of the bytes on the wire is spent transmitting the schema rather than the data.
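The single-byte prefixes in front of each string in the dump are Avro's zigzag-encoded lengths. A minimal Python sketch of that decoding (the function name is mine, not part of avro-tools):

```python
def zigzag_decode(n):
    """Decode Avro's zigzag encoding back into a signed integer."""
    return (n >> 1) ^ -(n & 1)

# In the dump above, \b (0x08) precedes "john" and \006 (0x06) precedes "doe":
print(zigzag_decode(0x08))  # 4 == len("john")
print(zigzag_decode(0x06))  # 3 == len("doe")
```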
To complete our exercise, we can use avro-tools to convert the Avro message back to JSON.
$ java -jar avro-tools-1.10.0.jar tojson test.avro --pretty --reader-schema-file test.avsc
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
{
  "first" : "john",
  "last" : "doe"
}
Serializing Multiple Messages with Schema
Let's see what happens when you serialize multiple messages with the same Avro schema.
I created an example file with two records:
$ cat people.json
{ "first": "john", "last": "doe" }
{ "first": "Attila", "last": "The Hun" }
Now, I convert it to Avro:
$ java -jar avro-tools-1.10.0.jar fromjson people.json --schema-file test.avsc > people.avro
Let's look at the output Avro file:
$ od -c people.avro
0000000 O b j 001 004 026 a v r o . s c h e m
0000020 a 222 002 { " t y p e " : " r e c o
0000040 r d " , " n a m e " : " F u l l
0000060 N a m e " , " n a m e s p a c e
0000100 " : " c o m . e x a m p l e " ,
0000120 " f i e l d s " : [ { " n a m e
0000140 " : " f i r s t " , " t y p e "
0000160 : " s t r i n g " } , { " n a m
0000200 e " : " l a s t " , " t y p e "
0000220 : " s t r i n g " } ] } 024 a v r
0000240 o . c o d e c \b n u l l \0 212 313 ]
0000260 304 v 037 354 350 314 ~ ڪ ** 207 365 302 1 004 $ \b
0000300 j o h n 006 d o e \b j o h n 006 d o
0000320 e 212 313 ] 304 v 037 354 350 314 ~ ڪ ** 207 365 061
0000340 1
0000341
Compare this output to the single-record output above. You will notice that the schema specification is included only once. This means Avro serialization becomes more efficient if you serialize multiple records with the same schema at once, instead of serializing one record at a time.
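A rough back-of-the-envelope calculation shows why this matters. Using approximate sizes taken from the dumps above (header plus embedded schema is about 0x22b bytes, one encoded record about 9 bytes; both are assumptions for illustration):

```python
# Approximate sizes read off the od dumps above (assumed, not exact):
header_and_schema = 0x22b   # file header + embedded schema string, ~555 bytes
record_payload = 9          # \b j o h n \006 d o e

messages = 1000

# Embedding the schema with every message vs. embedding it once up front:
embed_every_message = messages * (header_and_schema + record_payload)
embed_once = header_and_schema + messages * record_payload

print(embed_every_message)  # 564000
print(embed_once)           # 9555
```

Under these assumptions, shipping the schema per message costs roughly 60x more bytes on the wire.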
Avro Serialization with Kafka
In Kafka applications, producers typically write one record at a time. There are two conflicting requirements when we use Avro to serialize records to Kafka:
- On one hand, we want each message to be self-describing, which means shipping the schema with the message.
- On the other hand, we want to minimize the bytes on the wire per message, which means the verbose schema should not be repeated in every message (especially since it is the same for every message).
Kafka solves this dilemma with a different Avro serialization method. It uses the Schema Registry to register schemas; the registry assigns an ID to each unique schema specification.
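Conceptually, the registry is a map from schema text to a numeric ID, where registering the same schema twice yields the same ID. A toy, in-memory Python sketch (illustration only; the real Schema Registry is an HTTP service with persistent storage):

```python
class ToySchemaRegistry:
    """A toy in-memory stand-in for a schema registry (illustration only)."""

    def __init__(self):
        self._ids = {}      # schema text -> id
        self._next_id = 1

    def register(self, schema_text):
        # Registering the same schema text twice returns the same ID.
        if schema_text not in self._ids:
            self._ids[schema_text] = self._next_id
            self._next_id += 1
        return self._ids[schema_text]

registry = ToySchemaRegistry()
a = registry.register('{"type":"string"}')
b = registry.register('{"type":"string"}')  # same schema, same ID
c = registry.register('{"type":"int"}')     # new schema, new ID
print(a, b, c)  # 1 1 2
```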
NOTE: Schema Registry integration for Kafka is not part of the open source Apache Kafka ecosystem; it is provided by Confluent. You can try this out locally by downloading the Confluent binaries, but its use is governed by Confluent's own license terms rather than the Apache license.
When the message is serialized to the topic, the KafkaAvroSerializer serializes the message, but instead of including the verbose schema with the message, it includes the schema ID from the Schema Registry. The schema ID is obtained when you register a schema with the registry.
The format of the message written by the serializer is as follows:
| Offset | Value | Description                   |
|--------|-------|-------------------------------|
| 0      | 0     | Magic byte                    |
| 1-4    |       | 4-byte schema ID (big-endian) |
| 5-     |       | Avro-encoded record           |
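This framing can be sketched in a few lines of Python with the standard struct module (the schema ID 480 and the payload bytes below are just examples; the function names are mine):

```python
import struct

MAGIC_BYTE = 0

def frame(schema_id, avro_payload):
    """Prepend the wire-format header: magic byte + 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def unframe(message):
    """Split a framed message back into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a schema-registry framed message")
    return schema_id, message[5:]

# b"\x08john\x06doe" is the Avro binary encoding of our FullName record.
msg = frame(480, b"\x08john\x06doe")
print(msg[:5].hex())  # 00000001e0 -- magic byte, then schema ID 0x1e0
schema_id, payload = unframe(msg)
print(schema_id)      # 480
```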
To see this in action, let's read a message that was serialized with the KafkaAvroSerializer. I am reading the message from a topic using kafka-console-consumer:
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --partition 0 --offset 0 | od -t x1
0000000 00 00 00 01 e0 48 65 35 39 63 62 64 36 35 2d 36
0000020 38 64 39 2d 34 32 61 34 2d 39 66 35 35 2d 64 61
0000040 38 37 37 63 38 64 39 33 30 34 06 61 65 31 4e 74
0000060 65 73 74 5f 73 63 68 65 6d 61 5f 65 6e 64 70 6f
0000100 69 6e 74 5f 72 65 63 6f 72 64 6e 61 6d 65 73 74
0000120 72 61 74 65 67 79 2e 43 6c 69 65 6e 74 3a 20 73
As you can see, the first byte (offset 0) is the magic byte. The next four bytes hold the schema ID, 0x000001e0, which is 480 in decimal:
$ printf "%d" 0x1e0
480
I can verify that a schema with that ID exists in my schema-registry.
$ curl -s http://localhost:8888/schemas/ids/480 | jq '.schema|fromjson'
{
  "type": "record",
  "name": "FullName",
  "namespace": "com.example",
  "fields": [
    {
      "name": "first",
      "type": "string"
    },
    {
      "name": "last",
      "type": "string"
    }
  ]
}
As you can see, this schema is exactly the same as the schema I created.
Conclusion
We looked at how Avro serialization works by serializing the message along with its schema, making the message self-describing. This led us to see why the stock Avro serializer is not suitable for serializing messages to a stream interface (like Amazon Kinesis, Kafka, or a socket), since repeating the schema in each message wastes space. We then looked at the KafkaAvroSerializer that ships with Confluent Kafka, and saw how it solves this problem by storing an ID that references the schema, instead of storing the entire schema in the serialized message.