Saturday, July 4, 2020


KafkaAvroSerializer: Efficient way to serialize messages with Avro to a Kafka topic

Introduction




Apache Kafka has become the platform of choice for large-scale messaging. It supports a variety of serialization formats for messages, and it integrates with external systems through a wide variety of connectors.

Apache Kafka allows messages to be stored in a topic in a variety of formats:

- Opaque bytes
- Avro
- JSON
- Protocol Buffers





In this post, we take a deep dive into how messages are stored in a Kafka topic when the serialization format is Avro.

Avro Serialization


Apache Avro is an open source project that provides a schema definition language (AVSC) for describing records. Avro's binary encoding is very space efficient: field names and type information are not stored with each value, so the schema is required to decode the bytes. Avro also allows for schema evolution.

Avro schemas are written as JSON documents (JSON itself is defined in RFC 4627, since superseded by RFC 8259); the structure of the schema document is defined by the Avro specification. An example Avro schema looks as follows:

{
     "type": "record",
     "namespace": "com.example",
     "name": "FullName",
     "fields": [
       { "name": "first", "type": "string" },
       { "name": "last", "type": "string" }
     ]
} 
Let us experiment with the above schema. I stored it in a file called test.avsc on my computer.

Next, I downloaded avro-tools v1.10.0 from the Maven repository.

We will author an example JSON message that adheres to the above schema, and then use the avro-tools jar we just downloaded to convert it to and from Avro.

Generate Java POJO


To generate a Java POJO (Plain Old Java Object), we can use the avro-tools jar as follows:

java -jar avro-tools-1.10.0.jar compile -string schema test.avsc .

You can see the generated Java POJO in the "com/example" subdirectory:

$ ls com/example/

FullName.java

Here is an abridged sketch of the generated POJO (representative of what avro-tools 1.10 emits; the real file contains considerably more plumbing):
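
package com.example;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;

public class FullName extends SpecificRecordBase {
  // The full schema JSON is compiled directly into the class.
  public static final Schema SCHEMA$ = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"FullName\",\"namespace\":\"com.example\","
      + "\"fields\":[{\"name\":\"first\",\"type\":\"string\"},"
      + "{\"name\":\"last\",\"type\":\"string\"}]}");

  public static Schema getClassSchema() { return SCHEMA$; }

  private String first;
  private String last;

  // ... constructors, getters, setters, and serialization plumbing elided ...
}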


For now, just note that the generated class has the full schema string embedded in it.

Convert Json to Avro


We will now write a simple JSON message and convert it to Avro.

Since the schema only has two fields, I created a simple message as follows:

$ cat test.json
{ "first": "john", "last": "doe" }

Now, let's use avro-tools to convert this to the Avro format:

java -jar avro-tools-1.10.0.jar fromjson test.json --schema-file test.avsc > test.avro

Now, let's confirm that the file was generated:

$ ls test.avro
test.avro

$ file test.avro
test.avro: Apache Avro version 1

Let's dump the bytes of the file:

$ od -c test.avro
0000000    O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
0000020    a 222 002   {   "   t   y   p   e   "   :   "   r   e   c   o
0000040    r   d   "   ,   "   n   a   m   e   "   :   "   F   u   l   l
0000060    N   a   m   e   "   ,   "   n   a   m   e   s   p   a   c   e
0000100    "   :   "   c   o   m   .   e   x   a   m   p   l   e   "   ,
0000120    "   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e
0000140    "   :   "   f   i   r   s   t   "   ,   "   t   y   p   e   "
0000160    :   "   s   t   r   i   n   g   "   }   ,   {   "   n   a   m
0000200    e   "   :   "   l   a   s   t   "   ,   "   t   y   p   e   "
0000220    :   "   s   t   r   i   n   g   "   }   ]   } 024   a   v   r
0000240    o   .   c   o   d   e   c  \b   n   u   l   l  \0 330 315 053
0000260    +   Z 213   ' 371  \0   S 250 225 202 277 374   \ 002 022  \b
0000300    j   o   h   n 006   d   o   e 330 315 307   +   Z 213   ' 371
0000320   \0   S 250 225 202 277 374   \
0000330 

As you can see, the binary representation has the string form of the schema specification embedded in it: the schema JSON occupies the file through octal offset 0233 (about the first 155 bytes), while the encoded record itself ("john"/"doe") takes only nine bytes. This becomes a problem when you write many messages with the same schema into a stream: with the default Avro container format, the schema is embedded in each message, so most of the bytes on the wire end up carrying the schema rather than the data.

To complete our exercise, we can use avro-tools to convert the Avro message back to JSON:

$ java -jar avro-tools-1.10.0.jar tojson test.avro --pretty --reader-schema-file test.avsc
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
{
  "first" : "john",
  "last" : "doe"
}
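
The same round trip can be done in code. Here is a minimal sketch (assuming the generated FullName class and the Avro library are on the classpath) that reads the container file back:

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.specific.SpecificDatumReader;
import com.example.FullName;

public class ReadAvro {
  public static void main(String[] args) throws Exception {
    // DataFileReader reads the schema from the file header and uses it
    // to decode each record in the data blocks that follow.
    try (DataFileReader<FullName> reader = new DataFileReader<>(
        new File("test.avro"), new SpecificDatumReader<>(FullName.class))) {
      for (FullName name : reader) {
        System.out.println(name); // toString() renders the record as JSON
      }
    }
  }
}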

Serializing Multiple Messages with Schema


Let's see what happens when we serialize multiple records with the same Avro schema.

I created an example file with two records:

$ cat people.json
{ "first": "john", "last": "doe" }
{ "first": "Attila", "last": "The Hun" }

Now, I convert it to Avro:

$ java -jar avro-tools-1.10.0.jar fromjson people.json --schema-file test.avsc > people.avro

Let's look at the output Avro file:

$ od -c people.avro
0000000    O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
0000020    a 222 002   {   "   t   y   p   e   "   :   "   r   e   c   o
0000040    r   d   "   ,   "   n   a   m   e   "   :   "   F   u   l   l
0000060    N   a   m   e   "   ,   "   n   a   m   e   s   p   a   c   e
0000100    "   :   "   c   o   m   .   e   x   a   m   p   l   e   "   ,
0000120    "   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e
0000140    "   :   "   f   i   r   s   t   "   ,   "   t   y   p   e   "
0000160    :   "   s   t   r   i   n   g   "   }   ,   {   "   n   a   m
0000200    e   "   :   "   l   a   s   t   "   ,   "   t   y   p   e   "
0000220    :   "   s   t   r   i   n   g   "   }   ]   } 024   a   v   r
0000240    o   .   c   o   d   e   c  \b   n   u   l   l  \0 212 313   ]
0000260  304   v 037 354 350 314   ~   ڪ  ** 207 365 302   1 004   $  \b
0000300    j   o   h   n 006   d   o   e  \b   j   o   h   n 006   d   o
0000320    e 212 313   ] 304   v 037 354 350 314   ~   ڪ  ** 207 365 061
0000340    1
0000341

Compare this output to the single-record output above: the schema specification is included only once. Avro serialization therefore becomes much more efficient if you serialize multiple records with the same schema at once, instead of serializing a single record at a time.
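
To see the same thing in code, here is a minimal sketch (again assuming the generated FullName class) that writes both records through a single DataFileWriter. The schema is written once, in the header, by create(), no matter how many records are appended:

import java.io.File;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import com.example.FullName;

public class WritePeople {
  public static void main(String[] args) throws Exception {
    try (DataFileWriter<FullName> writer =
        new DataFileWriter<>(new SpecificDatumWriter<>(FullName.class))) {
      // create() writes the header: magic bytes, schema JSON, codec, sync marker
      writer.create(FullName.getClassSchema(), new File("people.avro"));
      // append() writes only the encoded record bytes
      writer.append(new FullName("john", "doe"));
      writer.append(new FullName("Attila", "The Hun"));
    }
  }
}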

Avro Serialization with Kafka


In Kafka applications, producers typically write one record at a time. This creates two conflicting requirements when we use Avro to serialize records to Kafka:

  1. On the one hand, we want the message to be self-descriptive, which means carrying the schema with the message.
  2. On the other hand, we want the number of bytes on the wire per message to be as small as possible, which means the verbose schema should not be repeated in each message (especially since it is the same for every message).

The KafkaAvroSerializer resolves this dilemma with a different serialization scheme: it uses a Schema Registry, where schemas are registered and each unique schema specification is assigned an ID.

NOTE: Schema Registry integration is not part of the open source Apache Kafka ecosystem; it is provided by Confluent. You can try it out locally by downloading the Confluent platform binaries, but running it in production is subject to Confluent's licensing terms.

When a message is serialized to the topic, the KafkaAvroSerializer encodes the record, but instead of embedding the verbose schema in the message, it embeds the schema ID from the Schema Registry. The ID is obtained when the schema is registered with the registry.
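
Here is a rough sketch of what this looks like from the producer side (assuming Confluent's kafka-avro-serializer dependency is on the classpath; the broker and registry addresses below are illustrative defaults):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.example.FullName;

public class AvroProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // KafkaAvroSerializer registers (or looks up) the record's schema in
    // the registry and writes only the numeric schema ID into each message.
    props.put("value.serializer",
        "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://localhost:8081");

    try (KafkaProducer<String, FullName> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("test", new FullName("john", "doe")));
    }
  }
}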

The format of the message written by the serializer is as follows:

Offset | Description
-------+---------------------------------------
0      | Magic byte (currently 0)
1-4    | Schema ID (4-byte big-endian integer)
5-     | Avro-encoded record


To see this in action, let's read a message that was serialized with the KafkaAvroSerializer. Here I read a message from a topic using kafka-console-consumer:

$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --partition 0 --offset 0 | od -t x1
0000000    00  00  00  01  e0  48  65  35  39  63  62  64  36  35  2d  36
0000020    38  64  39  2d  34  32  61  34  2d  39  66  35  35  2d  64  61
0000040    38  37  37  63  38  64  39  33  30  34  06  61  65  31  4e  74
0000060    65  73  74  5f  73  63  68  65  6d  61  5f  65  6e  64  70  6f
0000100    69  6e  74  5f  72  65  63  6f  72  64  6e  61  6d  65  73  74
0000120    72  61  74  65  67  79  2e  43  6c  69  65  6e  74  3a  20  73

As you can see, the first byte at offset 0 is the magic byte. The next four bytes hold the schema ID, 0x000001e0, which translates to 480 in decimal:

$ printf "%d" 0x1e0

480
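
The same arithmetic can be done programmatically. Here is a small sketch (the helper name is mine) that picks the framing apart with a ByteBuffer:

import java.nio.ByteBuffer;

public class ConfluentFraming {
  // Illustrative helper: extract the schema ID from a message written
  // by the KafkaAvroSerializer.
  static int schemaIdOf(byte[] message) {
    ByteBuffer buf = ByteBuffer.wrap(message); // big-endian by default
    byte magic = buf.get();                    // offset 0: magic byte (0)
    if (magic != 0) {
      throw new IllegalArgumentException("Unknown magic byte: " + magic);
    }
    return buf.getInt();                       // offsets 1-4: schema ID
    // everything from buf.position() onward is the Avro-encoded record
  }
}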


I can verify that a schema with that ID exists in my Schema Registry:

$ curl -s http://localhost:8888/schemas/ids/480 | jq '.schema|fromjson'
{
  "type": "record",
  "name": "FullName",
  "namespace": "com.example",
  "fields": [
    {
      "name": "first",
      "type": "string"
    },
    {
      "name": "last",
      "type": "string"
    }
  ]
}
 
As you can see, this schema is exactly the same as the one I created earlier.
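
On the consumer side, the matching KafkaAvroDeserializer does the reverse: it reads the magic byte and schema ID, fetches the schema from the registry, and decodes the record with it. A minimal consumer sketch (broker and registry addresses again illustrative):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.example.FullName;

public class AvroConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "avro-demo");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("specific.avro.reader", "true"); // decode into FullName, not GenericRecord

    try (KafkaConsumer<String, FullName> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("test"));
      for (ConsumerRecord<String, FullName> rec : consumer.poll(Duration.ofSeconds(5))) {
        System.out.println(rec.value());
      }
    }
  }
}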

Conclusion


We looked at how plain Avro serialization works: the schema is serialized along with the message, making the message self-describing. This showed why the stock Avro serializer is not well suited to a stream interface (like Amazon Kinesis, Kafka, or a socket), since embedding the schema in every message wastes space. We then looked at the KafkaAvroSerializer that ships with Confluent's Kafka distribution, and saw how it solves this problem by storing an ID that references the schema, instead of storing the entire schema in the serialized message.

