Kafka is a highly scalable and durable distributed streaming platform that
has become the go-to choice for handling large amounts of data across a
variety of industries. Its ecosystem of services is vast and powerful, enabling businesses to
solve complex problems and uncover insights from their data in
real-time.
One of the key services in the Kafka ecosystem is Kafka Connect. It is a
framework that enables streaming integration between Kafka and other data
systems. Kafka Connect is flexible and extensible, with a plugin-based
architecture that allows it to integrate seamlessly with various data
systems. Users can write custom connectors to integrate Kafka with any
data system that has a Java API, and Kafka Connect's RESTful API can be
used to configure and manage connector instances. With Kafka Connect,
businesses can leverage the power of Kafka to enable streaming integration
between different data systems with ease.
Data Privacy & Compliance
Organizations process an extensive amount of data and some of that data
includes sensitive information like Personally Identifiable Information (PII)
such as names, addresses, social security numbers, among others. It is
important to protect that data from unauthorized access and usage.
The financial industry is one example where removing PII before transferring data to an external system with Kafka Connect is crucial.
Financial institutions are required by regulations to comply with data
protection laws and maintain the confidentiality of their clients’
information. Kafka Connect enables these institutions to transfer data
seamlessly between internal systems and external systems, like data
warehouses or third-party vendors for analytics purposes while ensuring that
sensitive information is obfuscated. This enhances data security, helps
maintain compliance with regulations, and avoids the risk of data
breaches.
In this blog post, we will explore how to use a custom Kafka Single Message
Transformer (SMT) to obfuscate PII data from a Kafka message. This will help
you maintain data privacy and security while using Kafka effectively.
Drop Messages with PII
You can use the Confluent Filter SMT to drop entire messages that match a condition. The condition can be
set to match on PII data in the message.
For example, given input records like the following:
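These are illustrative values; assume each message value carries a name, an age, and a salary field:

{ "name": "Jon Doe", "age": 35, "salary": 100000 }
{ "name": "Jane Doe", "age": 32, "salary": 0 }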
The following Kafka Connect SMT configuration drops records that have a salary value greater than zero.
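A connector configuration fragment along these lines should do it (the transform alias dropPii and the exact JsonPath predicate are illustrative; see the Confluent Filter SMT documentation for the full option list):

"transforms": "dropPii",
"transforms.dropPii.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.dropPii.filter.condition": "$[?(@.salary > 0)]",
"transforms.dropPii.filter.type": "exclude",
"transforms.dropPii.missing.or.null.behavior": "include"

With filter.type set to exclude, any record matching the condition is dropped, while records without a salary field pass through.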
As you can see, the "Jon Doe" record was dropped because it had a salary field
value that was greater than zero.
Remove sensitive values in PII fields.
Use the Confluent MaskField transform to mask data in PII fields; it can also replace the data with a token value. Combined with predicates, it is a powerful mechanism for masking PII when certain conditions are met.
For example, given the input record:
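For illustration, assume a record like this one:

{ "name": "Jon Doe", "salary": 100000 }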
The following SMT config will mask the salary field by setting its value to zero.
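A fragment like the following is one way to do it (the alias maskSalary is illustrative). With no explicit replacement configured, MaskField replaces numeric fields with 0 and string fields with an empty string:

"transforms": "maskSalary",
"transforms.maskSalary.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskSalary.fields": "salary"

Newer Kafka versions also support a replacement property on this transform for substituting a custom token instead of the default.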
In the output, all records with a salary field were masked and replaced with a
default value.
Manage JSON payloads in Kafka messages.
Sometimes your Kafka message contains fields that are themselves JSON payloads encoded as strings. This can happen, for example, when using a DynamoDB source connector, which embeds the DynamoDB row in the Document field of the message.
The challenge comes when you need to mask a PII field inside a JSON document that is embedded in a single field of a Kafka message.
For this you can use the mask-json-field transform. This transform allows you to mask out fields at any level of
the JSON message. It even supports arrays.
For example, given the input record:
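For illustration, assume a message whose Document field holds the row as a JSON string (the surrounding field name Id is made up):

{
  "Id": "42",
  "Document": "{\"name\": \"Jon Doe\", \"ssn\": \"123-45-6789\", \"address\": {\"city\": \"Springfield\"}}"
}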
The SMT is then configured to mask the ssn field in the embedded JSON message.
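The exact configuration keys are documented in the transform's README; the important parts are the name of the Kafka message field that holds the JSON (Document here) and the path of the JSON field to mask (ssn). With that in place, the output record would look something like this, with the embedded value masked (the replacement token shown is illustrative):

{
  "Id": "42",
  "Document": "{\"name\": \"Jon Doe\", \"ssn\": \"****\", \"address\": {\"city\": \"Springfield\"}}"
}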
Apache Avro is the leading serialization format for record data, and the first choice for streaming data pipelines. It offers excellent schema evolution.
In this article, we discuss how to author Avro enums so that data written with
the new version of the schema can be read by consumers using the previous
version of the schema. In other words, we are authoring the Avro schema so
that it is forward compatible.
Now let's see what happens if you try to read the v2.avro file using the v1 schema (v1.avsc). Remember that the v1 schema does not have the symbol "yellow" in the enum.
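As a sketch, the enum in the two schema versions might look like this (the enum name and the symbols other than "yellow" are illustrative):

v1.avsc declares a default symbol:

{ "type": "enum", "name": "Color", "symbols": ["red", "green", "blue"], "default": "red" }

v2.avsc adds the new symbol:

{ "type": "enum", "name": "Color", "symbols": ["red", "green", "blue", "yellow"], "default": "red" }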
We will use the tojson command from avro-tools to convert the Avro file to JSON.
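The invocation looks something like this (the avro-tools version and the exact reader-schema flag depend on your build; recent versions of tojson accept a reader schema file):

$ java -jar avro-tools-1.11.1.jar tojson --reader-schema-file v1.avsc v2.avro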
As you can see, trying to read the v2 Avro message using a schema that does not have the new enum symbol causes the unknown symbol to be converted to the default value for that enum.
Now, let's see what happens if we don't have a default value specified in the schema. Let's create a new schema without the default, in a file called v1-nodefault.avsc.
This will result in the following exception printed to stdout:
Exception in thread "main" org.apache.avro.AvroTypeException: No match for yellow
at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:269)
at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:268)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:182)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:248)
at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:98)
at org.apache.avro.tool.Main.run(Main.java:67)
at org.apache.avro.tool.Main.main(Main.java:56)
As you can see, removing the default value from the enum in the schema makes data written with the new schema incompatible with the previous schema.
Conclusion
Use a default value in Avro enums to allow schema evolution while keeping data written with newer versions of the schema readable by consumers that still use previous versions of the schema.
If you liked this article, you might also like the following:
Confluent provides on-prem and hosted services of the Apache Kafka ecosystem. As part of this they also provide a custom serializer/deserializer to enable writing and reading messages with schema from a topic.
The KafkaAvroSerializer provides an efficient mechanism for serializing messages to a kafka topic.
The wire format of the KafkaAvroSerializer is described here
Sometimes the producer of the message is not using these libraries, but still writes messages that adhere to the Confluent wire format. The reason for doing this is to make the messages compatible with Confluent tooling so that you can use the KafkaAvroDeserializer. For example, you might want to write those messages to S3 using the kafka-connect-s3 sink connector.
When the producer does custom serialization but makes an error in encoding, it can result in serialization errors down the line. These kinds of errors are difficult to debug. In this blog post, I will describe how you can root-cause those errors.
Serialization Errors
Here is an example of an error we noticed in the kafka connect s3 sink connector in our production instance.
Error encountered in task crm-tag-update-event-prod-s3-sink-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='crm-tag-update-event-prod-topic', partition=0, offset=52510041, timestamp=1713396004969, timestampType=CreateTime}.
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic crm-tag-update-event-prod-topic to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:519)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:519)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 15930
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:425)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:204)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
... 17 more
Caused by: org.apache.avro.InvalidNumberEncodingException: Invalid long encoding
at org.apache.avro.io.BinaryDecoder.innerLongDecode(BinaryDecoder.java:262)
at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:221)
at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:169)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:198)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:412)
Next, you need to download the failed message from the topic. The topic name, partition, and offset are given in the exception message.
Error encountered in task crm-tag-update-event-prod-s3-sink-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter',
where consumed record is {topic='crm-tag-update-event-prod-topic', partition=0, offset=52510041, timestamp=1713396004969, timestampType=CreateTime}.
If you don't see the partition/offset, you can enable your sink connector to log that info with the following configs:
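For example, in the connector configuration:

"errors.log.enable": "true",
"errors.log.include.messages": "true"

These standard Kafka Connect error-handling settings log failing records, along with their topic, partition, and offset, to the Connect worker log.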
So we have the schema and the offending message. How do we decode them?
For that we use avro-tools.
You can download avro-tools from the Sonatype Index
For this demo, I am using version 1.11.1.
But wait: the message that we downloaded from the topic is the raw message. This message is encoded with a magic byte and schema id in the first 5 bytes (refer to the Confluent wire format).
So we first need to extract the actual message.
This should be easy, since we just need to remove the 5-byte header from the message.
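One way to do both steps from the shell (kcat is used here just as an example consumer, and the broker address is an assumption):

# Fetch the single failing record as raw bytes.
kcat -b localhost:9092 -t crm-tag-update-event-prod-topic -p 0 -o 52510041 -c 1 -e > raw-message.bin

# Strip the 5-byte header (magic byte + 4-byte schema id) to get the bare Avro payload.
# Note: kcat appends a newline delimiter after the message by default, so you may need
# to trim one trailing byte from the result.
tail -c +6 raw-message.bin > message.avro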
Now that we have the actual message and the schema, we can use fragtojson to decode the message. We expect this to fail, since the schema registered under that schema id does not match the serialized payload.
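Assuming the registry schema for id 15930 was saved to schema.avsc, the attempt looks like this:

$ java -jar avro-tools-1.11.1.jar fragtojson --schema-file schema.avsc message.avro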
To get to the root cause, we need to look at the producer.
One of the following is the cause of this:
1) The schema-id is correct for that topic, but the serialized message does not match.
2) The schema-id is wrong, but the serialized message is correct.
To find out which one of these it is, we need to understand the producer.
In our case, what happened was that an error in a GitLab pipeline caused the wrong job to be deployed to our environment. That job was using the correct schema subject, but was compiled against POJOs that did not match the subject. So it serialized messages using the wrong POJO but the correct schema-id.
For #1, you need to find the actual schema that was used in serialization. Then download that schema and use it to decode the message using avro-tools.
#2 is more difficult since we don't know the correct schema-id. This will require looking at the producer code to figure out the correct schema subject/version or schema-id it should have used for serialization.
Conclusion
In this blog post I showed you how to:
1) Read Avro messages from a Kafka topic.
2) Extract the embedded message from the raw message.
3) Decode it using avro-tools.
4) Troubleshoot serialization errors.
If you liked this blog post, you might also enjoy the following:
In today's fast-paced digital era, where vast amounts of data are being generated and processed at an unprecedented scale, the need for efficient, scalable, and real-time data processing solutions has become crucial. Traditional messaging systems and data integration tools often struggle to meet these demands, leading to delays, data loss, and inefficiencies in data processing pipelines. That's where Apache Kafka, a powerful distributed streaming platform, comes into play.
Kafka has emerged as a game-changer in the world of data streaming, enabling organizations to build robust, fault-tolerant, and highly scalable data pipelines. Developed by the Apache Software Foundation, Kafka was originally created to handle the real-time data ingestion and processing needs of LinkedIn. Since its inception, Kafka has gained immense popularity and has become the de facto standard for building event-driven architectures in a wide range of industries.
Kafka Connect plays a pivotal role in the Kafka ecosystem by providing a scalable and reliable framework for data integration. It simplifies the process of connecting external systems to Kafka, enabling seamless data ingestion and extraction. Kafka Connect acts as a bridge between Kafka topics and various data sources or sinks, allowing for the efficient transfer of data in real-time. It offers a wide range of connectors that serve as plugins to connect to different systems such as databases, file systems, message queues, and cloud services. With Kafka Connect, organizations can easily integrate their existing data infrastructure with Kafka, enabling them to build robust and flexible data pipelines. The ability to leverage pre-built connectors or develop custom ones makes Kafka Connect a valuable tool for simplifying data integration tasks and accelerating the adoption of Kafka within an organization's data ecosystem.
Kafka Connect Single Message Transforms
Single Message Transforms (SMT) is a powerful feature in Kafka Connect that allows for in-flight data transformation and enrichment within the data integration pipelines. SMT operates on individual messages as they pass through the connectors, providing the flexibility to modify, filter, or enhance the data without the need for external processing. With SMT, you can apply a variety of operations such as adding or removing fields, changing data types, filtering messages based on specific conditions, and performing calculations or aggregations. This capability enables organizations to shape the data precisely according to their requirements before it reaches the destination, ensuring data consistency, compliance, and compatibility.
Complying with Data Regulations
Companies need to comply with regulations regarding data protection in the jurisdictions they operate in. Examples of these regulations include CCPA/CPRA (California) and GLBA (US, financial). Other countries have their own versions of these.
Companies using the Kafka ecosystem of services to transfer data need to be cognizant of these regulations and figure out where regulated data (e.g., PII) may or may not be visible. When using Kafka Connect as the integration mechanism, you can use SMTs to mask out data to comply with regulations.
In this blog post, I will introduce the Kafka Connect mask-json-field SMT.
mask-json-field Single Message Transform
There are cases where you need to mask a value in a JSON document that is embedded in a field of a Kafka message. For example, assume the following Kafka message schema, which could be the output of a Change Data Capture (CDC) connector that is monitoring a database and writing CDC events into a Kafka topic.
In this Kafka message, the Document value is a String that is a JSON payload of the database row.
Assume we are persisting this message into a different system using a sink connector. However, for privacy reasons, you don't want to persist the salary and ssn.
The JSON version of this message looks as follows:
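Something along these lines, where the top-level field names are illustrative and the Document value carries the row as a JSON string:

{
  "TableName": "employees",
  "Document": "{\"id\": 101, \"name\": \"Jane Doe\", \"ssn\": \"123-45-6789\", \"salary\": 95000}"
}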
There are many more features that this transform supports. Take a look at the README.md in the source repository.
Conclusion
Kafka Connect and its Single Message Transforms feature provide a robust and scalable framework for data integration within the Kafka ecosystem. The mask-json-field SMT provides organizations with an extensible mechanism to mask out sensitive fields from the data, allowing compliance with various data protection regulations.
If you liked this post, here are some others you might like:
In this post, I am going to talk about the fidelity of types between Avro and big data technologies like Parquet, Hive and Spark.
Specifically, we will discuss which logical types maintain fidelity when the data is eventually written as parquet.
We will also include querying that data using Hive and Spark.
Avro
Apache Avro is a data serialization system. It defines a schema definition language, and a serialization framework to serialize data. It also supports schema evolution.
Avro schemas are defined in JSON. The Avro project provides tools to translate a schema into classes in popular programming languages.
Logical Types
Avro supports logical types. A logical type is a higher-level representation built on top of a primitive type.
For example, a higher-level UUID type could be represented as the primitive type string.
Similarly, a higher-level java.time.Instant object could be represented as the primitive type long if the timestamp is written as epoch milliseconds.
In an Avro schema, a logical type is declared as follows (note that the logical type name is hyphenated: timestamp-millis):
{
  "name": "timestamp_millis",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  },
  "doc": "An avro timestamp-millis logical type"
}
The following table lists the supported logical types, and the corresponding higher level and primitive types they translate to.
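Logical type              Underlying primitive type
decimal                   bytes or fixed
uuid                      string
date                      int (days since the epoch)
time-millis               int (milliseconds after midnight)
time-micros               long (microseconds after midnight)
timestamp-millis          long (milliseconds since the epoch, UTC)
timestamp-micros          long (microseconds since the epoch, UTC)
local-timestamp-millis    long (milliseconds since the epoch, local time)
local-timestamp-micros    long (microseconds since the epoch, local time)
duration                  fixed of size 12 (months, days, milliseconds)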
A field named event_timestamp with the timestamp-millis logical type is defined as follows in an Avro schema:
{
  "name": "event_timestamp",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
}
When a Java POJO is generated from this schema, the `event_timestamp` field will look like this:
java.time.Instant event_timestamp;
Instead of:
long event_timestamp
The advantage of logical types in Avro is that you can use them to represent higher-level types, while the serialization type remains one of the mapped primitive types.
Now that we have an understanding of what logical types are and how they work, let's see how they work when used with other systems.
To test this, I am going to use Hive and Spark. We will use Apache Parquet as the file format. Parquet is a very common file format in big data applications, and it supports efficient column-level queries.
Test Bed
In order to test this out, I created a test platform. I used Confluent's Quickstart to setup a local Kafka ecosystem consisting of a Broker and Kafka Connect. An S3 Sink connector is used to archive the messages in the topic to an S3 Path in Parquet format.
Next, I created a Hive table on top of the Parquet data. I am using Amazon Elastic MapReduce for my Hive metastore.
This is the Avro Schema that has all the possible logical types supported by Avro.
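A sketch of it, reconstructed from the field names that appear in the Hive table below (the record name and namespace are assumptions):

{
  "type": "record",
  "name": "LogicalTypeTest",
  "namespace": "com.example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "environment", "type": "string" },
    { "name": "sender", "type": "string" },
    { "name": "send_epoch_time_millis", "type": "long" },
    { "name": "logical_date", "type": { "type": "int", "logicalType": "date" } },
    { "name": "logical_time_millis", "type": { "type": "int", "logicalType": "time-millis" } },
    { "name": "logical_time_micros", "type": { "type": "long", "logicalType": "time-micros" } },
    { "name": "logical_timestamp_millis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "logical_timestamp_micros", "type": { "type": "long", "logicalType": "timestamp-micros" } },
    { "name": "logical_local_timestamp_millis", "type": { "type": "long", "logicalType": "local-timestamp-millis" } },
    { "name": "logical_local_timestamp_micros", "type": { "type": "long", "logicalType": "local-timestamp-micros" } }
  ]
}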
The producer app was written in Java. I generated a POJO (Plain Old Java Object) from this schema using Avro Tools, and used the generated class to send random data into the topic.
The Kafka producer in the java app was configured as follows:
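The important settings boil down to something like this (the broker and Schema Registry addresses are placeholders, and the key serializer is an assumption; the value serializer is the part that matters here):

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081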
The KafkaAvroSerializer provides an efficient way to transfer Avro data over Kafka topics.
The Hive Table was created in two steps.
The first step was to create a temporary hive table using the provided Avro schema and
org.apache.hadoop.hive.serde2.avro.AvroSerDe
Next, we create the final table using this temp table as a template.
The following describes the Hive HQL to create the table.
DROP TABLE IF EXISTS tmp.logical_type_test_avro;
CREATE EXTERNAL TABLE IF NOT EXISTS tmp.logical_type_test_avro
PARTITIONED BY( `process_date` string, `process_hour` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
TBLPROPERTIES {{ avroSchemaLiteral }};
DROP TABLE IF EXISTS warehouse.logical_type_test;
CREATE EXTERNAL TABLE IF NOT EXISTS warehouse.logical_type_test like tmp.logical_type_test_avro
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS PARQUET LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
TBLPROPERTIES {{ tblProperties }};
DROP TABLE tmp.logical_type_test_avro;
You can inspect the resulting table definition with the "SHOW CREATE TABLE warehouse.logical_type_test;" command.
Now let us look at the actual schema of the hive table.
Name                              Type
id                                string
environment                       string
sender                            string
send_epoch_time_millis            bigint
logical_date                      date
logical_time_millis               int
logical_time_micros               bigint
logical_timestamp_millis          timestamp
logical_timestamp_micros          bigint
logical_local_timestamp_millis    bigint
logical_local_timestamp_micros    bigint
The Kafka producer wrote Avro messages into the topic using the provided schema.
Then the Kafka Connect S3 sink read those messages and converted them to Parquet.
Here is how the types got translated.
Field                             Avro                      Parquet                     Hive
logical_date                      date                      DATE                        date
logical_time_millis               time-millis               (TIME(MILLIS,true))         int
logical_time_micros               time-micros               int64                       bigint
logical_timestamp_millis          timestamp-millis          (TIMESTAMP(MILLIS,true))    timestamp
logical_timestamp_micros          timestamp-micros          int64                       bigint
logical_local_timestamp_millis    local-timestamp-millis    int64                       bigint
logical_local_timestamp_micros    local-timestamp-micros    int64                       bigint
Let's see if we can query the data. I am using the Hive CLI that is part of Amazon EMR.
$ select * from warehouse.logical_type_test limit 1;
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
Next, Instead of querying all columns, I tried to query each column individually to see which one was having the problem. It turned out that the logical_timestamp_millis column was the one that was failing because it was typed in Hive as a
timestamp.
The other columns were querying fine.
Here is a table summarizing the fidelity of Avro logical-type fields to Hive tables.
Field                             Avro Type                 Hive Type    Query Result Type
logical_date                      date                      DATE         PASS - LogicalType
logical_time_millis               time-millis               int          Fail - int
logical_time_micros               time-micros               bigint       Fail - bigint
logical_timestamp_millis          timestamp-millis          timestamp    Fail - Mapping Error
logical_timestamp_micros          timestamp-micros          bigint       Fail - bigint
logical_local_timestamp_millis    local-timestamp-millis    bigint       Fail - bigint
logical_local_timestamp_micros    local-timestamp-micros    bigint       Fail - bigint
Summary of results with Hive
From my results, only the Avro date logical type translated correctly and could be queried successfully in Hive.
The timestamp-millis logical type translated to a Hive timestamp type, but failed to query.
The rest of the logical types did not translate at all.
Summary of results with Parquet
Only the following logical types got translated to higher-level Parquet types:
date
time-millis
timestamp-millis
The rest of the logical types did not translate to higher-level Parquet types.
Results with Spark
I tried to use Spark to read the S3 files.
Spark failed with the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o65.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-100-106-230-65.us-west-2.compute.internal, executor 1): org.apache.spark.sql.AnalysisException: Parquet type not yet supported: INT32 (TIME_MILLIS);
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.typeNotImplemented$1(ParquetSchemaConverter.scala:105)
Summary
The Avro logical types did not translate fully into Hive and Parquet.
If you are using Kafka for microservice communication, it should be fine to use all these logical types.
But if the Kafka data is going to be archived, be careful of these differences.
Apache Kafka has become the platform of choice for doing large scale messaging. It supports a variety of serialization formats for messages, and also allows integration with external systems using a wide variety of connectors.
Apache Kafka allows messages to be stored in a variety of formats in the topic:
- Opaque bytes
- Avro
- JSON
- Protocol Buffers
In this post, we take a deep dive into the way the messages are stored in a Kafka topic, when the serialization format is Avro.
Avro Serialization
Apache Avro is an open source project. It is used to specify a schema for a record, and it provides a schema definition language (AVSC) along with a serialization framework. Avro's binary encoding is very space efficient: field names are not stored with the values, and the schema is needed to interpret the data. It allows for schema evolution as well.
Avro schemas are written in JSON (the JSON format itself is defined in RFC 4627). An example Avro schema looks as follows:
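{
  "type": "record",
  "name": "FullName",
  "namespace": "com.example",
  "fields": [
    { "name": "first", "type": "string" },
    { "name": "last", "type": "string" }
  ]
}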
Let us experiment with the above schema. I stored it in a file called test.avsc on my computer.
Next, I download avro-tools v1.10.0 from Maven Repo.
We will author an example json message that adheres to the above schema. Then we will use the *avro-tools* jar we just downloaded to convert to/from avro.
Generate Java POJO
To generate a Java POJO (Plain Old Java Object) we can use the avro-tools jar as follows:
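$ java -jar avro-tools-1.10.0.jar compile schema test.avsc .

(The trailing "." is the output directory; the jar name assumes the 1.10.0 download mentioned above.)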
You can see the Java pojo generated in the "com/example" subdirectory:
$ ls com/example/
FullName.java
The generated class extends Avro's SpecificRecordBase and has a getter and setter for each field declared in the schema.
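To produce the Avro file, write a single JSON record that matches the schema and convert it with the fromjson command (the file names are illustrative, but test.avro is the file dumped below):

$ echo '{"first": "john", "last": "doe"}' > test.json
$ java -jar avro-tools-1.10.0.jar fromjson --schema-file test.avsc test.json > test.avro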
$ od -c test.avro
0000000 O b j 001 004 026 a v r o . s c h e m
0000020 a 222 002 { " t y p e " : " r e c o
0000040 r d " , " n a m e " : " F u l l
0000060 N a m e " , " n a m e s p a c e
0000100 " : " c o m . e x a m p l e " ,
0000120 " f i e l d s " : [ { " n a m e
0000140 " : " f i r s t " , " t y p e "
0000160 : " s t r i n g " } , { " n a m
0000200 e " : " l a s t " , " t y p e "
0000220 : " s t r i n g " } ] } 024 a v r
0000240 o . c o d e c \b n u l l \0 330 315 053
0000260 + Z 213 ' 371 \0 S 250 225 202 277 374 \ 002 022 \b
0000300 j o h n 006 d o e 330 315 307 + Z 213 ' 371
0000320 \0 S 250 225 202 277 374 \
0000330
As you can see, the binary representation has the string version of the schema specification embedded in it. You can see the schema string stored up to offset 0x22b. This can be a problem when you are writing multiple messages with the same schema into a stream - using the default Avro formatter, it will embed the schema in each message, which means that a lot of bytes on wire are now being used for transmitting the schema.
To complete our exercise, we can use avro-tools to convert the Avro message back to JSON.
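$ java -jar avro-tools-1.10.0.jar tojson test.avro
{"first":"john","last":"doe"}

To see what happens with more than one record in a file, the same record can be written twice and converted into people.avro (one way to do it, reusing fromjson):

$ cat people.json
{"first": "john", "last": "doe"}
{"first": "john", "last": "doe"}
$ java -jar avro-tools-1.10.0.jar fromjson --schema-file test.avsc people.json > people.avro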
$ od -c people.avro
0000000 O b j 001 004 026 a v r o . s c h e m
0000020 a 222 002 { " t y p e " : " r e c o
0000040 r d " , " n a m e " : " F u l l
0000060 N a m e " , " n a m e s p a c e
0000100 " : " c o m . e x a m p l e " ,
0000120 " f i e l d s " : [ { " n a m e
0000140 " : " f i r s t " , " t y p e "
0000160 : " s t r i n g " } , { " n a m
0000200 e " : " l a s t " , " t y p e "
0000220 : " s t r i n g " } ] } 024 a v r
0000240 o . c o d e c \b n u l l \0 212 313 ]
0000260 304 v 037 354 350 314 ~ ڪ ** 207 365 302 1 004 $ \b
0000300 j o h n 006 d o e \b j o h n 006 d o
0000320 e 212 313 ] 304 v 037 354 350 314 ~ ڪ ** 207 365 061
0000340 1
0000341
Compare this output to the single record output above. You will notice that the schema specification is only included once. This means that avro serialization becomes more efficient if you serialize multiple records with the same schema at once, instead of serializing a single record at a time.
Avro Serialization with Kafka
In Kafka applications, producers will typically write one record at a time. There are two conflicting requirements when we use Avro to serialize records to Kafka:
On the one hand we want the message to be self descriptive, which means we want the schema with the message.
On the other hand, we want the number of bytes on the wire per message to be as small as possible, which means that the verbose schema should not be included in each message (especially since it is the same for every message).
Kafka solves this dilemma with a different Avro serialization method. To do this, it uses the Schema Registry to register the schemas. The schema registry assigns an ID to each unique Schema specification.
NOTE: Schema Registry integration for Kafka is not part of the open source Apache Kafka ecosystem. It is provided by Confluent. You can try this out locally by downloading the Confluent binaries, but running it in production requires licensing.
When the message is serialized to the topic, the KafkaAvroSerializer serializes the message, but instead of including the verbose schema with the message, it includes the schema ID from the schema registry. The schema id is obtained when you register a schema with the registry.
The format of the message written by the serializer is as follows:
Offset    Description
0         Magic byte (value 0)
1-4       4 bytes of schema-id
5-        Avro encoded record
To see this in action, try to read a message serialized using KafkaAvroSerializer. I am reading the message from a topic using the kafka-console-consumer
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9020 --topic test --partition 0 --offset 0 | od -t x1
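The start of the dump looks like this (only the 5-byte header is shown; the record bytes that follow are omitted):

0000000 00 00 00 01 e0 ...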
As you can see, the first byte at offset 0 is the magic byte. After that we have four bytes of the Schema Id which is 0x000001e0. This translates to decimal 480
$ printf "%d" 0x1e0
480
I can verify that a schema with that ID exists in my schema-registry.
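One way is to query the Schema Registry REST API (the registry address here is an assumption):

$ curl -s http://localhost:8081/schemas/ids/480
{"schema":"{\"type\":\"record\",\"name\":\"FullName\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"first\",\"type\":\"string\"},{\"name\":\"last\",\"type\":\"string\"}]}"}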
As you can see, this schema is exactly the same as the schema I created.
Conclusion
We looked at how Avro serialization works by serializing the message along with the schema, making the message self-describing. This led us to see how the stock Avro serializer is not suitable for serializing messages to a stream interface (like Amazon Kinesis, Kafka, or a socket), since carrying the schema in each message wastes space. We then looked at the KafkaAvroSerializer that ships with Confluent Kafka, and saw how it solves this problem by storing an ID that references the schema, instead of storing the entire schema in the serialized message.
If you liked this post, you might also like the following:
Until now, we understood the structure of the ICMP Packet, and also looked at python code to serialize and deserialize the packet from the network.
In the current installment, we will complete the program. First, let us implement the checksum.
ICMP Header Checksum
A checksum is a 16-bit value used for error checking the header and the data. A checksum allows the receiver to make sure that the data it received did not get corrupted or altered in transit. The transmitter adds a checksum to the outgoing packet, calculated over the packet contents. The receiver calculates a checksum on the received packet and compares it with the checksum in the packet. If they match, there was no corruption on the wire.
As per Wikipedia:
The checksum field is the 16 bit one's complement of the one's complement sum of all 16-bit words in the header and text. If a segment contains an odd number of header and text octets to be checksummed, the last octet is padded on the right with zeros to form a 16-bit word for checksum purposes. The pad is not transmitted as part of the segment. While computing the checksum, the checksum field itself is replaced with zeros.
Using this definition, we can write the following checksum function:
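A straightforward Python implementation of that definition looks like this (a sketch; the original version may differ in details):

def checksum(data: bytes) -> int:
    """Return the 16-bit one's complement checksum of data (RFC 1071 style)."""
    # Pad with a zero byte if the length is odd.
    if len(data) % 2:
        data += b"\x00"

    total = 0
    for i in range(0, len(data), 2):
        # Combine two bytes into one 16-bit big-endian word and add it.
        total += (data[i] << 8) + data[i + 1]
        # Fold any carry above 16 bits back into the low 16 bits.
        total = (total & 0xFFFF) + (total >> 16)

    # One's complement of the folded sum, masked to 16 bits.
    return ~total & 0xFFFF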
The Wikipedia article on Sockets gives some background info on this topic. For our case, since we are going to be dealing with a lower level protocol like ICMP, we will be creating a Raw socket.
In order to create a raw socket, you need to be a superuser. On UNIX machines, you will need to use "sudo" to run the program. On Windows, you need to be a member of the Administrators group to run the program.
The python documentation for Socket is a great resource as well.
A raw socket is created as follows:
from socket import socket, AF_INET, SOCK_RAW, IPPROTO_ICMP, IPPROTO_IP, IP_HDRINCL

sock = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)
sock.setsockopt(IPPROTO_IP, IP_HDRINCL, 0)  # let the kernel add the IP header
sock.bind(('127.0.0.1', 0))
Since we are only sending the ICMP packet, and not its encapsulating IP packet, we need to indicate this to the network stack using setsockopt(IPPROTO_IP, IP_HDRINCL, 0). This lets the network driver encapsulate our data in an IP packet.
Main Entry Point
Now that we have all the scaffolding in place for the program, here is the main() function that ties it all together.
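As a self-contained sketch of how the pieces fit together, a minimal main() that builds the Echo Request inline (using the checksum function above; the identifier, sequence number, and payload are arbitrary) could look like this:

import struct
import sys
import time
from socket import socket, AF_INET, SOCK_RAW, IPPROTO_ICMP, IPPROTO_IP, IP_HDRINCL

ICMP_ECHO_REQUEST = 8  # ICMP type for an Echo Request


def main():
    target = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"

    # Build an ICMP Echo Request header: type, code, checksum, identifier, sequence.
    header = struct.pack("!BBHHH", ICMP_ECHO_REQUEST, 0, 0, 1234, 1)
    payload = b"ping"
    # The checksum is computed over the packet with the checksum field set to zero,
    # then the header is rebuilt with the real value.
    csum = checksum(header + payload)
    header = struct.pack("!BBHHH", ICMP_ECHO_REQUEST, 0, csum, 1234, 1)
    packet = header + payload

    # Raw ICMP socket as described above (needs superuser privileges).
    sock = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)
    sock.setsockopt(IPPROTO_IP, IP_HDRINCL, 0)
    sock.settimeout(2.0)

    start = time.time()
    sock.sendto(packet, (target, 0))

    # The reply starts with the IP header (normally 20 bytes) followed by the ICMP message.
    data, addr = sock.recvfrom(1024)
    icmp_type, icmp_code = data[20], data[21]
    elapsed_ms = (time.time() - start) * 1000.0
    print(f"Reply from {addr[0]}: type={icmp_type} code={icmp_code} time={elapsed_ms:.2f} ms")


if __name__ == "__main__":
    main()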