Saturday, June 24, 2023


mask-json-field transform - a Kafka Connect SMT to mask out PII fields embedded in JSON values of Kafka messages.


Introduction


In today's fast-paced digital era, where vast amounts of data are being generated and processed at an unprecedented scale, the need for efficient, scalable, and real-time data processing solutions has become crucial. Traditional messaging systems and data integration tools often struggle to meet these demands, leading to delays, data loss, and inefficiencies in data processing pipelines. That's where Apache Kafka, a powerful distributed streaming platform, comes into play.

Kafka has emerged as a game-changer in the world of data streaming, enabling organizations to build robust, fault-tolerant, and highly scalable data pipelines. Developed by the Apache Software Foundation, Kafka was originally created to handle the real-time data ingestion and processing needs of LinkedIn. Since its inception, Kafka has gained immense popularity and has become the de facto standard for building event-driven architectures in a wide range of industries.

Kafka Connect plays a pivotal role in the Kafka ecosystem by providing a scalable and reliable framework for data integration. It simplifies the process of connecting external systems to Kafka, enabling seamless data ingestion and extraction. Kafka Connect acts as a bridge between Kafka topics and various data sources or sinks, allowing for the efficient transfer of data in real-time. It offers a wide range of connectors that serve as plugins to connect to different systems such as databases, file systems, message queues, and cloud services. With Kafka Connect, organizations can easily integrate their existing data infrastructure with Kafka, enabling them to build robust and flexible data pipelines. The ability to leverage pre-built connectors or develop custom ones makes Kafka Connect a valuable tool for simplifying data integration tasks and accelerating the adoption of Kafka within an organization's data ecosystem.

Kafka Connect Single Message Transforms


Single Message Transforms (SMT) is a powerful feature in Kafka Connect that allows for in-flight data transformation and enrichment within the data integration pipelines. SMT operates on individual messages as they pass through the connectors, providing the flexibility to modify, filter, or enhance the data without the need for external processing. With SMT, you can apply a variety of operations such as adding or removing fields, changing data types, filtering messages based on specific conditions, and performing calculations or aggregations. This capability enables organizations to shape the data precisely according to their requirements before it reaches the destination, ensuring data consistency, compliance, and compatibility. 
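
For example, a connector configuration can use the stock MaskField transform that ships with Kafka Connect to null out a top-level field. A minimal sketch of such a fragment (the transform alias mask_pii and the field name ssn are just illustrations):

"transforms": "mask_pii",
"transforms.mask_pii.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask_pii.fields": "ssn"

Built-in transforms like MaskField operate on top-level fields of the Connect record; masking a value that is buried inside a JSON string requires a transform like the one described below.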

Complying with Data Regulations


Companies need to comply with data protection regulations in the jurisdictions they operate in. Examples of these regulations include CCPA/CPRA (California) and GLBA (US, financial services). Other countries have their own versions of these.

Companies using the Kafka ecosystem of services to transfer data need to be cognizant of these regulations and determine where regulated data (e.g., PII) should be visible. When using Kafka Connect as the integration mechanism, you can use SMTs to mask out data to comply with these regulations.

In this blog post, I will introduce the Kafka Connect mask-json-field SMT.

mask-json-field Single Message Transform


There are cases where you need to mask a value in a JSON document that is embedded in a field of a Kafka message. For example, assume the following Kafka message schema, which could be the output of a Change Data Capture (CDC) connector that is monitoring a database and writing CDC events into a Kafka topic.



Field Name | Field Value
Id         | 1
Database   | contoso
Operation  | Insert
Document   | {"name": "john doe", "salary": 100000, "ssn": "111-22-3333"}

In this Kafka message, the Document value is a string containing the JSON payload of the database row.

Assume we are persisting this message into a different system using a sink connector. However, due to privacy requirements, you don't want to persist the salary and ssn values.

The JSON version of this message looks as follows:
{
    "Id": 1,
    "Database": "contoso",
    "Operation": "Insert",
    "Document": "{\"name\": \"john doe\", \"salary\": 100000, \"ssn\": \"111-22-3333\"}"
}


The mask-json-field SMT allows you to manipulate JSON documents embedded inside Kafka messages.

Here is an example of using the transform in a FileStreamSink connector.


    
{
    "name": "json-mask-test",
    "connector.class": "FileStreamSink",
    "file": "/tmp/file-sink.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": 1,
    "topics": "json-topic",
    "transforms": "mask_ssn,mask_salary",
    "transforms.mask_ssn.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn.REPLACEMENT_FIELD_PATH": "ssn",
    "transforms.mask_ssn.REPLACEMENT_FIELD_VALUE_STRING": "xxx-xx-xxxx",
    "transforms.mask_salary.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_salary.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_salary.REPLACEMENT_FIELD_PATH": "salary",
    "transforms.mask_salary.REPLACEMENT_VALUE_INT": 0,

    "errors.tolerance": "all"

}



With this transform, the output message will be:
 
Field Name | Field Value
Id         | 1
Database   | contoso
Operation  | Insert
Document   | {"name": "john doe", "salary": 0, "ssn": "xxx-xx-xxxx"}

The raw version of the output will be as follows:
{
    "Id": 1,
    "Database": "contoso",
    "Operation": "Insert",
    "Document": "{\"name\": \"john doe\", \"salary\": 0, \"ssn\": \"xxx-xx-xxxx\"}"
}


This transform also works on arrays. For example, assume you have an SSN value stored as an array of strings

["111", "22", "3333" ]

and you want to transform it to:

["xxx", "xx", "3333" ]

Then the connector spec will look like this:



{
    "name": "json-mask-test",
    "connector.class": "FileStreamSink",
    "file": "/tmp/file-sink.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": 1,
    "topics": "json-topic",
    "transforms": "mask_salary,mask_ssn_1,mask_ssn_2",
    "transforms.mask_salary.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_salary.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_salary.REPLACEMENT_FIELD_PATH": "salary",
    "transforms.mask_salary.REPLACEMENT_VALUE_INT": 0,
    "transforms.mask_ssn_1.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn_1.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn_1.REPLACEMENT_FIELD_PATH": "/ssn/0",
    "transforms.mask_ssn_1.REPLACEMENT_VALUE_STRING": "xxx",
    "transforms.mask_ssn_2.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn_2.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn_2.REPLACEMENT_FIELD_PATH": "/ssn/1",
    "transforms.mask_ssn_2.REPLACEMENT_VALUE_STRING": "xx",

    "errors.tolerance": "all"

}


There are many more features that this transform supports. See the README.md in the source repository.

Conclusion


Kafka Connect and its Single Message Transforms feature provide a robust and scalable framework for data integration within the Kafka ecosystem. The mask-json-field SMT provides organizations with an extensible mechanism to mask out sensitive fields from the data, allowing compliance with various data protection regulations.


If you liked this post, here are some others you might like:

Saturday, December 4, 2021


Avro Logical Types with Parquet, Hive and Spark




In this post, I am going to talk about the fidelity of types between Avro and big data technologies like Parquet, Hive and Spark.

Specifically, we will discuss which logical types maintain fidelity when the data is eventually written as parquet.

We will also include querying that data using Hive and Spark.

Avro







Apache Avro is a data serialization system. It defines a schema definition language and a framework for serializing data, and it supports schema evolution.

Avro schemas are defined in JSON. The Avro project provides tools to translate a schema into classes in popular programming languages.


Logical Types



Avro supports logical types. A logical type is a higher-level representation layered on top of a primitive type.

For example, a higher-level UUID type can be represented as the primitive type string.

Similarly, a higher-level java.time.Instant object can be represented as the primitive type long if the timestamp is written as epoch milliseconds.

In Avro schema, a logical type is defined as follows:

    {
      "name": "timestamp_millis",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      "doc": "An avro timestamp-millis logical type"
    }

The following table lists the supported logical types, and the corresponding higher level and primitive types they translate to.


Version | Logical Type            | Java Type               | Schema Spec
1.9+    | date                    | java.time.LocalDate     | "type": { "type": "int", "logicalType": "date" }
1.9+    | time-millis             | java.time.LocalTime     | "type": { "type": "int", "logicalType": "time-millis" }
1.9+    | time-micros             | java.time.LocalTime     | "type": { "type": "long", "logicalType": "time-micros" }
1.9+    | timestamp-millis        | java.time.Instant       | "type": { "type": "long", "logicalType": "timestamp-millis" }
1.9+    | timestamp-micros        | java.time.Instant       | "type": { "type": "long", "logicalType": "timestamp-micros" }
1.10+   | local-timestamp-millis  | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-millis" }
1.10+   | local-timestamp-micros  | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-micros" }



Logical Type Example



A field named event_timestamp with the logical type timestamp-millis is defined as follows in an Avro schema:

"event_timestamp":
{
    "type": { 
              "type": "long", 
              "logicalType": "timestamp-millis" }
    }
}

When a Java POJO is generated from this schema, the `event_timestamp` field will look like this:

java.time.Instant event_timestamp;

Instead of:

long event_timestamp

The advantage of logical types in Avro is that you can use them to represent higher-level types, while the serialized representation remains one of the mapped primitive types.


Now that we have an understanding of what logical types are and how they work, let's see how they work when used with other systems.

To test this, I am going to use Hive and Spark. We will use Apache Parquet as the file format. Parquet is a very common file format in big data applications, and it supports efficient column-level queries.

Test Bed 


In order to test this out, I created a test platform. I used Confluent's Quickstart to set up a local Kafka ecosystem consisting of a broker and Kafka Connect. An S3 sink connector is used to archive the messages in the topic to an S3 path in Parquet format.

Next, I created a Hive table on top of the Parquet data. I am using Amazon Elastic MapReduce for my Hive metastore.



This is the Avro Schema that has all the possible logical types supported by Avro.
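
A sketch of such a schema, reconstructed from the field names in the Parquet schema shown later in this post (the actual schema used may differ in detail):

    {
      "type": "record",
      "name": "LogicalType",
      "namespace": "com.ferozed",
      "fields": [
        { "name": "id", "type": "string" },
        { "name": "environment", "type": ["null", "string"], "default": null },
        { "name": "sender", "type": ["null", "string"], "default": null },
        { "name": "send_epoch_time_millis", "type": ["null", "long"], "default": null },
        { "name": "logical_date", "type": { "type": "int", "logicalType": "date" } },
        { "name": "logical_time_millis", "type": { "type": "int", "logicalType": "time-millis" } },
        { "name": "logical_time_micros", "type": { "type": "long", "logicalType": "time-micros" } },
        { "name": "logical_timestamp_millis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
        { "name": "logical_timestamp_micros", "type": { "type": "long", "logicalType": "timestamp-micros" } },
        { "name": "logical_local_timestamp_millis", "type": { "type": "long", "logicalType": "local-timestamp-millis" } },
        { "name": "logical_local_timestamp_micros", "type": { "type": "long", "logicalType": "local-timestamp-micros" } }
      ]
    }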



The producer app was written in Java. I generated a POJO (Plain Old Java Object) from this schema using Avro Tools, and used the generated class to send random data into the topic.

The Kafka producer in the java app was configured as follows:

"value.serializer" = io.confluent.kafka.serializers.KafkaAvroSerializer.class
"auto.register.schemas" = "false"
"value.subject.name.strategy" = io.confluent.kafka.serializers.subject.RecordNameStrategy.class

The Kafka Connect S3 sink was configured as follows:

"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy"
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",

The KafkaAvroSerializer provides an efficient way to transfer Avro data over Kafka topics.

The Hive Table was created in two steps.

The first step was to create a temporary hive table using the provided Avro schema and org.apache.hadoop.hive.serde2.avro.AvroSerDe

Next, we create the final table using this temp table as a template.

The following Hive HQL performs these steps:
    DROP TABLE IF EXISTS tmp.logical_type_test_avro;
    
    CREATE EXTERNAL TABLE IF NOT EXISTS tmp.logical_type_test_avro
    PARTITIONED BY( `process_date` string, `process_hour` string)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
    TBLPROPERTIES {{ avroSchemaLiteral }};
    
    DROP TABLE IF EXISTS warehouse.logical_type_test;
    
    CREATE EXTERNAL TABLE IF NOT EXISTS warehouse.logical_type_test like tmp.logical_type_test_avro
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS PARQUET LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
    TBLPROPERTIES {{ tblProperties }};
    
    DROP TABLE tmp.logical_type_test_avro;
    


Here is the output of the "SHOW CREATE TABLE warehouse.logical_type_test;" command.

Parquet File Schema


Let's look at the schema of the Parquet file:

message com.ferozed.LogicalType {
  required binary id (STRING);
  optional binary environment (STRING);
  optional binary sender (STRING);
  optional int64 send_epoch_time_millis;
  required int32 logical_date (DATE);
  required int32 logical_time_millis (TIME(MILLIS,true));
  required int64 logical_time_micros;
  required int64 logical_timestamp_millis (TIMESTAMP(MILLIS,true));
  required int64 logical_timestamp_micros;
  required int64 logical_local_timestamp_millis;
  required int64 logical_local_timestamp_micros;
}

Now let us look at the actual schema of the Hive table.

Name                           | Type
id                             | string
environment                    | string
sender                         | string
send_epoch_time_millis         | bigint
logical_date                   | date
logical_time_millis            | int
logical_time_micros            | bigint
logical_timestamp_millis       | timestamp
logical_timestamp_micros       | bigint
logical_local_timestamp_millis | bigint
logical_local_timestamp_micros | bigint

The kafka producer wrote an Avro message into the topic using the provided schema.
Then the Kafka connect S3 Sink read those messages and converted them to Parquet.
Here is how the types got translated.

Field                          | Avro                   | Parquet               | Hive
logical_date                   | date                   | DATE                  | date
logical_time_millis            | time-millis            | TIME(MILLIS,true)     | int
logical_time_micros            | time-micros            | int64                 | bigint
logical_timestamp_millis       | timestamp-millis       | TIMESTAMP(MILLIS,true)| timestamp
logical_timestamp_micros       | timestamp-micros       | int64                 | bigint
logical_local_timestamp_millis | local-timestamp-millis | int64                 | bigint
logical_local_timestamp_micros | local-timestamp-micros | int64                 | bigint

Let's see if we can query the data. I am using the Hive CLI that is part of Amazon EMR.

$ select * from warehouse.logical_type_test limit 1;

Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable

Next, instead of querying all columns, I tried querying each column individually to see which one was causing the problem. It turned out that the logical_timestamp_millis column was failing, because it is typed in Hive as a timestamp.

The other columns were querying fine.

Here is a table summarizing the fidelity of Avro logical-type fields to Hive tables.


Field                          | Avro Type              | Hive Type | Query Result Type
logical_date                   | date                   | DATE      | PASS - LogicalType
logical_time_millis            | time-millis            | int       | Fail - int
logical_time_micros            | time-micros            | bigint    | Fail - bigint
logical_timestamp_millis       | timestamp-millis       | timestamp | Fail - Mapping Error
logical_timestamp_micros       | timestamp-micros       | bigint    | Fail - bigint
logical_local_timestamp_millis | local-timestamp-millis | bigint    | Fail - bigint
logical_local_timestamp_micros | local-timestamp-micros | bigint    | Fail - bigint

Summary of results with Hive



From my results, only the Avro logical type date translated correctly and was queryable in Hive.

The timestamp-millis logical type translated to a Hive timestamp type, but failed at query time.

The rest of the logical types did not translate at all.

Summary of results with Parquet



Only the following logical types got translated to higher-level Parquet types:

  • logical_date (date)
  • logical_time_millis (time-millis)
  • logical_timestamp_millis (timestamp-millis)

The rest of the logical types did not translate to higher-level Parquet types.

    Results with Spark



    I tried to use Spark to read the S3 files.

    Spark failed with the following error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o65.parquet.
        : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-100-106-230-65.us-west-2.compute.internal, executor 1): org.apache.spark.sql.AnalysisException: Parquet type not yet supported: INT32 (TIME_MILLIS);
        	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.typeNotImplemented$1(ParquetSchemaConverter.scala:105)


    Summary



    The Avro logical types did not translate fully into Hive and Parquet.

    If you are using Kafka for micro-service communication, it should be fine to use all of these logical types.

    But if the Kafka data is going to be archived, be careful of these differences.

    Saturday, July 4, 2020


    KafkaAvroSerializer: Efficient way to serialize messages with Avro to a Kafka topic

    Introduction




    Apache Kafka has become the platform of choice for doing large scale messaging. It supports a variety of serialization formats for messages, and also allows integration with external systems using a wide variety of connectors.

    Apache Kafka allows messages to be stored in a variety of formats in the topic:

    - Opaque bytes
    - Avro
    - JSON
    - Protocol Buffers





    In this post, we take a deep dive into the way the messages are stored in a Kafka topic, when the serialization format is Avro.

    Avro Serialization


    Apache Avro is an Open Source project. It is used to specify a schema for a record, and it provides a schema definition language (AVSC). Avro uses a very space-efficient serialization format: field values are written without per-field tags or names, relying on the schema to interpret the data. It allows for schema evolution as well.

    Avro schemas are created using JSON format.  The format of the schema document is defined in RFC 4627. An example Avro schema looks as follows:

    {
         "type": "record",
         "namespace": "com.example",
         "name": "FullName",
         "fields": [
           { "name": "first", "type": "string" },
           { "name": "last", "type": "string" }
         ]
    } 
    Let us experiment with the above schema. I stored it in a file called test.avsc on my computer.

    Next, I downloaded avro-tools v1.10.0 from the Maven repository.

    We will author an example JSON message that adheres to the above schema. Then we will use the *avro-tools* jar we just downloaded to convert to/from Avro.

    Generate Java POJO


    To generate a Java POJO (Plain Old Java Object) we can use the avro-tools jar as follows:

    java -jar avro-tools-1.10.0.jar compile schema test.avsc . -string

    You can see the Java pojo generated in the "com/example" subdirectory:

    $ ls com/example/

    FullName.java

    Here is the partial gist of the pojo.


    For now, just note that the Java Pojo has the schema embedded into it.

    Convert Json to Avro


    We will now write a simple json message and convert it to avro.

    Since the schema only has two fields, I created a simple message as follows:

    $ cat test.json
    { "first": "john", "last": "doe" }

    Now, let's use avro-tools to convert this to Avro format:

    java -jar avro-tools-1.10.0.jar fromjson test.json --schema-file test.avsc > test.avro

    Now, let's see the file that has been generated:

    $ ls test.avro
    test.avro

    $ file test.avro
    test.avro: Apache Avro version 1
    Let's look at the file:

    $ od -c test.avro
    0000000    O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
    0000020    a 222 002   {   "   t   y   p   e   "   :   "   r   e   c   o
    0000040    r   d   "   ,   "   n   a   m   e   "   :   "   F   u   l   l
    0000060    N   a   m   e   "   ,   "   n   a   m   e   s   p   a   c   e
    0000100    "   :   "   c   o   m   .   e   x   a   m   p   l   e   "   ,
    0000120    "   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e
    0000140    "   :   "   f   i   r   s   t   "   ,   "   t   y   p   e   "
    0000160    :   "   s   t   r   i   n   g   "   }   ,   {   "   n   a   m
    0000200    e   "   :   "   l   a   s   t   "   ,   "   t   y   p   e   "
    0000220    :   "   s   t   r   i   n   g   "   }   ]   } 024   a   v   r
    0000240    o   .   c   o   d   e   c  \b   n   u   l   l  \0 330 315 053
    0000260    +   Z 213   ' 371  \0   S 250 225 202 277 374   \ 002 022  \b
    0000300    j   o   h   n 006   d   o   e 330 315 307   +   Z 213   ' 371
    0000320   \0   S 250 225 202 277 374   \
    0000330 

    As you can see, the binary representation has the string version of the schema specification embedded in it. You can see the schema string stored up to offset 0x22b. This can be a problem when you are writing multiple messages with the same schema into a stream - using the default Avro formatter, it will embed the schema in each message, which means that a lot of bytes on wire are now being used for transmitting the schema.

    To complete our exercise, we can use the avro-tools to convert Avro message back to Json.

    $ java -jar avro-tools-1.10.0.jar tojson test.avro --pretty --reader-schema-file test.avsc
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    {
      "first" : "john",
      "last" : "doe"
    }

    Serializing Multiple Messages with Schema


    Let's see what happens when you try to serialize multiple messages with Avro schema.

    I created an example file with two records:

    $ cat people.json
    { "first": "john", "last": "doe" }
    { "first": "Attila", "last": "The Hun" }

    Now, I convert it to Avro

    $ java -jar avro-tools-1.10.0.jar fromjson people.json --schema-file test.avsc > people.avro

    Let's look at the output Avro file:

    $ od -c people.avro
    0000000    O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
    0000020    a 222 002   {   "   t   y   p   e   "   :   "   r   e   c   o
    0000040    r   d   "   ,   "   n   a   m   e   "   :   "   F   u   l   l
    0000060    N   a   m   e   "   ,   "   n   a   m   e   s   p   a   c   e
    0000100    "   :   "   c   o   m   .   e   x   a   m   p   l   e   "   ,
    0000120    "   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e
    0000140    "   :   "   f   i   r   s   t   "   ,   "   t   y   p   e   "
    0000160    :   "   s   t   r   i   n   g   "   }   ,   {   "   n   a   m
    0000200    e   "   :   "   l   a   s   t   "   ,   "   t   y   p   e   "
    0000220    :   "   s   t   r   i   n   g   "   }   ]   } 024   a   v   r
    0000240    o   .   c   o   d   e   c  \b   n   u   l   l  \0 212 313   ]
    0000260  304   v 037 354 350 314   ~   Úª  ** 207 365 302   1 004   $  \b
    0000300    j   o   h   n 006   d   o   e  \b   j   o   h   n 006   d   o
    0000320    e 212 313   ] 304   v 037 354 350 314   ~   Úª  ** 207 365 061
    0000340    1
    0000341

    Compare this output to the single record output above. You will notice that the schema specification is only included once. This means that avro serialization becomes more efficient if you serialize multiple records with the same schema at once, instead of serializing a single record at a time.

    Avro Serialization with Kafka


    In Kafka applications, producers will typically write one record at a time. There are two conflicting requirements when we use Avro to serialize records to Kafka:

    1. On the one hand, we want the message to be self-descriptive, which means we want the schema with the message.
    2. On the other hand, we want the number of bytes on the wire per message to be as small as possible, which means the verbose schema should not be included in each message (especially since it is the same for every message).

    Kafka solves this dilemma with a different Avro serialization method. To do this, it uses the Schema Registry to register schemas. The schema registry assigns an ID to each unique schema specification.

    NOTE: Schema Registry integration for Kafka is not part of the open source Apache Kafka ecosystem. It is provided by Confluent. You can try this out locally by downloading the Confluent binaries, but running it in production requires licensing.

    When the message is serialized to the topic, the KafkaAvroSerializer serializes the message, but instead of including the verbose schema with the message, it includes the schema ID from the schema registry. The schema id is obtained when you register a schema with the registry.

    The format of the message written by the serializer is as follows:

     Offset | Description
     0      | Magic byte
     1-4    | 4 bytes of schema id
     5-     | Avro-encoded record
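
    A small Python 3 sketch of how this framing can be decoded from a raw record value (illustrative code, not part of the Confluent libraries):

    import struct

    def decode_confluent_framing(raw_value):
        """Split a KafkaAvroSerializer-framed value into (magic byte, schema id, Avro payload)."""
        magic = raw_value[0]                                 # should be 0
        schema_id = struct.unpack('>I', raw_value[1:5])[0]   # 4-byte big endian schema id
        return magic, schema_id, raw_value[5:]

    # The record shown below starts with the bytes 00 00 00 01 e0 ...
    magic, schema_id, payload = decode_confluent_framing(b'\x00\x00\x00\x01\xe0' + b'rest-of-avro-payload')
    print(magic, schema_id)   # prints: 0 480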


    To see this in action, try to read a message serialized using KafkaAvroSerializer. I am reading the message from a topic using the kafka-console-consumer:

    $ ./bin/kafka-console-consumer --bootstrap-server localhost:9020 --topic test --partition 0 --offset 0 | od -t x1
    0000000    00  00  00  01  e0  48  65  35  39  63  62  64  36  35  2d  36
    0000020    38  64  39  2d  34  32  61  34  2d  39  66  35  35  2d  64  61
    0000040    38  37  37  63  38  64  39  33  30  34  06  61  65  31  4e  74
    0000060    65  73  74  5f  73  63  68  65  6d  61  5f  65  6e  64  70  6f
    0000100    69  6e  74  5f  72  65  63  6f  72  64  6e  61  6d  65  73  74
    0000120    72  61  74  65  67  79  2e  43  6c  69  65  6e  74  3a  20  73

    As you can see, the first byte at offset 0 is the magic byte. After that we have the four bytes of the schema id, which is 0x000001e0. This translates to decimal 480:

    $ printf "%d" 0x1e0

    480


    I can verify that a schema with that ID exists in my schema-registry.

    $ curl -s http://localhost:8888/schemas/ids/480 | jq '.schema|fromjson'
    {
      "type": "record",
      "name": "FullName",
      "namespace": "com.example",
      "fields": [
        {
          "name": "first",
          "type": "string"
        },
        {
          "name": "last",
          "type": "string"
        }
      ]
    }
     
    As you can see, this schema is exactly the same as the schema I created.

    Conclusion


    We looked at how Avro serialization works by serializing the message along with its schema, making the message self-describing. This led us to see how the stock Avro serializer is not suitable for serializing messages to a stream interface (like Amazon Kinesis, Kafka or a socket), since including the schema in each message wastes space. We then looked at the KafkaAvroSerializer that ships with Confluent Kafka, and saw how it solves this problem by storing an ID that references the schema, instead of storing the entire schema in the serialized message.

    Tuesday, October 4, 2016

    Implementing a Ping client in Python - Part III

    Introduction


    This is Part 3 of the N-part series where we develop a Ping client in Python.


    So far, we have looked at the structure of the ICMP packet, and at the Python code to serialize and deserialize the packet on the network.

    In the current installment, we will complete the program. First, let us implement the checksum.

    ICMP Header Checksum


    A checksum is a 16-bit value used for error checking the header and the data. A checksum allows the receiver to make sure that the data it received did not get corrupted or altered in transit. The transmitter calculates a checksum over the outgoing packet and adds it to the packet. The receiver calculates a checksum on the received packet and compares it with the checksum in the packet. If they match, then there was no corruption on the wire.

    As per Wikipedia:

    The checksum field is the 16 bit one's complement of the one's complement sum of all 16-bit words in the header and text. If a segment contains an odd number of header and text octets to be checksummed, the last octet is padded on the right with zeros to form a 16-bit word for checksum purposes. The pad is not transmitted as part of the segment. While computing the checksum, the checksum field itself is replaced with zeros.

    Using this definition, we have the following checksum function
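
    A minimal sketch of such a checksum routine in Python (it assumes the serialized packet is passed in as a byte buffer with the checksum field zeroed out; the full program may differ in detail):

    def checksum(data):
        """Compute the 16-bit one's complement Internet checksum over data."""
        data = bytearray(data)                         # accepts a str (Python 2) or bytes buffer
        if len(data) % 2:
            data.append(0)                             # pad odd-length input with a zero byte
        total = 0
        for i in range(0, len(data), 2):
            word = (data[i] << 8) + data[i + 1]        # combine two bytes into a 16-bit word
            total += word
            total = (total & 0xffff) + (total >> 16)   # fold any carry back into the low 16 bits
        return (~total) & 0xffff                       # one's complement of the final sum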



    Socket Introduction


    The Wikipedia article on Sockets gives some background info on this topic. For our case, since we are going to be dealing with a lower level protocol like ICMP, we will be creating a Raw socket.

    In order to create a raw socket, you need to be a superuser. On UNIX machines, you will need to use "sudo" to run the program. On Windows, you need to be a member of the Administrators group to run the program.

    The python documentation for Socket is a great resource as well.

    A raw socket is created as follows:

    sock = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)
    sock.setsockopt(IPPROTO_IP, IP_HDRINCL, 0)
    sock.bind(('127.0.0.1', 0))

    Since we are only sending the ICMP packet, and not its encapsulating IP packet, we need to indicate this to the network driver using setsockopt(IPPROTO_IP, IP_HDRINCL, 0). This lets the network driver encapsulate our data in an IP packet.

    Main Entry Point


    Now that we have all the scaffolding in place for the program, here is the main() function that ties it all together.

    Note a couple of imports.
    On my Mac, I run this program as
    sudo python ping.py
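
    The sketch below shows roughly how the pieces fit together, assuming the icmp_pkt class from Parts I and II with serialize() and parse() methods (the constructor arguments shown here are illustrative; the real program on GitHub may differ):

    import sys
    import time
    from socket import socket, AF_INET, SOCK_RAW, IPPROTO_ICMP, IPPROTO_IP, IP_HDRINCL

    def main():
        target = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'

        sock = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)
        sock.setsockopt(IPPROTO_IP, IP_HDRINCL, 0)
        sock.settimeout(2.0)

        # Build an ICMP echo request (type 8, code 0); the real program fills in
        # the checksum field using the checksum() function above before sending.
        request = icmp_pkt(icmp_type=8, code=0, identifier=1234, sequence=1, data='ping')
        start = time.time()
        sock.sendto(request.serialize(), (target, 0))

        # Read the reply; parse() skips the 20-byte IP header (see Part II).
        buf, addr = sock.recvfrom(2048)
        reply = icmp_pkt.parse(buf)
        elapsed_ms = (time.time() - start) * 1000
        print('reply from %s: type=%d time=%.2f ms' % (addr[0], reply.icmp_type, elapsed_ms))

    if __name__ == '__main__':
        main()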

    Complete Program


    The complete program is checked into GitHub. I encourage you to check it out.


    Conclusion

    This concludes our three-part series. I hope you liked it, and learned something about low-level network programming in Python.

    Monday, October 3, 2016

    Implementing a Ping client in Python - Part II

    Introduction

    In Implementing a Ping client in Python - Part I, we gave an introduction to Ping, and showed how it is implemented using the ICMP protocol. Specifically, it uses the ICMP Echo Request and Response mechanism. The client sends the server an ICMP Echo packet, and the server responds with an ICMP Response packet.

    We also defined the python class structure of the ICMP packet.

    In this chapter, we will implement the serialization and deserialization routines so that we can send the packet over the wire, and read the response back into a Python class.

    First, a word about byte ordering....

    Byte Ordering


    Every CPU architecture has a byte ordering specification. Byte ordering, also called endianness, defines how data types that are more than one byte in size are laid out in memory. For example, if you have a 16-bit word with the value 0xABCD, how is it written into memory, given that memory only stores bytes? Do you store the high-order byte first (i.e., at the lower memory location) and then the lower byte, or the other way around? This is called endianness.

    There are two kinds of Endianness, Big Endian and Little Endian.

    Big Endian

    In this scheme, the higher-order byte is stored at the lower memory location. So, a 16-bit value 0xABCD will be stored as follows:

    Offset | Value
    0      | 0xAB
    1      | 0xCD

    Little Endian


    In Little Endian, the lower-order byte is stored at the lower memory location, and then the higher-order byte. So, a 16-bit value 0xABCD will be stored as follows:

    Offset | Value
    0      | 0xCD
    1      | 0xAB

    When sending data over the network, it is always sent in big endian order, i.e. higher-order byte first; this is known as network byte order. So, for our program, we will need to implement routines to write words and integers in network order, and to read them back as well.

    Network Byte Order Conversion

    We will take the icmp_pkt class defined in Part I of this series, and add the following methods to it.
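
    A sketch of the two helpers (the exact code in the series may differ; it assumes the outgoing packet is built up in a list of byte values, while incoming data arrives as the string buffer returned by recvfrom()):

    def _write_word(self, buf, value):
        """Append a 16-bit value to buf in network (big endian) byte order."""
        buf.append((value >> 8) & 0xff)    # high-order byte first
        buf.append(value & 0xff)           # then the low-order byte

    def parse_word(self, data, offset):
        """Read a 16-bit big endian value from a string buffer starting at offset."""
        high = ord(data[offset])           # ord() maps the character back to its raw byte value
        low = ord(data[offset + 1])
        return (high << 8) | low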


    These two functions, _write_word and parse_word, are complements of each other.

    _write_word serializes a 16-bit value in big endian order.

    parse_word reads a 16-bit word from a buffer.

    Note the usage of Python's ord() function. Since the input to parse_word is a string (the data parameter is a string type), the character at position i in the buffer is the character value of the byte. However, we are interested in the raw byte value, not the mapped character value. So we need to use the ord() function to map it back to its raw value.

    There is a slight discrepancy between the _write_word() and parse_word() functions: _write_word takes a list, because that is what the caller builds the outgoing packet in, whereas parse_word() has to deal with network data stored in the string buffer returned from the socket call.

    Next, we will write code to serialize the python class into a network buffer, and also deserialize it.

    ICMP Packet Serialization/Deserialization


    The ICMP echo packet structure is very simple: it consists of bytes and words, plus a variable-length byte array to hold user data that is sent by the client and echoed back by the server. Serializing it is very simple as well. Just write the fields in the order defined in the packet.

    Note the starting offset of 20 in the parse() method. This is needed because on Unix, when socket.recvfrom() returns, the buffer starts at the beginning of the IP header. The ICMP header is encapsulated inside the IP packet, so we need to skip past the IP header (which is 20 bytes in size) to read the ICMP response.
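
    A sketch of both routines, using the helpers above (field names follow the packet description from Part I; the actual implementation in the series may differ in detail):

    def serialize(self):
        """Serialize the ICMP echo packet into a string buffer, in wire order."""
        buf = []
        buf.append(self.icmp_type)                  # type: 8 for echo request
        buf.append(self.code)                       # code: always 0 for echo
        self._write_word(buf, self.checksum)
        self._write_word(buf, self.identifier)
        self._write_word(buf, self.sequence)
        buf.extend(ord(c) for c in self.data)       # optional user data, echoed back by the server
        return ''.join(chr(b) for b in buf)

    @classmethod
    def parse(cls, data):
        """Parse an ICMP echo reply, skipping the 20-byte IP header that precedes it."""
        offset = 20                                 # recvfrom() hands us the packet starting at the IP header
        pkt = cls()
        pkt.icmp_type = ord(data[offset])
        pkt.code = ord(data[offset + 1])
        pkt.checksum = pkt.parse_word(data, offset + 2)
        pkt.identifier = pkt.parse_word(data, offset + 4)
        pkt.sequence = pkt.parse_word(data, offset + 6)
        pkt.data = data[offset + 8:]
        return pkt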


    Conclusion


    In the current installment, we looked at byte orderings, specifically big endian and little endian. Then we saw how to encode the ICMP packet into a byte array for sending over the wire. We also saw how to parse the byte array back into a Python class instance.

    In the next installment, we will start to fill in the code for the checksum and sockets.

    Monday, September 26, 2016

    Implementing a Ping client in Python - Part I

    Introduction

    In this series of posts,  I will demonstrate how to implement a Ping client using Python.

    Ping is one of the most useful tools that a computer programmer might use. It is used to figure out if a target machine is reachable or not.

    Ping is implemented using the ICMP protocol.

    ICMP Protocol

    The Internet Control Message Protocol (ICMP) is defined by the Internet Engineering Task Force. The specification of the protocol is in RFC 792.

    The Ping utility uses the ECHO request/response facility of the ICMP protocol. The client sends an ICMP Echo request to the server, and the server responds with an ICMP Echo response. It is a very simple request/response protocol and makes it ideal to use in order to learn low level network programming.

    ICMP Packet Description

    The following table describes the ICMP Echo Request/Reply Packet


    Type (8 bits) | Code (8 bits) | Checksum (16 bits)
    Identifier (16 bits)          | Sequence Number (16 bits)
    Data ...

    The Type is 8 for an Echo message, and 0 for an Echo Reply message.

    Code has to be zero.

    Identifier can be used to match request/reply packets.

    Sequence Number can be used to distinguish multiple request/responses from each other.

    The client sends an ICMP Echo packet to the server. An echo packet has the type code 8.

    The server responds with an ICMP Response packet. An ICMP Echo response packet has a type code of 0.

    Python Implementation Details

    I have been programming in Python for quite some time, but had never attempted low-level programming at the socket level until now. Normally, in C, C# or Java, one would use byte arrays to send and receive data. However, the main byte array abstraction in Python is the string datatype (and the StringIO buffer). It was challenging for me to wrap my head around using a string as a byte buffer. I will mention some of the pitfalls to avoid as I go along.
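
    For reference, here is a minimal sketch of the packet class this series builds on (field names follow the packet description above; the actual class in the series may differ in detail):

    class icmp_pkt(object):
        """Plain container for the ICMP echo request/reply fields."""
        def __init__(self, icmp_type=8, code=0, identifier=0, sequence=0, data=''):
            self.icmp_type = icmp_type      # 8 for echo request, 0 for echo reply
            self.code = code                # always 0 for echo
            self.checksum = 0               # filled in before sending
            self.identifier = identifier    # used to match requests to replies
            self.sequence = sequence        # distinguishes successive requests
            self.data = data                # payload echoed back by the server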

    Ping using C#

    At my previous company, I had written a Ping utility using C#/.Net on Windows. That was described here:



    Sunday, January 5, 2014

    A History of Microsoft's HTTP Client Stacks

    This article is a bit of history about the various Microsoft HTTP client stacks.

    WININET


    Originally, there was one - the WININET implementation, written for the Internet Explorer browser. The implementation was encapsulated in Wininet.dll. In its original form (as shipped in IE 2.0/IE 3.0/IE 4.x), it supported the following protocols: HTTP, FTP and GOPHER. It also supported HTTP caching. For more information, see the WININET MSDN page.


    As client-side HTTP stacks stood in those days, it was pretty good and usable. While it was originally targeted at Internet Explorer, a lot of third-party applications began to be built around it. And, as frequently happens, it was put to uses for which it was not intended.


    For example, a lot of customers started using it to implement middle-tier applications, where the DLL was loaded in an ISAPI in the IIS process. Wininet was not really intended for such uses - it was heavily bound to a user context because of its dependency on the user directories for storing cached web pages. It also did not support delegated authentication, and some of the Wininet settings were only configurable through the IE interface.


    Wininet still lives on - it is the HTTP stack used by the Internet Explorer browser. In OS versions from Windows 7 onwards, it might have been replaced by WebIO.


    WinHTTP


    More and more customers were using Wininet in a non-user context - for example, in middle-tier apps - and it was not working very well. As I mentioned above, Wininet was not designed to be used in a server environment. So, a separate team was spun off to come up with a solution for that.


    The result was WinHTTP. This was a standalone DLL that borrowed heavily from Wininet in terms of the interfaces it offered, but it was designed to be usable in server applications as well. It was a robust implementation that exposed its functionality through exported functions, as well as a COM interface that could be used in server scenarios.


    However, it was a subset of the Wininet implementation: it did not support cookies, caching, or automatic credential handling. Also, Wininet had a lot of code to handle buggy HTTP servers and proxy servers, and that code did not make it into WinHTTP.


    There were two versions of WinHTTP. WinHTTP 5.0 was a redistributable version, downloadable as a standalone component from Microsoft. WinHTTP 5.1 was made part of the operating system, and thus was not downloadable as a separate component.

    System.Net


    As part of Microsoft's effort to build a clean-room, managed execution environment like Java, a separate team was staffed to develop the networking library for the managed frameworks that were to ship as part of it. This was part of the project codenamed Lightning.

    The networking classes were encapsulated in the System.Net namespace. This team was staffed by ex-WININET developers, and led by Henry Sanders who is now a Vice President at Microsoft.


    This library was written with a view to fixing some of the problems with WININET. It was designed from the ground up to be usable in middle-tier scenarios, and to support asynchronous networking calls. Even the synchronous networking calls ended up calling the async implementations underneath, which caused the application to use up an extra thread; this was later fixed in .NET Framework 2.0 (codename Whidbey). It also supported asynchronous operations on both WinNT and Win9x platforms, using completion ports on WinNT and overlapped I/O on Win9x.

    The class design (for the HTTP client stack) in the System.Net library is such that it supports both beginners and advanced users seamlessly. For beginners, System.Net.WebClient offers an easy-to-use API to fetch web resources, and even to send POST requests to web forms. For advanced and fine-grained control, the System.Net.HttpWebRequest class is the recommended entry point.

    With a new design philosophy, the API designers decided to make usage consistent with other parts of the framework. For example, doing asynchronous network I/O over HTTP followed the same pattern as doing asynchronous I/O on a file handle in the System.IO.File class.

    One extra thing that was available in System.Net was a Socket implementation, similar to the facilities provided by the Windows Sockets (Winsock) library on Windows. Again, keeping with the overall design philosophy of the .NET framework, later releases added Stream implementations (for example SslStream) that could be hooked up to a Socket to provide higher-level network stack functions like encryption and authentication.

    For debugging, tracing/logging functionality is provided that allows most network I/O to be logged using the .NET runtime's logging facilities.

    WebIO


    In Windows 7, a new native HTTP stack was written, called WebIO. It is not documented as a public API, and is internal to the browser and the operating system.

    Conclusion


    For developers who are using .NET, the primary choice for an HTTP client library is the System.Net namespace. However, for use outside of the .NET runtime, the options are Wininet and WinHTTP. All three of these stacks should support most modern applications to varying degrees.