Saturday, July 4, 2020

KafkaAvroSerializer: Efficient way to serialize messages with Avro to a Kafka topic


Apache Kafka has become the platform of choice for doing large scale messaging. It supports a variety of serialization formats for messages, and also allows integration with external systems using a wide variety of connectors.

Apache Kafka allows messages to be stored in a variety of formats in the topic:

- Opaque bytes.
- Avro
- JSON
- Protocol Buffers

In this post, we take a deep dive into the way the messages are stored in a Kafka topic, when the serialization format is Avro.

Avro Serialization

Apache Avro is an Open Source project. It is used to specify a schema for a record, and it provides a schema definition language (AVSC). Avro offers a very space efficient binary serialization format: field names are not stored along with the values, because the reader interprets the data using the schema. It allows for schema evolution as well.

Avro schemas are written in JSON, whose format is defined in RFC 4627. An example Avro schema looks as follows:

   {
     "type": "record",
     "namespace": "com.example",
     "name": "FullName",
     "fields": [
       { "name": "first", "type": "string" },
       { "name": "last", "type": "string" }
     ]
   }

Let us experiment with the above schema. I stored it in a file called test.avsc on my computer.

Next, I downloaded avro-tools v1.10.0 from the Maven repository.

We will author an example json message that adheres to the above schema. Then we will use the *avro-tools* jar we just downloaded to convert to/from avro.

Generate Java POJO

To generate a Java POJO (Plain Old Java Object) we can use the avro-tools jar as follows:

java -jar avro-tools-1.10.0.jar compile schema test.avsc . -string

You can see the Java pojo generated in the "com/example" subdirectory:

$ ls com/example/

Here is the partial gist of the pojo.

For now, just note that the Java Pojo has the schema embedded into it.

Convert Json to Avro

We will now write a simple json message and convert it to avro.

Since the schema only has two fields, I created a simple message as follows:

$ cat test.json
{ "first": "john", "last": "doe" }

Now, let's use avro-tools to convert this to Avro format:

java -jar avro-tools-1.10.0.jar fromjson test.json --schema-file test.avsc > test.avro

Now, let's see what file has been generated:

$ ls test.avro

$ file test.avro
test.avro: Apache Avro version 1

Let's look at the file:

$ od -c test.avro
0000000    O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
0000020    a 222 002   {   "   t   y   p   e   "   :   "   r   e   c   o
0000040    r   d   "   ,   "   n   a   m   e   "   :   "   F   u   l   l
0000060    N   a   m   e   "   ,   "   n   a   m   e   s   p   a   c   e
0000100    "   :   "   c   o   m   .   e   x   a   m   p   l   e   "   ,
0000120    "   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e
0000140    "   :   "   f   i   r   s   t   "   ,   "   t   y   p   e   "
0000160    :   "   s   t   r   i   n   g   "   }   ,   {   "   n   a   m
0000200    e   "   :   "   l   a   s   t   "   ,   "   t   y   p   e   "
0000220    :   "   s   t   r   i   n   g   "   }   ]   } 024   a   v   r
0000240    o   .   c   o   d   e   c  \b   n   u   l   l  \0 330 315 053
0000260    +   Z 213   ' 371  \0   S 250 225 202 277 374   \ 002 022  \b
0000300    j   o   h   n 006   d   o   e 330 315 307   +   Z 213   ' 371
0000320   \0   S 250 225 202 277 374   \

As you can see, the binary representation has the string version of the schema specification embedded in it. You can see the schema string stored up to offset 0233 (od prints offsets in octal). This can be a problem when you are writing multiple messages with the same schema into a stream: if each message is written this way, the schema is embedded in every message, which means that a lot of bytes on the wire are used for transmitting the schema.
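To make the dump easier to read, here is a minimal Python sketch of how the length prefixes in it can be decoded. It assumes the Avro binary encoding rules for long (zigzag, variable length) and string (a long length followed by UTF-8 bytes). The helper names read_long and read_string are made up for this illustration and are not part of avro-tools.

```python
def read_long(buf: bytes, pos: int):
    """Decode one Avro long (zigzag varint) at pos; return (value, next_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift   # low 7 bits carry data
        if not byte & 0x80:             # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos  # undo zigzag encoding


def read_string(buf: bytes, pos: int):
    """Decode one Avro string: a long length, then that many UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


# The record bytes from the dump above: \b j o h n 006 d o e
record = b"\x08john\x06doe"
first, pos = read_string(record, 0)
last, pos = read_string(record, pos)
print(first, last)   # john doe

# The two bytes 222 002 (octal) that precede the schema JSON are its length.
schema_len, _ = read_long(b"\x92\x02", 0)
print(schema_len)    # 137
```

Note that the record itself is only 9 bytes, with no field names in it, while the schema in the file header takes 137 bytes.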

To complete our exercise, we can use the avro-tools to convert Avro message back to Json.

$ java -jar avro-tools-1.10.0.jar tojson test.avro --pretty --reader-schema-file test.avsc

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
{
  "first" : "john",
  "last" : "doe"
}

Serializing Multiple Messages with Schema

Let's see what happens when you try to serialize multiple messages with Avro schema.

I created an example file with two records:

$ cat people.json
{ "first": "john", "last": "doe" }
{ "first": "Attila", "last": "The Hun" }

Now, I convert it to Avro

$ java -jar avro-tools-1.10.0.jar fromjson people.json --schema-file test.avsc > people.avro

Let's look at the output Avro file:

$ od -c people.avro
0000000    O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
0000020    a 222 002   {   "   t   y   p   e   "   :   "   r   e   c   o
0000040    r   d   "   ,   "   n   a   m   e   "   :   "   F   u   l   l
0000060    N   a   m   e   "   ,   "   n   a   m   e   s   p   a   c   e
0000100    "   :   "   c   o   m   .   e   x   a   m   p   l   e   "   ,
0000120    "   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e
0000140    "   :   "   f   i   r   s   t   "   ,   "   t   y   p   e   "
0000160    :   "   s   t   r   i   n   g   "   }   ,   {   "   n   a   m
0000200    e   "   :   "   l   a   s   t   "   ,   "   t   y   p   e   "
0000220    :   "   s   t   r   i   n   g   "   }   ]   } 024   a   v   r
0000240    o   .   c   o   d   e   c  \b   n   u   l   l  \0 212 313   ]
0000260  304   v 037 354 350 314   ~   ڪ  ** 207 365 302   1 004   $  \b
0000300    j   o   h   n 006   d   o   e  \b   j   o   h   n 006   d   o
0000320    e 212 313   ] 304   v 037 354 350 314   ~   ڪ  ** 207 365 061
0000340    1

Compare this output to the single record output above. You will notice that the schema specification is only included once. This means that avro serialization becomes more efficient if you serialize multiple records with the same schema at once, instead of serializing a single record at a time.

Avro Serialization with Kafka

In Kafka applications, the Producers will typically write one record at a time. There are two conflicting requirements when we use Avro to serialize records to Kafka:

  1. On the one hand, we want the message to be self descriptive, which means we want the schema with the message.
  2. On the other hand, we want the number of bytes on the wire per message to be the smallest possible, which means that the verbose schema should not be specified in each message (especially since it is going to be the same for each message).

Kafka solves this dilemma with a different Avro serialization method. To do this, it uses the Schema Registry to register the schemas. The schema registry assigns an ID to each unique schema specification.

NOTE: Schema Registry integration for Kafka is not part of the Open Source Apache Kafka ecosystem. It is provided by Confluent. You can try this out locally by downloading the Confluent binaries, but to run it in production requires licensing.

When the message is serialized to the topic, the KafkaAvroSerializer serializes the message, but instead of including the verbose schema with the message, it includes the schema ID from the schema registry. The schema id is obtained when you register a schema with the registry.

The format of the message written by the serializer is as follows:

 Offset   Description
 0        Magic byte
 1-4      Schema ID (4 bytes)
 5-       Avro encoded record
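The layout above can be sketched in Python as follows. This is a minimal illustration, assuming the magic byte is 0 and the schema ID is a 4-byte big-endian integer. The function name parse_wire_header and the sample message are made up for the example.

```python
import struct


def parse_wire_header(message: bytes):
    """Split a serialized message into (schema_id, avro_payload)."""
    if len(message) < 5:
        raise ValueError("message is too short to carry the 5 byte header")
    # ">bI" = big-endian, 1 signed byte (magic) followed by a 4 byte unsigned int
    magic, schema_id = struct.unpack_from(">bI", message, 0)
    if magic != 0:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, message[5:]


# Sample message built for illustration: header with schema ID 0x000001e0,
# followed by an Avro encoded FullName record.
sample = bytes.fromhex("00000001e0") + b"\x08john\x06doe"
schema_id, payload = parse_wire_header(sample)
print(schema_id)     # 480
print(len(payload))  # 9
```

A consumer would use the schema ID to fetch the schema from the registry and then decode the payload with it.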

To see this in action, try to read a message serialized using KafkaAvroSerializer. I am reading the message from a topic using kafka-console-consumer:

$ ./bin/kafka-console-consumer --bootstrap-server localhost:9020 --topic test --partition 0 --offset 0 | od -t x1
0000000    00  00  00  01  e0  48  65  35  39  63  62  64  36  35  2d  36
0000020    38  64  39  2d  34  32  61  34  2d  39  66  35  35  2d  64  61
0000040    38  37  37  63  38  64  39  33  30  34  06  61  65  31  4e  74
0000060    65  73  74  5f  73  63  68  65  6d  61  5f  65  6e  64  70  6f
0000100    69  6e  74  5f  72  65  63  6f  72  64  6e  61  6d  65  73  74
0000120    72  61  74  65  67  79  2e  43  6c  69  65  6e  74  3a  20  73

As you can see, the first byte at offset 0 is the magic byte. After that we have four bytes of the Schema ID, which is 0x000001e0. This translates to decimal 480.

$ printf "%d" 0x1e0
480

I can verify that a schema with that ID exists in my schema-registry.

$ curl -s http://localhost:8888/schemas/ids/480 | jq '.schema|fromjson'
{
  "type": "record",
  "name": "FullName",
  "namespace": "com.example",
  "fields": [
    {
      "name": "first",
      "type": "string"
    },
    {
      "name": "last",
      "type": "string"
    }
  ]
}

As you can see, this schema is exactly the same as the schema I created.


We looked at how Avro serialization works by serializing the message along with the schema, making the message self describing. This led us to see how the stock Avro serializer is not suitable for serializing messages to a stream interface (like Amazon Kinesis, Kafka or a socket), since the schema in each message wastes space. We then looked at the KafkaAvroSerializer that ships with Confluent Kafka, and saw how it solves this problem by storing an ID that references the schema, instead of storing the entire schema in the serialized message.

Tuesday, October 4, 2016

Implementing a Ping client in Python - Part III


This is Part 3 of a three-part series where we develop a Ping client in Python.

Until now, we understood the structure of the ICMP Packet, and also looked at python code to serialize and deserialize the packet from the network.

In the current installment, we will complete the program. First, let us implement the checksum.

ICMP Header Checksum

A checksum is a 16-bit value used for error checking the header and the data. A checksum allows the receiver to make sure that the data it received did not get corrupted or altered in transit. The transmitter calculates a checksum over the outgoing packet and adds it to the packet. The receiver calculates a checksum on the received packet, and compares it with the checksum in the packet. If they match, the packet was most likely not corrupted on the wire.

As per Wikipedia:

The checksum field is the 16 bit one's complement of the one's complement sum of all 16-bit words in the header and text. If a segment contains an odd number of header and text octets to be checksummed, the last octet is padded on the right with zeros to form a 16-bit word for checksum purposes. The pad is not transmitted as part of the segment. While computing the checksum, the checksum field itself is replaced with zeros.

Using this definition, we have the following checksum function
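A minimal Python 3 sketch of such a function, written directly from the definition quoted above. It is not necessarily the code from the complete program, and the name icmp_checksum is made up for this illustration.

```python
def icmp_checksum(data: bytes) -> int:
    """16-bit one's complement of the one's complement sum of 16-bit words."""
    if len(data) % 2:
        data += b"\x00"                 # pad odd-length input on the right
    total = 0
    for i in range(0, len(data), 2):
        total += (data[i] << 8) | data[i + 1]   # big-endian 16-bit word
    while total >> 16:                  # fold carries back into the low 16 bits
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF


# Echo request header: type=8, code=0, checksum=0, identifier=1, sequence=1
header = bytes([8, 0, 0, 0, 0, 1, 0, 1])
print(hex(icmp_checksum(header)))   # 0xf7fd
```

The caller computes the checksum with the checksum field set to zero, then writes the result into that field. A receiver that runs the same function over the packet, checksum included, gets 0 when nothing was corrupted.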

Socket Introduction

The Wikipedia article on Sockets gives some background info on this topic. For our case, since we are going to be dealing with a lower level protocol like ICMP, we will be creating a Raw socket.

In order to create a RAW socket, you need to be a superuser. On UNIX machines, you will need to use "sudo" to run the program. On Windows, you need to be a member of the Administrators group to run the program.

The python documentation for Socket is a great resource as well.

A raw socket is created as follows:

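from socket import socket, AF_INET, SOCK_RAW, IPPROTO_ICMP, IPPROTO_IP, IP_HDRINCL
sock = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)  # assumed creation call: a raw ICMP socket (needs superuser)
sock.setsockopt(IPPROTO_IP, IP_HDRINCL, 0)      # let the OS build the IP header
sock.bind(('', 0))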

Since we are only sending the ICMP packet, and not its encapsulating IP packet, we need to indicate this to the network driver using setsockopt(IPPROTO_IP, IP_HDRINCL, 0). This lets the network driver encapsulate our data in an IP packet.

Main Entry Point

Now that we have all the scaffolding in place for the program, here is the main() function that ties it all together.

Note a couple of imports.
On my mac, I run this program as
sudo python

Complete Program

The complete program is checked into Github. I encourage you to check it out.


This concludes our 3 part series. I hope you liked this, and learned something about low level network programming in Python.

Monday, October 3, 2016

Implementing a Ping client in Python - Part II


In Implementing a Ping client in Python - Part I, we gave an introduction to Ping, and showed how it is implemented using the ICMP protocol. Specifically, it uses the ICMP Echo Request and Response mechanism. The client sends the server an ICMP Echo packet, and the server responds with an ICMP Response packet.

We also defined the python class structure of the ICMP packet.

In this chapter, we will implement the serialization and deserialization routines so that we can send the packet over wire, and read the response back into a python class.

First, a word about byte ordering....

Byte Ordering

Every CPU architecture has a byte ordering specification. Byte ordering, also called Endianness, defines how data types that are more than 1 byte in size are ordered in memory. For example, if you have a 16-bit word with value 0xABCD, how is it written in computer memory, given that memory only stores bytes? Do you store the high order byte first (i.e. at the lower memory location) and then the low order byte, or the other way around?

There are two kinds of Endianness, Big Endian and Little Endian.

Big Endian

In this scheme, the higher order byte is stored at the lower memory location. So, a 16-bit value 0xABCD will be stored as follows:

Address  Value
0        0xAB
1        0xCD

Little Endian

In Little Endian, the lower order byte is stored at the lower memory location, and then the high order byte. So, a 16-bit value 0xABCD will be stored as follows:

Address  Value
0        0xCD
1        0xAB

When sending data over the network, it is always sent in Big Endian order, i.e higher byte first. So, for our program, we will need to implement routines to send word and integer on the network, and read them back as well.
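The difference is easy to see with Python's struct module. This is a small sketch using an arbitrary sample value. The format prefixes ">" (big endian), "<" (little endian) and "!" (network order) are part of the standard struct module.

```python
import struct
import sys

value = 0x1234

big = struct.pack(">H", value)       # high order byte first
little = struct.pack("<H", value)    # low order byte first
network = struct.pack("!H", value)   # network byte order is big endian

print(big.hex(), little.hex(), network.hex())   # 1234 3412 1234
print("this machine is", sys.byteorder, "endian")

# Reading the word back from network order
(decoded,) = struct.unpack("!H", network)
```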

Network Byte Order Conversion

We will take the icmp_pkt class defined in the Part-I of this series, and add the following methods to it.

These two functions, _write_word and parse_word are the complements of each other.

_write_word serializes a 16-bit value in Big Endian order.

parse_word reads a word from a buffer.

Note the usage of the python ord function. Since the input to parse_word is a string (the parameter data is a string type), the character at position i in the buffer will be the character value of the byte. However, we are interested in the raw byte value, not the mapped unicode value. So, we need to use the ord() function to map it back to its raw value.

There is a slight discrepancy between _write_word() and parse_word() functions. _write_word takes a list. The reason for that is that the caller is going to pass in the list. Whereas parse_word() has to deal with network data that is stored in a string buffer returned from the socket call.
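For readers on Python 3, here is a minimal sketch of what such a pair of helpers can look like. It is an assumption-laden stand-in, not the original methods: it uses free functions rather than methods on icmp_pkt, and it works on bytearray/bytes instead of a list and a string. In Python 3, indexing a bytes object already returns an integer, so ord() is not needed.

```python
def write_word(buf: bytearray, value: int) -> None:
    """Append a 16-bit value to buf in Big Endian (network) order."""
    buf.append((value >> 8) & 0xFF)   # high order byte first
    buf.append(value & 0xFF)          # then the low order byte


def parse_word(data: bytes, offset: int) -> int:
    """Read a Big Endian 16-bit value from data starting at offset."""
    return (data[offset] << 8) | data[offset + 1]


buf = bytearray()
write_word(buf, 0x1234)
print(buf.hex())                      # 1234
print(hex(parse_word(bytes(buf), 0))) # 0x1234
```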

Next, we will write code to serialize the python class into a network buffer, and also deserialize it.

ICMP Packet Serialization/Deserialization

The ICMP echo packet structure is very simple. It just consists of bytes and words. There is also a variable length byte array to hold user data that is sent by the client and echoed back by the server. Serializing this is very simple. Just write them in order defined in the packet.

Note the starting offset of 20 in the parse() method. This is needed because on unix, the buffer returned by socket.recvfrom() starts at the IP header, not at the ICMP header. The ICMP packet is encapsulated inside the IP packet, so we need to go past the IP header (which is 20 bytes in size when it carries no options) to read the ICMP response.
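A minimal Python 3 sketch of the idea, using the struct module rather than the hand-written word helpers. The function names serialize_echo and parse_echo and the constant IP_HEADER_LEN are made up for this illustration, and the fixed offset of 20 assumes an IPv4 header without options.

```python
import struct

IP_HEADER_LEN = 20       # assumption: IPv4 header with no options
ICMP_HEADER = "!BBHHH"   # type, code, checksum, identifier, sequence (network order)


def serialize_echo(icmp_type, code, checksum, identifier, sequence, payload=b""):
    """Write the fields in the order defined by the packet, then the user data."""
    return struct.pack(ICMP_HEADER, icmp_type, code, checksum,
                       identifier, sequence) + payload


def parse_echo(datagram, offset=IP_HEADER_LEN):
    """Skip the IP header, then read the ICMP fields and the user data."""
    fields = struct.unpack_from(ICMP_HEADER, datagram, offset)
    payload = datagram[offset + struct.calcsize(ICMP_HEADER):]
    return fields + (payload,)


packet = serialize_echo(8, 0, 0xF7FD, 1, 1, b"hi")
print(packet.hex())                  # 0800f7fd000100016869

# Simulate what recvfrom() returns: a 20 byte IP header followed by the ICMP packet
received = bytes(IP_HEADER_LEN) + packet
print(parse_echo(received))
```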


In the current installment, we looked at byte Encodings, specifically Big Endian and Little Endian. Then we saw how to encode the ICMP packet into a byte array for sending over the wire. We also saw how to parse the byte array back into a python class instance.

In the next installment, we will start to fill in the code for the checksum and sockets.

Monday, September 26, 2016

Implementing a Ping client in Python - Part I


In this series of posts,  I will demonstrate how to implement a Ping client using Python.

Ping is one of the most useful tools that a computer programmer might use. It is used to figure out if a target machine is reachable or not.

Ping is implemented using the ICMP Protocol.

ICMP Protocol

The Internet Control Message Protocol (ICMP) is defined by the Internet Engineering Task Force. The specifications of the protocol are in RFC 792.

The Ping utility uses the ECHO request/response facility of the ICMP protocol. The client sends an ICMP Echo request to the server, and the server responds with an ICMP Echo response. It is a very simple request/response protocol and makes it ideal to use in order to learn low level network programming.

ICMP Packet Description

The following table describes the ICMP Echo Request/Reply Packet

Type (8 bits) | Code (8 bits) | Checksum (16 bits)
Identifier (16 bits) | Sequence Number (16 bits)

The Type is 8 for Echo message, and 0 for reply message.

Code has to be zero.

Identifier can be used to match request/reply packets.

Sequence Number can be used to distinguish multiple request/responses from each other.

The client sends an ICMP Echo packet to the server. An echo packet has the type code 8.

The server responds with an ICMP Response packet. An ICMP Echo response packet has a type code of 0.
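The fields described above map naturally onto a small Python class. The following is only a sketch in Python 3: the class name EchoPacket, the constants and the is_reply_to helper are made up for this illustration and are not the class used later in this series.

```python
from dataclasses import dataclass

ICMP_ECHO_REQUEST = 8   # Type for an Echo message
ICMP_ECHO_REPLY = 0     # Type for an Echo reply message


@dataclass
class EchoPacket:
    type: int = ICMP_ECHO_REQUEST
    code: int = 0           # Code has to be zero
    checksum: int = 0
    identifier: int = 0     # used to match request/reply packets
    sequence: int = 0       # distinguishes multiple requests from each other
    data: bytes = b""       # user data echoed back by the server

    def is_reply_to(self, request: "EchoPacket") -> bool:
        """True if this packet is the Echo reply for the given request."""
        return (self.type == ICMP_ECHO_REPLY
                and self.identifier == request.identifier
                and self.sequence == request.sequence)


request = EchoPacket(identifier=7, sequence=1, data=b"ping")
reply = EchoPacket(type=ICMP_ECHO_REPLY, identifier=7, sequence=1, data=b"ping")
print(reply.is_reply_to(request))   # True
```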

Python Implementation Details

I have been programming in Python for quite some time, but have never attempted low level programming at the socket level until now. Normally, in C, C# or Java, one would use byte arrays to send/receive data. However, the only byte array abstraction in Python is the String datatype ( and the StringIO buffer ). It was challenging for me to wrap my head around using a string as a byte buffer. I will mention some of the pitfalls to avoid as I go along.

Ping using C#

At my previous company, I had written a Ping utility using C#/.Net on Windows. That was described here:

Sunday, January 5, 2014

A History of Microsoft's HTTP Client Stacks

This article is a bit of history about the various Microsoft HTTP client stacks.


Originally, there was one - the WININET implementation, which was written for the Internet Explorer browser. The implementation for this was encapsulated in Wininet.dll. In its original implementation (as shipped in IE 2.0/IE 3.0/IE 4.x), it supported the following protocols - HTTP, FTP, GOPHER. It also supported HTTP caching. For more information, see WININET Msdn Page

As client side HTTP stacks stood in those days, it was pretty good and usable. While it was originally targeted at Internet Explorer, a lot of third party applications began to be built around it. And as frequently happens, it was being put to uses for which it was not intended.

For example, a lot of customers started using it for implementing middle-tier applications, where the DLL was loaded in an ISAPI in the IIS process. Wininet was not really intended for such uses - it was heavily bound to a user context because of its dependency on the user directories for storing the cached web pages. It also did not support delegated authentication, and some of the Wininet settings were only configurable through the IE interface.

Wininet still lives on - it is the HTTP stack being used by the Internet Explorer browser. In OS Versions from Windows7 onwards, it might have been replaced by WebIO.


More and more customers were using Wininet in a non user-context - for eg, in middle-tier apps, and it was not working very well. As I mentioned above, Wininet was not designed to be used in a server environment. So, a separate team was spun off to come up with a solution for that.

The result of that was WinHTTP. This was a standalone DLL that borrowed heavily from Wininet in terms of the interfaces it offered. However, it was designed to be usable in server applications as well. It was a robust implementation that exposed its functionality through exported functions, as well as a COM interface that could be used in a server scenario.

However, it was a subset of the Wininet implementation. It did not support cookies, caching, and automatic credential handling. Also, Wininet had a lot of code to handle buggy HTTP servers, and proxy servers, and that code was not supported by WinHTTP.

There were two versions of WinHTTP. WinHTTP 5.0 was a redistributable version, and was downloadable as a standalone component from Microsoft. WinHTTP 5.1 was made a part of the operating system, and thus was not downloadable as a separate component.


As part of Microsoft's effort to build a clean-room, managed execution environment like Java, a separate team was staffed to develop the networking library for the managed frameworks that were to ship as part of this. It was part of the project Codenamed Lightning.

The networking classes were encapsulated in the System.Net namespace. This team was staffed by ex-WININET developers, and led by Henry Sanders who is now a Vice President at Microsoft.

This library was written with a view to fix some of the problems with WININET. It was designed from the ground up to be usable in middle-tier scenarios. Also, it was designed to support asynchronous networking calls. Even the sync networking calls ended up calling the async implementations underneath, which caused the application to use up an extra Thread. This was later fixed in .Net Framework 2.0 (codename Whidbey). Also, it supported asynchronous operations on both WinNT and Win9x platforms, whereby it used Completion ports on WinNT, and Overlapped I/O on Win9x.

The class design (for the Http client stack) in the System.Net library is such that it supports both beginners and advanced users seamlessly. For beginners, System.Net.WebClient offers an easy to use API to get web resources, and even to send POST requests to Web forms. For advanced and fine grained control, the System.Net.HttpWebRequest class is the recommended entry point.

With a new design philosophy, the API designers decided to make usage consistent with other parts of the framework. For eg, doing asynchronous network I/O in Http was the same pattern as doing asynchronous I/O on a File handle in the System.IO.File class.

One extra thing that was available in System.Net, was a Socket implementation, similar to the facilities provided by Windows Sockets (Winsock) library on Windows. Again, keeping in mind the overall design philosophy of the .NET framework, later releases added Stream implementation ( for eg SSLStream) that could be hooked up to a Socket to provide higher level network stack functions, like encryption, authentication etc.

For debugging, a tracing/logging facility is provided that allows most network I/O to be logged using the .NET runtime's logging functionality.


In Windows 7, a new native HTTP stack was written, called WebIO. This is not documented as a public API, and is internal only to the browser and the Operating System.


For developers who are using .NET, the primary and only choice for an HTTP client library is the System.Net namespace. However, for use outside of the .NET Runtime, the options are Wininet and WinHTTP. All three of these stacks should support most modern applications to varying degrees.

Sunday, September 4, 2011

How to share NTLM connection across HttpWebRequest objects

When writing an application that talks to a server using NTLM authentication, HttpWebRequest does a three legged authentication handshake that looks as follows:

Client                          Server
GET / HTTP/1.1
                                HTTP/1.1 401 Access Denied
                                Connection: close
                                WWW-Authenticate: NTLM <token>
GET / HTTP/1.1
Authorization: NTLM <token1>
                                HTTP/1.1 401 Access Denied
                                Connection: close
                                WWW-Authenticate: NTLM <token>
GET / HTTP/1.1
Authorization: NTLM <token2>
                                HTTP/1.1 200 OK
                                Connection: close

As you can see, by default, HttpWebRequest does not share connections. The reason is that NTLM is a connection authentication protocol, and not a request authentication protocol like BASIC or DIGEST.

In other words, once an NTLM handshake succeeds, the NTLM credentials stick with the connection. So, if you have a situation (for eg, a web server talking to another webserver using NTLM auth) where there are multiple users with different credentials accessing the front-end server, there is a high risk that an authenticated connection for one user might get used by another user due to connection pooling.

In order to force HttpWebRequest to share NTLM connections, you can do the following:

Setting UnsafeAuthenticatedConnectionSharing will cause all NTLM connections to the same host to be shared.

By setting ConnectionGroupName, you can cause connections to be shared only selectively. In this case, all requests having the same ConnectionGroupName will share connections. You can use this mechanism if you are on the middle-tier scenario.


System.NET Links and How To's
Tracing with System.Net
The case of multiple NTLM challenges with IIS7

Tuesday, August 9, 2011

Diagnosing RMI Exception: java.rmi.ConnectException: Connection refused to host:


I have a service that runs on Linux under JBOSS. This service uses JMX (RMI) to talk to a Windows box running a Java service. RMI is the equivalent of the Windows Distributed COM, and allows a client to invoke a method on a remote service.


My setup is as follows. I have a Linux machine (Client) that is running a Java service in Tomcat, hosted in JBOSS. It is trying to invoke a method on a remote machine which is running a Java program as a windows service.

Java RMI ConnectionRefused Error

Everything was working ok, until both the Linux and Windows boxes were moved to a different subnet. Then, they started failing with the following exception:

Caused by: java.rmi.ConnectException: Connection refused to host:; nested exception is: Connection refused
at sun.rmi.transport.tcp.TCPEndpoint.newSocket(
at sun.rmi.transport.tcp.TCPChannel.createConnection(
at sun.rmi.transport.tcp.TCPChannel.newConnection(
at sun.rmi.server.UnicastRef.invoke(
at Source)
at com.zillow.core.jmx.ZillowJMXConnectorFactory.connect(
at com.zillow.core.jmx.ZillowJMXConnectorFactory.connect(
at com.zillow.bcpserver.BCPServerProxy.afterPropertiesSet(
... 142 more
Caused by: Connection refused
at Method)
at sun.rmi.transport.proxy.RMIDirectSocketFactory.createSocket(
at sun.rmi.transport.proxy.RMIMasterSocketFactory.createSocket(
at sun.rmi.transport.tcp.TCPEndpoint.newSocket(
... 154 more

I searched for this exception in the search engines, and the only thing I found was people saying that if the local box (i.e linux) did not have a correct IP address for localhost, it would send the address as the "ContactMe" endpoint to the remote destination, and that would fail.

For example, the following links explained that issue:

However, in my case, that turned out not to be the problem. My Linux boxes (even after the move) had the correct hostname, and the `hostname` command was giving back the correct hostname (and not or localhost).

At this point, there was no more information available through the search engines to diagnose the problem.

So, I decided to debug it myself. I first restarted the service, and used TCPDUMP to do a network sniff on the linux box.

Here, .175 is the Linux box, and .45 is the windows box.

The following is the packet disassembly of the JRMI/ReturnData response being sent by the Windows box to the Linux box.

As you can see, the Windows server is sending back "" as the CallMe endpoint to the Linux box. The linux box tries to connect to port 1099 on and fails.

Since the machines had been moved to different networks, it is possible that the Java service might have lost its IP address and network registration settings.

So, I restarted the Java service on the Windows box. And voilà, that fixed the problem.

Looking for tools to help you troubleshoot networking issues? My blog post Network Programmers Toolchest will come in handy.