Sunday, July 7, 2024


Kafka Privacy Toolkit Part 1: Protect PII data using a custom single message transform.

Introduction


Kafka is a highly scalable and durable distributed streaming platform that has become the go-to choice for handling large amounts of data across a variety of industries. Its ecosystem of services is vast and powerful, enabling businesses to solve complex problems and uncover insights from their data in real time.

One of the key services in the Kafka ecosystem is Kafka Connect. It is a framework that enables streaming integration between Kafka and other data systems. Kafka Connect is flexible and extensible, with a plugin-based architecture that allows it to integrate seamlessly with various data systems. Users can write custom connectors to integrate Kafka with any data system that has a Java API, and Kafka Connect's RESTful API can be used to configure and manage connector instances. With Kafka Connect, businesses can leverage the power of Kafka to enable streaming integration between different data systems with ease.

Data Privacy & Compliance


Organizations process extensive amounts of data, and some of it includes Personally Identifiable Information (PII) such as names, addresses, and social security numbers. It is important to protect this data from unauthorized access and use.

One use case where removing PII before transferring data to an external system with Kafka Connect is crucial is the financial industry. Financial institutions are required by regulation to comply with data protection laws and maintain the confidentiality of their clients' information. Kafka Connect enables these institutions to transfer data seamlessly between internal systems and external systems, such as data warehouses or third-party analytics vendors, while ensuring that sensitive information is obfuscated. This enhances data security, helps maintain compliance with regulations, and reduces the risk of data breaches.

In this blog post, we will explore how to use Kafka Connect Single Message Transforms (SMTs), including a custom one, to obfuscate PII data in Kafka messages. This will help you maintain data privacy and security while using Kafka effectively.

Drop Messages with PII

You can use the Confluent Filter SMT to drop entire messages that match a condition. The condition can be set to match on PII data in the message.

For example, given the input record:
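(The records below are illustrative, assuming simple JSON values with name and salary fields.)

{ "name": "Jon Doe", "salary": 85000 }
{ "name": "Jane Roe", "salary": 0 }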


And the Kafka Connect SMT config:
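(A minimal sketch of a Confluent Filter SMT configuration in connector-properties form; treat the exact filter.condition predicate syntax as an assumption and check it against the Filter SMT documentation.)

transforms=dropPii
transforms.dropPii.type=io.confluent.connect.transforms.Filter$Value
# drop (exclude) any record whose salary field is greater than zero
transforms.dropPii.filter.condition=$[?(@.salary > 0)]
transforms.dropPii.filter.type=exclude
transforms.dropPii.missing.or.null.behavior=include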

The output will be:
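(Only the record that does not match the condition passes through.)

{ "name": "Jane Roe", "salary": 0 }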


As you can see, the "Jon Doe" record was dropped because it had a salary field value that was greater than zero.

Remove Sensitive Values in PII Fields


Use the Confluent MaskField transform to mask data in PII fields. It can either clear the value or replace it with a token. When combined with Predicates, it is a powerful mechanism for masking PII data only when certain conditions are met.

For example, given the input record:
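(Again, illustrative records with assumed name and salary fields.)

{ "name": "Jon Doe", "salary": 85000 }
{ "name": "Jane Roe", "salary": 92000 }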


And the Kafka Connect SMT config:
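(A minimal sketch using the MaskField transform that ships with Kafka Connect; with no replacement configured, masked fields are set to a type-appropriate default such as 0 for numbers.)

transforms=maskSalary
transforms.maskSalary.type=org.apache.kafka.connect.transforms.MaskField$Value
# mask the salary field; optionally set transforms.maskSalary.replacement to use a custom token instead
transforms.maskSalary.fields=salary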

The output will be:
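(The salary field in each record is replaced with the default value for its type.)

{ "name": "Jon Doe", "salary": 0 }
{ "name": "Jane Roe", "salary": 0 }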


In the output, the salary field in every record was masked and replaced with a default value.

Manage JSON Payloads in Kafka Messages


Sometimes your Kafka message contains fields that are themselves JSON payloads encoded as strings. This can happen, for example, when using a DynamoDB source connector, which embeds the DynamoDB row as a JSON string in the Document field of the message.

The challenge comes when you need to mask PII fields inside a JSON payload that is embedded in a single field of a Kafka message.

For this, you can use the mask-json-field transform. It lets you mask fields at any level of nesting in the embedded JSON document, and it supports arrays as well.

For example, given the input record:
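(An illustrative record, assuming a DynamoDB-style message whose Document field holds the row as a JSON-encoded string.)

{ "Document": "{\"name\": \"Jon Doe\", \"salary\": 85000, \"SSN\": \"123-45-6789\"}" }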


And the Kafka Connect SMT config:
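(A sketch only: the class and property names below are hypothetical placeholders, not the transform's actual API; consult the mask-json-field documentation for the real configuration keys.)

transforms=maskDoc
# placeholder class and property names, for illustration only
transforms.maskDoc.type=<mask-json-field transform class>
transforms.maskDoc.json.field.name=Document
transforms.maskDoc.mask.fields=salary,SSN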

The output will be:
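(The PII fields inside the embedded JSON document are masked; the rest of the document is unchanged.)

{ "Document": "{\"name\": \"Jon Doe\", \"salary\": 0, \"SSN\": \"xxx-xx-xxxx\"}" }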


In the output, the salary field was replaced with zero, and the SSN field was replaced with "xxx-xx-xxxx".

Conclusion

In this article, I showed how to use Kafka Connect Single Message Transforms to drop messages containing PII fields, or to mask PII fields within messages.

You can use these transforms to ensure that the data flowing into or out of your Kafka topics complies with data protection regulations.

Thursday, June 6, 2024


Authoring Avro Enums for Extensibility


Introduction

Apache Avro™ is the leading serialization format for record data and a first choice for streaming data pipelines. It offers excellent schema evolution support.

In this article, we discuss how to author Avro enums so that data written with a new version of the schema can be read by consumers using the previous version. In other words, we are authoring the Avro schema so that it is forward compatible.

Avro Enums


In Avro, you can define enums as follows:

"type": { "name": "color", "type": "enum", "symbols": ["red", "blue", "green"] }


Enum symbols are always strings in Avro, and each symbol must be a valid Avro name.

The Apache Avro documentation describes the specification for enums.

Avro Compatibility


Now we come to the main part of this blog post: what happens if you read data containing an enum symbol that is not part of the reader schema?

Let's start by defining the first version of the schema in a file called "v1.avsc". This is the reader schema.

{ "name": "v1", "type": "record", "namespace": "com.acme", "doc": "Enum test", "fields": [ { "name": "color", "type": { "name": "enum", "type": "enum", "symbols": [ "unknown", "red", "blue", "green" ], "default": "unknown" }, "default": "unknown" } ] }

We have the schema. How do we generate data? For that we need the avro-tools jar. Download avro-tools and store it in a local folder.

Now you can generate the data as follows. First, write a JSON message corresponding to the schema.

Write the following into a file called "v1.json":
{ "color": "red" }

Now, convert the JSON to Avro using avro-tools:

java -jar ~/DevTools/avro-tools-1.11.1.jar fromjson --schema-file v1.avsc v1.json > v1.avro
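As a sanity check, you can convert the Avro file back to JSON with the tojson command (avro-tools uses the writer schema embedded in the file), which should print {"color":"red"}:

java -jar ~/DevTools/avro-tools-1.11.1.jar tojson v1.avro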


Now we evolve the schema: add another symbol, "yellow", to the "color" enum, and store the result in a file called v2.avsc.

{ "name": "v1", "type": "record", "namespace": "com.acme", "doc": "Enum test", "fields": [ { "name": "color", "type": { "name": "enum", "type": "enum", "symbols": [ "unknown", "red", "blue", "green", "yellow" ], "default": "unknown" }, "default": "unknown" } ] }

Create a new JSON message in a file called v2.json that uses the new enum value:

{ "color": "yellow" }

Let's convert this to Avro:

java -jar ~/DevTools/avro-tools-1.11.1.jar fromjson --schema-file v2.avsc v2.json > v2.avro

Now let's see what happens if we try to read the v2.avro file using the v1 schema (v1.avsc). Remember that the v1 schema does not have the symbol "yellow" in its enum.

We will use the tojson command from avro-tools to convert the Avro data back to JSON.

$ java -jar ~/DevTools/avro-tools-1.11.1.jar tojson --reader-schema-file v1.avsc v2.avro
{"color":"unknown"}

As you can see, reading the v2 Avro message with a schema that does not contain the new enum symbol causes the unknown symbol to be resolved to the enum's default value.

Now, let's see what happens if the enum does not have a default value specified. Let's create a new schema without the default, in a file called v1-nodefault.avsc:

{
  "name": "v1",
  "type": "record",
  "namespace": "com.acme",
  "doc": "Enum test",
  "fields": [
    {
      "name": "color",
      "type": {
        "name": "enum",
        "type": "enum",
        "symbols": [ "unknown", "red", "blue", "green" ]
      },
      "default": "unknown"
    }
  ]
}


Use this schema to read the v2.avro file:

java -jar ~/DevTools/avro-tools-1.11.1.jar tojson --reader-schema-file v1-nodefault.avsc v2.avro


This results in the following exception:


Exception in thread "main" org.apache.avro.AvroTypeException: No match for yellow
	at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:269)
	at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:268)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:182)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263)
	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:248)
	at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:98)
	at org.apache.avro.tool.Main.run(Main.java:67)
	at org.apache.avro.tool.Main.main(Main.java:56)

As you can see, removing the default value from the enum makes data written with the new schema unreadable with the previous schema.

Conclusion

Now, we can conclude by stating the best practice for Avro Enums.

Always specify a default value for Avro enums. This allows the schema to evolve with new symbols while remaining forward compatible: data written with newer versions of the schema can still be read using previous versions.
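In schema form, the recommended pattern looks like this (a minimal sketch mirroring the v1.avsc example above, with an explicit "unknown" symbol and a default on the enum):

{
  "name": "color",
  "type": "enum",
  "symbols": [ "unknown", "red", "blue", "green" ],
  "default": "unknown"
}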


Wednesday, April 17, 2024


Troubleshooting serialization errors when using Confluent KafkaAvroSerializer

Introduction

Confluent provides on-prem and hosted offerings of the Apache Kafka ecosystem. As part of this, they also provide a custom serializer/deserializer for writing and reading messages with a schema to and from a topic.

The KafkaAvroSerializer provides an efficient mechanism for serializing messages to a Kafka topic.

The wire format used by the KafkaAvroSerializer is described in the Confluent documentation: a single magic byte (0), followed by a 4-byte schema ID, followed by the Avro-encoded payload.


Sometimes, the producer of the message is not using these libraries but is still writing messages that adhere to the Confluent wire format. The reason for doing this is to keep the messages compatible with Confluent tooling so that you can use the KafkaAvroDeserializer. For example, you might want to write those messages to S3 using the kafka-connect-s3 sink connector.

When the producer does custom serialization but makes an error in the encoding, it can result in serialization errors down the line. These kinds of errors are difficult to debug. In this blog post, I will describe how you can root-cause those errors.

Serialization Errors

Here is an example of an error we noticed in the Kafka Connect S3 sink connector in our production instance; the relevant excerpts from the connector log are shown in the troubleshooting steps below.


Troubleshooting


Start off by identifying the schema that the error message refers to:


Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 15930

I download the schema for that schema ID from the Schema Registry as follows:


curl -s https://schema-registry.mydomain.local/schemas/ids/15930 | jq '.schema|fromjson' > schema.avsc

Next, you need to download the failed message from the topic. The topic name, partition, and offset are given in the exception message.


  Error encountered in task crm-tag-update-event-prod-s3-sink-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', 
  where consumed record is {topic='crm-tag-update-event-prod-topic', partition=0, offset=52510041, timestamp=1713396004969, timestampType=CreateTime}.


If you don't see the partition/offset, you can enable your sink connector to log that info with the following configs:

errors.log.enabled: true
errors.log.include.messages: true


Now that we have the topic/partition/offset, let's download the offending message from the Kafka topic:

$ kafka-console-consumer \
--bootstrap-server kafka-cluster.mydomain.local:6030 \
--command-config security.properties \
--topic crm-tag-update-event-prod-topic \
--partition 0 \
--offset 52510041 \
--max-messages 1 \
> message_raw.bin


So we have the schema and the offending message. How do we decode them? For that, we use avro-tools. You can download avro-tools from the Sonatype index.

For this demo, I am using version 1.11.1.

But wait: the message that we downloaded from the topic is the raw message. It is encoded with a magic byte and schema ID in the first 5 bytes (refer to the Confluent wire format), so we first need to extract the actual Avro payload.
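You can see this header with a quick hex dump. For schema ID 15930 (0x3E3A), the first five bytes of the raw message should look something like the following (output trimmed for illustration):

$ xxd -l 5 message_raw.bin
00000000: 0000 003e 3a                             ...>: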

This is easy: we just need to strip the 5-byte header from the message.

$ dd if=message_raw.bin of=message.bin bs=1 skip=5


Now that we have the actual message and the schema, we can use fragtojson to decode the message. We expect this to fail, since the schema registered under that schema ID does not match the schema that was actually used to serialize the message.

$ java -jar /tmp/avro-tools-1.11.1.jar fragtojson --schema-file schema.avsc message.bin


Root Cause 

To get to the root cause, we need to look at the producer.


One of the following is the cause of this:

1) The schema ID is correct for that topic, but the serialized message does not match it.

2) The schema ID is wrong, but the serialized message is correct.


To find out which one of these it is, we need to understand the producer.

In our case, what happened was that an error in a GitLab pipeline caused the wrong job to be deployed to our environment. That job was using the correct schema subject, but was compiled against POJOs that did not match the subject. So it serialized messages using the wrong POJO but the correct schema ID.

For #1, you need to find the actual schema that was used in serialization. Then download that schema and use it to decode the message using avro-tools.
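For example, if you know the subject and version the producer actually built against (the subject name and version below are hypothetical), you can fetch that schema from the Schema Registry and retry the decode:

curl -s https://schema-registry.mydomain.local/subjects/crm-tag-update-event-prod-topic-value/versions/3/schema > actual.avsc
java -jar /tmp/avro-tools-1.11.1.jar fragtojson --schema-file actual.avsc message.bin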

#2 is more difficult, since we don't know the correct schema ID. This will require looking at the producer code to figure out the correct schema subject/version or schema ID it should have used for serialization.

Conclusion

In this blog post, I showed you how to:

1) Read Avro messages from a Kafka topic.
2) Extract the embedded message from the raw message.
3) Decode it using avro-tools.
4) Troubleshoot serialization errors.
