Sunday, July 7, 2024


Kafka Privacy Toolkit Part 1: Protect PII data using a custom single message transform.

Introduction


Kafka is a highly scalable and durable distributed streaming platform that has become the go-to choice for handling large amounts of data across a variety of industries. Its ecosystem of services is vast and powerful, enabling businesses to solve complex problems and uncover insights from their data in real-time. 

One of the key services in the Kafka ecosystem is Kafka Connect. It is a framework that enables streaming integration between Kafka and other data systems. Kafka Connect is flexible and extensible, with a plugin-based architecture that allows it to integrate seamlessly with various data systems. Users can write custom connectors to integrate Kafka with any data system that has a Java API, and Kafka Connect's RESTful API can be used to configure and manage connector instances. With Kafka Connect, businesses can leverage the power of Kafka to enable streaming integration between different data systems with ease.

Data Privacy & Compliance


Organizations process large amounts of data, and some of it is sensitive: Personally Identifiable Information (PII) such as names, addresses, and social security numbers. It is important to protect that data from unauthorized access and use.

The financial industry is one place where removing PII is crucial when transferring data to an external system with Kafka Connect. Financial institutions are required by regulation to comply with data protection laws and to keep their clients' information confidential. Kafka Connect lets these institutions move data between internal systems and external systems, such as data warehouses or third-party analytics vendors, while ensuring that sensitive information is obfuscated. This improves data security, helps maintain regulatory compliance, and reduces the risk of data breaches.

In this blog post, we will explore how to use Kafka Connect Single Message Transforms (SMTs), including a custom one, to obfuscate PII data in a Kafka message. This will help you maintain data privacy and security while using Kafka effectively.

Drop Messages with PII

You can use the Confluent Filter SMT to drop entire messages that match a condition. The condition can be set to match on PII data in the message.

For example, given the input record:


And the Kafka Connect SMT config:

The output will be:


As you can see, the "Jon Doe" record was dropped because it had a salary field value that was greater than zero.
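As a rough illustration of the drop behavior described above, here is a minimal Python sketch. It only mimics the effect of excluding records whose salary is greater than zero; it is not the Filter SMT's implementation. The sample records and the names should_drop and apply_filter are made up for this example.

```python
# Hypothetical sample records; only the "salary" field matters here.
records = [
    {"name": "Jon Doe", "salary": 100000},
    {"name": "Jane Roe", "salary": 0},
    {"name": "No Salary"},
]


def should_drop(record):
    """Return True when the record matches the drop condition (salary > 0)."""
    salary = record.get("salary")
    return isinstance(salary, (int, float)) and salary > 0


def apply_filter(batch):
    """Keep only the records that do not match the drop condition."""
    return [r for r in batch if not should_drop(r)]


kept = apply_filter(records)
print(kept)
```

In this sketch, records without a salary field are kept. How a real deployment should treat missing or null fields is a separate decision that has to be made in the connector configuration.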

Remove sensitive values in PII fields.


Use the MaskField transform to mask the data in a PII field. MaskField ships with Apache Kafka Connect and is covered in the Confluent documentation. It can also replace the value with a token value. When combined with predicates, it is a powerful mechanism to mask PII data when certain conditions are met.

For example, given the input record:


And the Kafka Connect SMT config:

The output will be:


In the output, the salary value of every record that has a salary field was masked and replaced with a default value.
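The masking idea can be sketched in a few lines of Python. This is an illustration under assumptions, not the transform's code: the function names are hypothetical, the predicate is modeled as a plain callable on the record, and the default replacement is taken to be the empty value of the field's type.

```python
def mask_value(value, replacement=None):
    """Return the replacement if given, else the empty value of the same type."""
    if replacement is not None:
        return replacement
    if isinstance(value, (bool, int, float, str)):
        return type(value)()  # False, 0, 0.0 or ""
    return None


def mask_fields(record, fields, replacement=None, predicate=None):
    """Mask the named fields, but only when the optional predicate matches."""
    if predicate is not None and not predicate(record):
        return dict(record)
    return {
        key: mask_value(value, replacement) if key in fields else value
        for key, value in record.items()
    }


# Hypothetical sample record.
record = {"name": "Jon Doe", "salary": 100000, "ssn": "111-22-3333"}
masked = mask_fields(record, {"salary", "ssn"})
print(masked)
```

Passing a predicate such as `lambda r: r.get("salary", 0) > 0` restricts masking to matching records, which is the role predicates play in the setup described above.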

Manage JSON payloads in Kafka messages.


Sometimes your Kafka message contains fields that are themselves JSON payloads encoded as strings. This can happen, for example, when using a DynamoDB source connector, which embeds the DynamoDB row into the Document field of the message.

The challenge comes when trying to mask a PII field in a JSON message that is embedded in a single field of a Kafka message.

For this you can use the mask-json-field transform. This transform allows you to mask out fields at any level of the JSON message. It even supports arrays.

For example, given the input record:


And the Kafka Connect SMT config:

The output will be:


In the output, the salary field was replaced with 0, and the SSN field was replaced with "xxx-xx-xxxx".
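To make the mechanics concrete, here is a small Python sketch of masking fields inside a JSON string that is itself the value of a message field. It is a simplified stand-in for what such a transform has to do (parse, replace, re-serialize), not the transform's actual code. The function name and the sample message are assumptions for illustration.

```python
import json


def mask_embedded_json(message, connect_field, replacements):
    """Parse the JSON string in connect_field, replace keys, re-serialize."""
    document = json.loads(message[connect_field])
    for key, new_value in replacements.items():
        if key in document:
            document[key] = new_value
    masked = dict(message)  # leave the input message untouched
    masked[connect_field] = json.dumps(document)
    return masked


# Hypothetical message with a JSON document embedded as a string.
message = {
    "Id": 1,
    "Document": json.dumps(
        {"name": "john doe", "salary": 100000, "ssn": "111-22-3333"}
    ),
}
masked = mask_embedded_json(
    message, "Document", {"salary": 0, "ssn": "xxx-xx-xxxx"}
)
print(masked["Document"])
```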

Conclusion

In this article, I showed how to use Kafka Connect Single Message Transforms to drop messages that contain PII fields, or to mask PII fields in messages.

You can use these transforms to ensure the data flowing into or out of your Kafka topics complies with data regulations.

Thursday, June 6, 2024


How to add new values to an avro Enum


Introduction

Apache Avro™ is the leading serialization format for record data, and first choice for streaming data pipelines. It offers excellent schema evolution.

In this article, we discuss how to author Avro enums so that data written with a new version of the schema can be read by consumers using the previous version of the schema. In other words, we are authoring the Avro schema so that it is forward compatible.

Avro Enums


In Avro, you can define enums as follows:

"type": {
    "name": "color",
    "type": "enum",
    "symbols": ["red", "blue", "green"]
}


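Enum symbols are always strings in the schema and in Avro's JSON encoding. In the binary encoding, an enum value is written as the index of its symbol.

The Apache Avro documentation contains the specification for enums.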

Avro Compatibility


Now we come to the main part of this blog post. What happens if you read data containing an enum symbol that is not part of the reader schema?

Let's start by defining the first version of the schema in a file called "v1.avsc". This is the reader schema.

{ 
	"name": "v1", 
	"type": "record", 
	"namespace": "com.acme", 
	"doc": "Enum test", 
	"fields": [ 
		{ 
			"name": "color", 
			"type": { 
				"name": "enum", 
				"type": "enum", 
				"symbols": [ "unknown", "red", "blue", "green" ], 
				"default": "unknown" 
			}, 
			"default": "unknown" 
		} 
	] 
}

We have the schema. How do we generate data? For that we need the avro-tools jar. Download avro-tools and store it in a local folder.

Now you can create test data as follows. First, write a JSON message corresponding to the schema.

Write the following into a file called "v1.json"
{ "color": "red" }

Now convert the JSON to Avro using avro-tools:

java -jar ~/DevTools/avro-tools-1.11.1.jar fromjson --schema-file v1.avsc v1.json > v1.avro


Now, we evolve the schema. Add another symbol, "yellow", to the enum of the "color" field. Store it in a file called v2.avsc.

{ 
	"name": "v1", 
	"type": "record", 
	"namespace": "com.acme", 
	"doc": "Enum test", 
	"fields": [ 
		{ 
			"name": "color", 
			"type": { 
				"name": "enum", 
				"type": "enum", 
				"symbols": [ "unknown", "red", "blue", "green", "yellow" ], 
				"default": "unknown" 
			}, 
			"default": "unknown" 
		} 
	] 
}

Create a new JSON message in a file called v2.json that uses the new enum value:

{ "color": "yellow" }

Let's convert this to Avro.


java -jar ~/DevTools/avro-tools-1.11.1.jar fromjson --schema-file v2.avsc v2.json > v2.avro

Now let's see what happens if you try to read the v2.avro file using the v1 schema (v1.avsc). Remember that the v1 schema does not have the symbol "yellow" in the enum.

We will use the tojson command from avro-tools to convert the Avro file to JSON.


$ java -jar ~/DevTools/avro-tools-1.11.1.jar tojson --reader-schema-file v1.avsc v2.avro
{"color":"unknown"}

As you can see, reading the v2 Avro message with a schema that does not have the new enum symbol causes the unknown symbol to be replaced with the default value of that enum.
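The rule at work here comes from the schema resolution section of the Avro specification: if the writer's symbol is not present in the reader's enum and the reader's enum has a default, the default is used; otherwise an error is raised. The Python sketch below models just that rule. The function name and the error type are my own choices for illustration, not part of any Avro library.

```python
def resolve_enum(writer_symbol, reader_symbols, reader_default=None):
    """Model of Avro enum resolution between a writer and a reader schema."""
    if writer_symbol in reader_symbols:
        return writer_symbol
    if reader_default is not None:
        return reader_default
    raise ValueError(f"No match for {writer_symbol}")


# Symbols of the v1 (reader) schema.
v1_symbols = ["unknown", "red", "blue", "green"]

print(resolve_enum("red", v1_symbols, "unknown"))     # symbol known to the reader
print(resolve_enum("yellow", v1_symbols, "unknown"))  # falls back to the default
```

Calling resolve_enum("yellow", v1_symbols) without a default raises the error, which is why the enum-level default matters for readers on the older schema.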

Now, let's see what happens if the enum in the reader schema has no default value. Create a copy of the v1 schema without the enum default, in a file called v1-nodefault.avsc:

{ 
	"name": "v1", 
	"type": "record", 
	"namespace": "com.acme", 
	"doc": "Enum test", 
	"fields": [ 
		{ 
			"name": "color", 
			"type": { 
				"name": "enum", 
				"type": "enum", 
				"symbols": [ "unknown", "red", "blue", "green" ] 
			}
		} 
	] 
}


Use this schema to read the v2.avro file.

java -jar ~/DevTools/avro-tools-1.11.1.jar tojson --reader-schema-file v1-nodefault.avsc v2.avro

This fails with the following exception:


Exception in thread "main" org.apache.avro.AvroTypeException: No match for yellow
	at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:269)
	at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:268)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:182)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263)
	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:248)
	at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:98)
	at org.apache.avro.tool.Main.run(Main.java:67)
	at org.apache.avro.tool.Main.main(Main.java:56)

As you can see, without a default value on the enum in the reader schema, data written with the new schema cannot be read with the previous schema.

Conclusion


Give your Avro enums a default value. This lets you add symbols later while staying forward compatible: consumers that still use a previous version of the schema can read data written with the new version.


Wednesday, April 17, 2024


Troubleshooting serialization errors when using Confluent KafkaAvroSerializer

Introduction

Confluent provides on-prem and hosted services of the Apache Kafka ecosystem. As part of this they also provide a custom serializer/deserializer to enable writing and reading messages with schema from a topic.

The KafkaAvroSerializer provides an efficient mechanism for serializing messages to a kafka topic.

The wire format used by the KafkaAvroSerializer is described in the Confluent documentation.


Sometimes the producer of the message does not use these libraries, but still writes messages that adhere to the Confluent wire format. The reason for doing this is to keep the messages compatible with Confluent tooling, so that you can use the KafkaAvroDeserializer. For example, you might want to write those messages to S3 using the kafka-connect-s3 sink connector.

When the producer does custom serialization but makes an error in the encoding, it can cause deserialization errors down the line. These kinds of errors are difficult to debug. In this blog post, I will describe how you can find their root cause.

Serialization Errors

Here is an example of an error we noticed in the Kafka Connect S3 sink connector in our production instance.


Troubleshooting


Start off by downloading the schema that the error message is referring to:


Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 15930

Download the schema for that schema id as follows:


curl -s https://schema-registry.mydomain.local/schemas/ids/15930 | jq '.schema|fromjson' > schema.avsc

Next, you need to download the failed message from the topic. The topicname/partition/offset are given in the exception message.


  Error encountered in task crm-tag-update-event-prod-s3-sink-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', 
  where consumed record is {topic='crm-tag-update-event-prod-topic', partition=0, offset=52510041, timestamp=1713396004969, timestampType=CreateTime}.


If you don't see the partition and offset, you can make your sink connector log that information with the following configs:

errors.log.enable: true
errors.log.include.messages: true


Now that we have the topic, partition, and offset, let's download the offending message from the Kafka topic. Reading a single message keeps the output file limited to the record we care about.

$ kafka-console-consumer \
--bootstrap-server kafka-cluster.mydomain.local:6030 \
--consumer.config security.properties \
--topic crm-tag-update-event-prod-topic \
--partition 0 \
--offset 52510041 \
--max-messages 1 \
> message_raw.bin


So we have the schema and the offending message. How do we decode them? For that we use avro-tools, which you can download from the Sonatype Index.

For this demo, I am using version 1.11.1.

But wait: the message that we downloaded from the topic is the raw message. It starts with a 5-byte header made up of a magic byte and a 4-byte schema id (refer to the Confluent wire format), so we first need to extract the actual message.

This is easy, since we only need to strip the 5-byte header from the message.

$ dd if=message_raw.bin of=message.bin bs=1 skip=5
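If you prefer to inspect the header instead of just discarding it, the same split can be sketched in Python. This assumes the layout described above: one magic byte with value 0, then the schema id as a 4-byte big-endian integer, then the Avro payload. The function name and the sample bytes are made up for illustration.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent wire format


def split_confluent_frame(raw):
    """Return (schema_id, avro_payload) from a raw Confluent-framed message."""
    if len(raw) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema id)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[5:]


# Hypothetical framed message: header for schema id 15930 plus a tiny payload.
sample = b"\x00" + (15930).to_bytes(4, "big") + b"\x08John"
schema_id, payload = split_confluent_frame(sample)
print(schema_id, payload)
```

Against the file from the previous step, you would pass the bytes read from message_raw.bin. Checking that the schema id in the header matches the id in the error message is a quick sanity check before decoding.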


Now that we have the actual message and the schema, we can use fragtojson to decode the message. We expect this to fail, since the message body does not match the schema that its schema-id refers to.

$ java -jar /tmp/avro-tools-1.11.1.jar fragtojson --schema-file schema.avsc message.bin


Root Cause 

To get to the root cause, we need to look at the producer.


One of the following is the cause of this:

1) The schema-id is correct for that topic, but the serialized message does not match.

2) The schema-id is wrong, but the serialized message is correct.


To find out which one of these it is, we need to understand the producer.

In our case, an error in a GitLab pipeline caused the wrong job to be deployed to our environment. That job was using the correct schema subject, but was compiled against POJOs that did not match the subject. So it serialized messages using the wrong POJO but the correct schema-id.

For #1, you need to find the actual schema that was used in serialization. Then download that schema and use it to decode the message using avro-tools.

#2 is more difficult since we don't know the correct schema-id. This requires looking at the producer code to figure out the correct schema subject/version or schema-id it should have used for serialization.

Conclusion

In this blog post I showed you how to:

1) Read avro messages from a kafka topic.
2) Extract the embedded message from the raw message.
3) Decode it using avro-tools.
4) Troubleshoot serialization errors.



Saturday, June 24, 2023


mask-json-field transform - a Kafka Connect SMT to mask out PII data from JSON fields in Kafka messages.


Introduction


In today's fast-paced digital era, where vast amounts of data are being generated and processed at an unprecedented scale, the need for efficient, scalable, and real-time data processing solutions has become crucial. Traditional messaging systems and data integration tools often struggle to meet these demands, leading to delays, data loss, and inefficiencies in data processing pipelines. That's where Apache Kafka, a powerful distributed streaming platform, comes into play.

Kafka has emerged as a game-changer in the world of data streaming, enabling organizations to build robust, fault-tolerant, and highly scalable data pipelines. Kafka was originally created at LinkedIn to handle its real-time data ingestion and processing needs, and is now developed as an open source project under the Apache Software Foundation. Since its inception, Kafka has gained immense popularity and has become the de facto standard for building event-driven architectures in a wide range of industries.

Kafka Connect plays a pivotal role in the Kafka ecosystem by providing a scalable and reliable framework for data integration. It simplifies the process of connecting external systems to Kafka, enabling seamless data ingestion and extraction. Kafka Connect acts as a bridge between Kafka topics and various data sources or sinks, allowing for the efficient transfer of data in real-time. It offers a wide range of connectors that serve as plugins to connect to different systems such as databases, file systems, message queues, and cloud services. With Kafka Connect, organizations can easily integrate their existing data infrastructure with Kafka, enabling them to build robust and flexible data pipelines. The ability to leverage pre-built connectors or develop custom ones makes Kafka Connect a valuable tool for simplifying data integration tasks and accelerating the adoption of Kafka within an organization's data ecosystem.

Kafka Connect Single Message Transforms


Single Message Transforms (SMTs) are a powerful feature of Kafka Connect that allow in-flight data transformation and enrichment within data integration pipelines. An SMT operates on individual messages as they pass through a connector, providing the flexibility to modify, filter, or enhance the data without external processing. With SMTs, you can apply operations such as adding or removing fields, changing data types, and filtering messages based on specific conditions. Because an SMT sees one message at a time, it is not suited to aggregating across messages. This capability lets organizations shape the data according to their requirements before it reaches the destination, which helps with data consistency, compliance, and compatibility.

Complying with Data Regulations


Companies need to comply with the data protection regulations of the jurisdictions they operate in. Examples of these regulations include CCPA/CPRA (California) and GLBA (US, financial sector). Other countries have their own versions of these.

Companies using the Kafka ecosystem of services to transfer data need to be aware of these regulations and decide where regulated data (for example, PII) may or may not be visible. When using Kafka Connect as the integration mechanism, you can use SMTs to mask data to comply with regulations.

In this blog post, I will introduce the Kafka Connect mask-json-field SMT.

mask-json-field Single Message Transform


There are cases where you need to mask a value in a JSON document that is embedded in a field of a Kafka message. For example, assume the following Kafka message schema, which could be the output of a Change Data Capture (CDC) connector that monitors a database and writes CDC events into a Kafka topic.



| Field Name | Field Value |
| --- | --- |
| Id | 1 |
| Database | contoso |
| Operation | Insert |
| Document | {"name": "john doe", "salary": 100000, "ssn": "111-22-3333"} |

In this Kafka message, the Document value is a string that holds a JSON payload of the database row.

Assume we are persisting this message into a different system using a sink connector. However, for privacy reasons, we don't want to persist the salary and ssn.

The JSON version of this message looks as follows:
{
    "Id": 1,
    "Database": "contoso",
    "Operation": "Insert",
    "Document": "{\"name\": \"john doe\", \"salary\": 100000, \"ssn\": \"111-22-3333\"}"
}


The mask-json-field SMT allows you to manipulate JSON documents embedded inside kafka messages.

Here is an example of using the transform in a FileStreamSink connector.

{
    "name": "json-mask-test",
    "connector.class": "FileStreamSink",
    "file": "/tmp/file-sink.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": 1,
    "topics": "json-topic",
    "transforms": "mask_ssn,mask_salary",
    "transforms.mask_ssn.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn.REPLACEMENT_FIELD_PATH": "ssn",
    "transforms.mask_ssn.REPLACEMENT_VALUE_STRING": "xxx-xx-xxxx",
    "transforms.mask_salary.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_salary.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_salary.REPLACEMENT_FIELD_PATH": "salary",
    "transforms.mask_salary.REPLACEMENT_VALUE_INT": 0,
    "errors.tolerance": "all"
}



With this transform, the output message will be:
 
| Field Name | Field Value |
| --- | --- |
| Id | 1 |
| Database | contoso |
| Operation | Insert |
| Document | {"name": "john doe", "salary": 0, "ssn": "xxx-xx-xxxx"} |

The raw version of the output will be as follows:
{
    "Id": 1,
    "Database": "contoso",
    "Operation": "Insert",
    "Document": "{\"name\": \"john doe\", \"salary\": 0, \"ssn\": \"xxx-xx-xxxx\"}"
}


This transform also works on arrays. For example, assume you have an SSN value as an array of strings:

["111", "22", "3333" ]

and you want to transform it to:

["xxx", "xx", "3333" ]

Then the connector spec will look like this:



{
    "name": "json-mask-test",
    "connector.class": "FileStreamSink",
    "file": "/tmp/file-sink.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": 1,
    "topics": "json-topic",
    "transforms": "mask_salary,mask_ssn_1,mask_ssn_2",
    "transforms.mask_salary.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_salary.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_salary.REPLACEMENT_FIELD_PATH": "salary",
    "transforms.mask_salary.REPLACEMENT_VALUE_INT": 0,
    "transforms.mask_ssn_1.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn_1.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn_1.REPLACEMENT_FIELD_PATH": "/ssn/0",
    "transforms.mask_ssn_1.REPLACEMENT_VALUE_STRING": "xxx",
    "transforms.mask_ssn_2.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn_2.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn_2.REPLACEMENT_FIELD_PATH": "/ssn/1",
    "transforms.mask_ssn_2.REPLACEMENT_VALUE_STRING": "xx",

    "errors.tolerance": "all"

}
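The paths "/ssn/0" and "/ssn/1" in this config read like JSON-Pointer-style paths: each segment is either an object key or an array index. The Python sketch below shows how such a path can be resolved and the target replaced. It is only an illustration of the idea, it ignores JSON Pointer escape sequences, and the function name is hypothetical rather than taken from the transform's source.

```python
import json


def set_by_pointer(document, pointer, new_value):
    """Replace the value at a JSON-Pointer-style path such as "/ssn/0"."""
    tokens = [t for t in pointer.split("/") if t]
    target = document
    # Walk down to the parent container of the value to replace.
    for token in tokens[:-1]:
        target = target[int(token)] if isinstance(target, list) else target[token]
    last = tokens[-1]
    if isinstance(target, list):
        target[int(last)] = new_value
    else:
        target[last] = new_value


doc = json.loads(
    '{"name": "john doe", "salary": 100000, "ssn": ["111", "22", "3333"]}'
)
for path, value in [("salary", 0), ("/ssn/0", "xxx"), ("/ssn/1", "xx")]:
    set_by_pointer(doc, path, value)
print(json.dumps(doc))
```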


There are many more features that this transform supports. Look at the README.md of the source.

Conclusion


Kafka Connect and its Single Message Transforms feature provide a robust and scalable framework for data integration within the Kafka ecosystem. The mask-json-field SMT provides organizations with an extensible mechanism to mask out sensitive fields from the data, allowing compliance with various data protection regulations.



Saturday, December 4, 2021


Avro Logical Types with Parquet, Hive and Spark




In this post, I am going to talk about the fidelity of types between Avro and big data technologies like Parquet, Hive and Spark.

Specifically, we will discuss which logical types maintain fidelity when the data is eventually written as parquet.

We will also include querying that data using Hive and Spark.

Avro







Apache Avro is a data serialization system. It defines a schema definition language and a serialization framework for data. It also supports schema evolution.

Avro schemas are defined in JSON. The Avro project provides tools to translate a schema into classes in popular programming languages.


Logical Types



Avro supports logical types. A logical type is a higher-level representation of a primitive type.

For example, a UUID could be represented as the primitive type string.

Similarly, a higher-level java.time.Instant object could be represented as the primitive type long if the timestamp is written in epoch milliseconds.

In an Avro schema, a field with a logical type is defined as follows:

    {
      "name": "timestamp_millis",
      "type": { 
        "type": "long", 
        "logicalType": "timestamp-millis" 
      },
      "doc": "An Avro timestamp-millis logical type"
    }

The following table lists the supported logical types, and the corresponding higher level and primitive types they translate to.


| Version | Logical Type | Java Type | Schema Spec |
| --- | --- | --- | --- |
| 1.9+ | date | java.time.LocalDate | "type": { "type": "int", "logicalType": "date" } |
| 1.9+ | time-millis | java.time.LocalTime | "type": { "type": "int", "logicalType": "time-millis" } |
| 1.9+ | time-micros | java.time.LocalTime | "type": { "type": "long", "logicalType": "time-micros" } |
| 1.9+ | timestamp-millis | java.time.Instant | "type": { "type": "long", "logicalType": "timestamp-millis" } |
| 1.9+ | timestamp-micros | java.time.Instant | "type": { "type": "long", "logicalType": "timestamp-micros" } |
| 1.10+ | local-timestamp-millis | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-millis" } |
| 1.10+ | local-timestamp-micros | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-micros" } |



Logical Type Example



A field named event_timestamp with the logical type timestamp-millis is defined as follows in an Avro schema:

{
    "name": "event_timestamp",
    "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
    }
}

When a Java POJO is generated from this schema, the `event_timestamp` field will look like this:

java.time.Instant event_timestamp;

Instead of:

long event_timestamp;

The advantage of logical types in Avro is that you can work with higher-level types in code, while the data is serialized as one of the mapped primitive types.
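To show what "serialized as a primitive" means in practice, here is a short Python sketch of the mapping for two logical types, following the Avro specification: date is an int counting days since 1970-01-01, and timestamp-millis is a long counting milliseconds since the Unix epoch in UTC. The helper names are mine, and the sample value is the record timestamp from the connector log line quoted in the earlier post.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def encode_date(d):
    """date logical type -> int: days since the Unix epoch."""
    return (d - EPOCH_DATE).days


def decode_date(days):
    return EPOCH_DATE + timedelta(days=days)


def encode_timestamp_millis(ts):
    """timestamp-millis logical type -> long: milliseconds since the epoch (UTC)."""
    return (ts - EPOCH_UTC) // timedelta(milliseconds=1)


def decode_timestamp_millis(millis):
    return EPOCH_UTC + timedelta(milliseconds=millis)


raw = 1713396004969  # a long as it would appear on the wire
instant = decode_timestamp_millis(raw)
print(instant.isoformat())
print(encode_timestamp_millis(instant) == raw)
```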


Now that we have an understanding of what logical types are and how they work, let's see how they work when used with other systems.

To test this, I am going to use Hive and Spark. We will use Apache Parquet as the file format. Parquet is a very common file format used in Big Data applications. It supports efficient column level query of data.

Test Bed 


In order to test this out, I created a test platform. I used Confluent's Quickstart to setup a local Kafka ecosystem consisting of a Broker and Kafka Connect. An S3 Sink connector is used to archive the messages in the topic to an S3 Path in Parquet format.

Next, I created a Hive table on top of the Parquet data. I am using Amazon Elastic MapReduce for my Hive metastore.



This is the Avro Schema that has all the possible logical types supported by Avro.



The producer app was written in Java. I generated a POJO (Plain Old Java Object) from this schema using Avro Tools, and used the generated class to send random data into the topic.

The Kafka producer in the Java app was configured as follows:

"value.serializer" = io.confluent.kafka.serializers.KafkaAvroSerializer.class
"auto.register.schemas" = "false"
"value.subject.name.strategy" = io.confluent.kafka.serializers.subject.RecordNameStrategy.class

The Kafka Connect S3 sink was configured as follows:

"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"

The KafkaAvroSerializer provides an efficient way to transfer Avro data over Kafka topics.

The Hive Table was created in two steps.

The first step was to create a temporary Hive table using the provided Avro schema and org.apache.hadoop.hive.serde2.avro.AvroSerDe.

Next, we create the final table using this temp table as a template.

The following HiveQL creates the table:
    DROP TABLE IF EXISTS tmp.logical_type_test_avro;
    
    CREATE EXTERNAL TABLE IF NOT EXISTS tmp.logical_type_test_avro
    PARTITIONED BY( `process_date` string, `process_hour` string)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
    TBLPROPERTIES {{ avroSchemaLiteral }};
    
    DROP TABLE IF EXISTS warehouse.logical_type_test;
    
    CREATE EXTERNAL TABLE IF NOT EXISTS warehouse.logical_type_test like tmp.logical_type_test_avro
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS PARQUET LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
    TBLPROPERTIES {{ tblProperties }};
    
    DROP TABLE tmp.logical_type_test_avro;
    


Here is the output of the "SHOW CREATE TABLE warehouse.logical_type_test;" command.

Parquet File Schema


Let's look at the schema of the Parquet file:

message com.ferozed.LogicalType {
  required binary id (STRING);
  optional binary environment (STRING);
  optional binary sender (STRING);
  optional int64 send_epoch_time_millis;
  required int32 logical_date (DATE);
  required int32 logical_time_millis (TIME(MILLIS,true));
  required int64 logical_time_micros;
  required int64 logical_timestamp_millis (TIMESTAMP(MILLIS,true));
  required int64 logical_timestamp_micros;
  required int64 logical_local_timestamp_millis;
  required int64 logical_local_timestamp_micros;
}

Now let us look at the actual schema of the hive table.

| Name | Type |
| --- | --- |
| id | string |
| environment | string |
| sender | string |
| send_epoch_time_millis | bigint |
| logical_date | date |
| logical_time_millis | int |
| logical_time_micros | bigint |
| logical_timestamp_millis | timestamp |
| logical_timestamp_micros | bigint |
| logical_local_timestamp_millis | bigint |
| logical_local_timestamp_micros | bigint |

The Kafka producer wrote Avro messages into the topic using the provided schema.
Then the Kafka Connect S3 sink read those messages and converted them to Parquet.
Here is how the types got translated.

| Field | Avro | Parquet | Hive |
| --- | --- | --- | --- |
| logical_date | date | int32 (DATE) | date |
| logical_time_millis | time-millis | int32 (TIME(MILLIS,true)) | int |
| logical_time_micros | time-micros | int64 | bigint |
| logical_timestamp_millis | timestamp-millis | int64 (TIMESTAMP(MILLIS,true)) | timestamp |
| logical_timestamp_micros | timestamp-micros | int64 | bigint |
| logical_local_timestamp_millis | local-timestamp-millis | int64 | bigint |
| logical_local_timestamp_micros | local-timestamp-micros | int64 | bigint |

Let's see if we can query the data. I am using the Hive CLI that is part of Amazon EMR.

$ select * from warehouse.logical_type_test limit 1;

Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable

Next, instead of querying all columns, I queried each column individually to see which one had the problem. It turned out that the logical_timestamp_millis column was the one failing: it is typed in Hive as a timestamp, while the Parquet file stores it as an int64.

The other columns could be queried without errors.

Here is a table summarizing the fidelity of Avro logical-type fields to Hive tables.


| Field | Avro Type | Hive Type | Query Result - Type |
| --- | --- | --- | --- |
| logical_date | date | DATE | Pass - LogicalType |
| logical_time_millis | time-millis | int | Fail - int |
| logical_time_micros | time-micros | bigint | Fail - bigint |
| logical_timestamp_millis | timestamp-millis | timestamp | Fail - Mapping Error |
| logical_timestamp_micros | timestamp-micros | bigint | Fail - bigint |
| logical_local_timestamp_millis | local-timestamp-millis | bigint | Fail - bigint |
| logical_local_timestamp_micros | local-timestamp-micros | bigint | Fail - bigint |

Summary of results with Hive



From my results, only the Avro logical type date translated correctly and could be queried in Hive.

The logical type timestamp-millis translated to a Hive timestamp type, but the query failed.

The rest of the logical types did not translate at all.
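The columns that came through as plain int or bigint still hold the raw Avro values, so a consumer can interpret them by hand. The Python sketch below shows one way to do that for time-millis, time-micros, and timestamp-micros, based on the units given in the Avro specification (time since midnight, and time since the Unix epoch in UTC). The helper names and sample values are assumptions for illustration.

```python
from datetime import datetime, time, timedelta, timezone

EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def time_from_millis(millis):
    """time-millis: milliseconds after midnight -> time of day."""
    return (datetime.min + timedelta(milliseconds=millis)).time()


def time_from_micros(micros):
    """time-micros: microseconds after midnight -> time of day."""
    return (datetime.min + timedelta(microseconds=micros)).time()


def timestamp_from_micros(micros):
    """timestamp-micros: microseconds since the epoch -> UTC datetime."""
    return EPOCH_UTC + timedelta(microseconds=micros)


# Hypothetical raw column values as Hive would return them.
print(time_from_millis(3_600_000))
print(time_from_micros(1_500_000))
print(timestamp_from_micros(1_000_000).isoformat())
```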

Summary of results with Parquet



Only the following logical types were translated to higher-level Parquet types:

  • date
  • time-millis
  • timestamp-millis

The rest of the logical types did not translate to higher-level Parquet types.

Results with Spark



I tried to use Spark to read the S3 files.

Spark failed with the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o65.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-100-106-230-65.us-west-2.compute.internal, executor 1): org.apache.spark.sql.AnalysisException: Parquet type not yet supported: INT32 (TIME_MILLIS);
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.typeNotImplemented$1(ParquetSchemaConverter.scala:105)


Summary



The Avro logical types did not translate fully into Parquet and Hive.

If you are using Kafka for microservice communication, it should be fine to use all of these logical types.

But if the Kafka data is going to be archived, be careful of these differences.

    Saturday, July 4, 2020


    KafkaAvroSerializer: Efficient way to serialize messages with Avro to a Kafka topic

    Introduction




    Apache Kafka has become the platform of choice for doing large scale messaging. It supports a variety of serialization formats for messages, and also allows integration with external systems using a wide variety of connectors.

    Apache Kafka allows messages to be stored in a variety of formats in the topic:

    - Opaque bytes
    - Avro
    - JSON
    - Protocol Buffers





    In this post, we take a deep dive into the way the messages are stored in a Kafka topic, when the serialization format is Avro.

    Avro Serialization


    Apache Avro is an open source project used to specify a schema for a record. It provides a schema definition language (AVSC). Avro's binary encoding is very space efficient: field values are written in schema order without per-field names or tags, so a reader needs the schema to decode them. It also supports schema evolution.

    Avro schemas are written in JSON, the format defined in RFC 4627. An example Avro schema looks as follows:

    {
         "type": "record",
         "namespace": "com.example",
         "name": "FullName",
         "fields": [
           { "name": "first", "type": "string" },
           { "name": "last", "type": "string" }
         ]
    }

    Let us experiment with the above schema. I stored it in a file called test.avsc on my computer.

    Next, I downloaded avro-tools v1.10.0 from the Maven repository.

    We will author an example JSON message that adheres to the above schema. Then we will use the avro-tools jar we just downloaded to convert to and from Avro.

    Generate Java POJO


    To generate a Java POJO (Plain Old Java Object) we can use the avro-tools jar as follows:

    java -jar avro-tools-1.10.0.jar compile schema test.avsc . -string

    You can see the generated Java POJO in the "com/example" subdirectory:

    $ ls com/example/
    FullName.java

    Here is the partial gist of the POJO.


    For now, just note that the Java POJO has the schema embedded in it.

    Convert Json to Avro


    We will now write a simple JSON message and convert it to Avro.

    Since the schema only has two fields, I created a simple message as follows:

    $ cat test.json
    { "first": "john", "last": "doe" }

    Now, let's use avro-tools to convert this to Avro format:

    java -jar avro-tools-1.10.0.jar fromjson test.json --schema-file test.avsc > test.avro

    Now, let's check the generated file:

    $ ls test.avro
    test.avro

    $ file test.avro
    test.avro: Apache Avro version 1

    Let's look at the file:

    $ od -c test.avro
    0000000    O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
    0000020    a 222 002   {   "   t   y   p   e   "   :   "   r   e   c   o
    0000040    r   d   "   ,   "   n   a   m   e   "   :   "   F   u   l   l
    0000060    N   a   m   e   "   ,   "   n   a   m   e   s   p   a   c   e
    0000100    "   :   "   c   o   m   .   e   x   a   m   p   l   e   "   ,
    0000120    "   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e
    0000140    "   :   "   f   i   r   s   t   "   ,   "   t   y   p   e   "
    0000160    :   "   s   t   r   i   n   g   "   }   ,   {   "   n   a   m
    0000200    e   "   :   "   l   a   s   t   "   ,   "   t   y   p   e   "
    0000220    :   "   s   t   r   i   n   g   "   }   ]   } 024   a   v   r
    0000240    o   .   c   o   d   e   c  \b   n   u   l   l  \0 330 315 053
    0000260    +   Z 213   ' 371  \0   S 250 225 202 277 374   \ 002 022  \b
    0000300    j   o   h   n 006   d   o   e 330 315 307   +   Z 213   ' 371
    0000320   \0   S 250 225 202 277 374   \
    0000330 

    As you can see, the binary representation has the string version of the schema specification embedded in it. The schema string runs up to offset 0233 (od prints offsets in octal). This can be a problem when you are writing multiple messages with the same schema into a stream: with the default Avro file format, every message written on its own carries the full schema, which means that a lot of bytes on the wire are spent transmitting the schema.
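By contrast, the record itself takes only nine bytes in the dump above (\b j o h n 006 d o e). As a rough illustration, the following Python sketch decodes those bytes by hand, relying on the Avro specification's encoding of a string as a zigzag varint length followed by UTF-8 bytes. The helper names are hypothetical, and real code would use an Avro library instead.

```python
from typing import Tuple

def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one zigzag-encoded varint starting at pos; return (value, next_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift   # low 7 bits carry data
        if not byte & 0x80:             # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos  # undo zigzag encoding

def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """Decode a length-prefixed UTF-8 string; return (text, next_pos)."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length

# The record bytes from the dump: \b (octal 010) j o h n 006 d o e
record = b"\x08john\x06doe"
first, pos = read_string(record, 0)
last, pos = read_string(record, pos)
print(first, last)  # john doe
```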

    To complete our exercise, we can use avro-tools to convert the Avro message back to JSON.

    $ java -jar avro-tools-1.10.0.jar tojson test.avro --pretty --reader-schema-file test.avsc
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    {
      "first" : "john",
      "last" : "doe"
    }

    Serializing Multiple Messages with Schema


    Let's see what happens when you try to serialize multiple messages with Avro schema.

    I created an example file with two records:

    $ cat people.json
    { "first": "john", "last": "doe" }
    { "first": "Attila", "last": "The Hun" }

    Now, I convert it to Avro:

    $ java -jar avro-tools-1.10.0.jar fromjson people.json --schema-file test.avsc > people.avro

    Let's look at the output Avro file:

    $ od -c people.avro
    0000000    O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
    0000020    a 222 002   {   "   t   y   p   e   "   :   "   r   e   c   o
    0000040    r   d   "   ,   "   n   a   m   e   "   :   "   F   u   l   l
    0000060    N   a   m   e   "   ,   "   n   a   m   e   s   p   a   c   e
    0000100    "   :   "   c   o   m   .   e   x   a   m   p   l   e   "   ,
    0000120    "   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e
    0000140    "   :   "   f   i   r   s   t   "   ,   "   t   y   p   e   "
    0000160    :   "   s   t   r   i   n   g   "   }   ,   {   "   n   a   m
    0000200    e   "   :   "   l   a   s   t   "   ,   "   t   y   p   e   "
    0000220    :   "   s   t   r   i   n   g   "   }   ]   } 024   a   v   r
    0000240    o   .   c   o   d   e   c  \b   n   u   l   l  \0 212 313   ]
    0000260  304   v 037 354 350 314   ~   ڪ  ** 207 365 302   1 004   $  \b
    0000300    j   o   h   n 006   d   o   e  \b   j   o   h   n 006   d   o
    0000320    e 212 313   ] 304   v 037 354 350 314   ~   ڪ  ** 207 365 061
    0000340    1
    0000341

    Compare this output to the single-record output above. You will notice that the schema specification is included only once. This means that Avro serialization becomes more efficient if you serialize multiple records with the same schema at once, instead of serializing a single record at a time.
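The final offsets printed by od give a rough measure of the saving. Here is a small Python calculation, taking the two dumps above as printed and remembering that od shows offsets in octal:

```python
# Final offsets from the two od dumps (octal)
single_file_size = int("330", 8)   # test.avro, one record
two_record_size = int("341", 8)    # people.avro, two records

extra = two_record_size - single_file_size
print(single_file_size, two_record_size, extra)  # 216 225 9
```

Going by these dumps, the second record adds only 9 bytes to a 216-byte file; the rest of the file is dominated by the header with its embedded schema.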

    Avro Serialization with Kafka


    In Kafka applications, producers typically write one record at a time. There are two conflicting requirements when we use Avro to serialize records to Kafka:

    1. On the one hand, we want the message to be self-descriptive, which means we want the schema with the message.
    2. On the other hand, we want the number of bytes on the wire per message to be as small as possible, which means that the verbose schema should not be repeated in each message (especially since it is going to be the same for each message).

    Kafka solves this dilemma with a different Avro serialization method. To do this, it uses the Schema Registry to register the schemas. The Schema Registry assigns an ID to each unique schema specification.

    NOTE: Schema Registry integration for Kafka is not part of the open source Apache Kafka ecosystem. It is provided by Confluent. You can try it out locally by downloading the Confluent binaries, but running it in production requires licensing.

    When the message is serialized to the topic, the KafkaAvroSerializer serializes the message, but instead of including the verbose schema with the message, it includes the schema ID from the schema registry. The schema id is obtained when you register a schema with the registry.

    The format of the message written by the serializer is as follows:

    Offset    Description
    0         Magic byte
    1-4       Schema ID (4 bytes)
    5-        Avro-encoded record


    To see this in action, try reading a message serialized using KafkaAvroSerializer. Here I read the message from a topic using kafka-console-consumer:

    $ ./bin/kafka-console-consumer --bootstrap-server localhost:9020 --topic test --partition 0 --offset 0 | od -t x1
    0000000    00  00  00  01  e0  48  65  35  39  63  62  64  36  35  2d  36
    0000020    38  64  39  2d  34  32  61  34  2d  39  66  35  35  2d  64  61
    0000040    38  37  37  63  38  64  39  33  30  34  06  61  65  31  4e  74
    0000060    65  73  74  5f  73  63  68  65  6d  61  5f  65  6e  64  70  6f
    0000100    69  6e  74  5f  72  65  63  6f  72  64  6e  61  6d  65  73  74
    0000120    72  61  74  65  67  79  2e  43  6c  69  65  6e  74  3a  20  73

    As you can see, the first byte at offset 0 is the magic byte. After that we have the four bytes of the schema ID, 0x000001e0, which is 480 in decimal:

    $ printf "%d" 0x1e0

    480
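The same header can be split off programmatically. Here is a minimal Python sketch, assuming the layout described above (one magic byte, then a 4-byte big-endian schema ID). The function name is hypothetical, and the input is the first nine bytes of the hex dump.

```python
import struct

def split_frame(message: bytes):
    """Split a framed message into (magic_byte, schema_id, avro_payload)."""
    if len(message) < 5:
        raise ValueError("message too short for the 5-byte header")
    # ">bI": big-endian, one signed byte followed by one unsigned 32-bit int
    magic, schema_id = struct.unpack(">bI", message[:5])
    return magic, schema_id, message[5:]

# First bytes of the dump: 00 | 00 00 01 e0 | 48 65 35 39 ...
message = bytes.fromhex("00000001e0") + bytes.fromhex("48653539")
magic, schema_id, payload = split_frame(message)
print(magic, schema_id)  # 0 480
```

The payload that remains after the header is the Avro-encoded record, which can only be decoded once the schema for that ID has been fetched from the registry.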


    I can verify that a schema with that ID exists in my schema-registry.

    $ curl -s http://localhost:8888/schemas/ids/480 | jq '.schema|fromjson'
    {
      "type": "record",
      "name": "FullName",
      "namespace": "com.example",
      "fields": [
        {
          "name": "first",
          "type": "string"
        },
        {
          "name": "last",
          "type": "string"
        }
      ]
    }
     
    As you can see, this schema is exactly the same as the schema I created.

    Conclusion


    We looked at how Avro serialization works by serializing the message along with its schema, making the message self-describing. This led us to see why the stock Avro serializer is not suitable for serializing messages to a stream interface (like Amazon Kinesis, Kafka, or a socket), since repeating the schema in each message wastes space. We then looked at the KafkaAvroSerializer that ships with Confluent Kafka, and saw how it solves this problem by storing an ID that references the schema instead of storing the entire schema in the serialized message.




    Tuesday, October 4, 2016

    Implementing a Ping client in Python - Part III

    Introduction


    This is Part 3 of the series in which we develop a Ping client in Python.


    So far, we have covered the structure of the ICMP packet and looked at Python code to serialize and deserialize the packet from the network.

    In the current installment, we will complete the program. First, let us implement the checksum.

    ICMP Header Checksum


    A checksum is a 16-bit value used for error checking the header and the data. It allows the receiver to make sure that the data it received was not corrupted or altered in transit. The transmitter calculates a checksum over the outgoing packet and adds it to the packet. The receiver calculates a checksum on the received packet and compares it with the checksum in the packet. If they match, then there was no corruption on the wire.

    As per Wikipedia:

    The checksum field is the 16 bit one's complement of the one's complement sum of all 16-bit words in the header and text. If a segment contains an odd number of header and text octets to be checksummed, the last octet is padded on the right with zeros to form a 16-bit word for checksum purposes. The pad is not transmitted as part of the segment. While computing the checksum, the checksum field itself is replaced with zeros.

    Using this definition, we have the following checksum function
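A minimal Python sketch of that definition follows. It is an illustration of the quoted rule and not necessarily the exact function used in the full program; the name internet_checksum and the sample header bytes are assumptions.

```python
def internet_checksum(data: bytes) -> int:
    """Return the 16-bit one's complement of the one's complement sum of data."""
    if len(data) % 2:
        data += b"\x00"                          # pad odd-length input with a zero octet
    total = 0
    for i in range(0, len(data), 2):
        total += (data[i] << 8) | data[i + 1]    # add each big-endian 16-bit word
    while total >> 16:                           # fold carries back in (end-around carry)
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF

# Hypothetical ICMP echo request header: type 8, code 0, checksum 0, id 1, seq 1
header = bytes.fromhex("0800000000010001")
checksum = internet_checksum(header)
print(hex(checksum))  # 0xf7fd

# With the checksum filled in, recomputing over the packet yields zero
filled = header[:2] + checksum.to_bytes(2, "big") + header[4:]
print(internet_checksum(filled))  # 0
```

The second call shows a common way to verify a received packet: running the same function over a packet that still contains its checksum gives 0 when nothing was corrupted.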



    Socket Introduction


    The Wikipedia article on Sockets gives some background info on this topic. For our case, since we are going to be dealing with a lower level protocol like ICMP, we will be creating a Raw socket.

    In order to create a raw socket, you need to be a superuser. On UNIX machines, you will need to use "sudo" to run the program. On Windows, you need to be a member of the Administrators group to run the program.

    The python documentation for Socket is a great resource as well.

    A raw socket is created as follows:

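    from socket import socket, AF_INET, SOCK_RAW, IPPROTO_ICMP, IPPROTO_IP, IP_HDRINCL

    sock = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)  # raw ICMP socket; needs superuser
    sock.setsockopt(IPPROTO_IP, IP_HDRINCL, 0)      # 0: the network stack builds the IP header
    sock.bind(('127.0.0.1', 0))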

    Since we are sending only the ICMP packet, and not its encapsulating IP packet, we indicate this to the network driver using setsockopt(IPPROTO_IP, IP_HDRINCL, 0). This lets the network driver encapsulate our data in an IP packet.

    Main Entry Point


    Now that we have all the scaffolding in place for the program, here is the main() function that ties it all together.

    Note a couple of imports.

    On my Mac, I run this program as:

    sudo python ping.py

    Complete Program


    The complete program is checked into Github. I encourage you to check it out.


    Conclusion

    This concludes our three-part series. I hope you liked it and learned something about low-level network programming in Python.