Saturday, June 24, 2023

mask-json-field transform - a Kafka Connect SMT to mask out PII data from JSON fields in Kafka messages.


Introduction


In today's fast-paced digital era, where vast amounts of data are being generated and processed at an unprecedented scale, the need for efficient, scalable, and real-time data processing solutions has become crucial. Traditional messaging systems and data integration tools often struggle to meet these demands, leading to delays, data loss, and inefficiencies in data processing pipelines. That's where Apache Kafka, a powerful distributed streaming platform, comes into play.

Kafka has emerged as a game-changer in the world of data streaming, enabling organizations to build robust, fault-tolerant, and highly scalable data pipelines. Originally created at LinkedIn to handle its real-time data ingestion and processing needs, Kafka is now developed under the Apache Software Foundation. Since its inception, it has gained immense popularity and has become the de facto standard for building event-driven architectures across a wide range of industries.

Kafka Connect plays a pivotal role in the Kafka ecosystem by providing a scalable and reliable framework for data integration. It simplifies the process of connecting external systems to Kafka, enabling seamless data ingestion and extraction. Kafka Connect acts as a bridge between Kafka topics and various data sources or sinks, allowing for the efficient transfer of data in real-time. It offers a wide range of connectors that serve as plugins to connect to different systems such as databases, file systems, message queues, and cloud services. With Kafka Connect, organizations can easily integrate their existing data infrastructure with Kafka, enabling them to build robust and flexible data pipelines. The ability to leverage pre-built connectors or develop custom ones makes Kafka Connect a valuable tool for simplifying data integration tasks and accelerating the adoption of Kafka within an organization's data ecosystem.

Kafka Connect Single Message Transforms


Single Message Transforms (SMTs) are a powerful feature in Kafka Connect that allows for in-flight data transformation and enrichment within data integration pipelines. SMTs operate on individual messages as they pass through a connector, providing the flexibility to modify, filter, or enrich the data without the need for external processing. With SMTs, you can apply a variety of operations, such as adding or removing fields, changing data types, filtering messages based on specific conditions, and performing calculations or aggregations. This capability enables organizations to shape the data precisely to their requirements before it reaches the destination, ensuring data consistency, compliance, and compatibility.
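
For example, Kafka Connect ships with several built-in SMTs. The fragment below (a minimal sketch) configures the built-in MaskField transform to blank out a top-level ssn field of the record value, replacing it with a type-appropriate empty value:

{
    "transforms": "mask_pii",
    "transforms.mask_pii.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask_pii.fields": "ssn"
}

Note, however, that MaskField only operates on top-level fields of the record; it cannot reach inside a JSON document embedded in a string field. That gap is what the transform introduced later in this post fills.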

Complying with Data Regulations


Companies need to comply with the data protection regulations of the jurisdictions they operate in. Examples of these regulations include CCPA/CPRA (California) and GLBA (US, financial services); other countries have their own equivalents.

Companies using the Kafka ecosystem to transfer data need to be cognizant of these regulations and determine where regulated data (e.g., PII) should and should not be visible. When using Kafka Connect as the integration mechanism, you can use SMTs to mask out data and comply with these regulations.

In this blog post, I will introduce the Kafka Connect mask-json-field SMT.

mask-json-field Single Message Transform


There are cases where you need to mask a value inside a JSON document that is embedded in a field of a Kafka message. For example, assume the following Kafka message schema, which could be the output of a Change Data Capture (CDC) connector that monitors a database and writes CDC events to a Kafka topic.



Field Name   Field Value
----------   -----------
Id           1
Database     contoso
Operation    Insert
Document     {"name": "john doe", "salary": 100000, "ssn": "111-22-3333"}

In this Kafka message, the Document value is a string containing the JSON payload of the database row.

Assume we are persisting this message into a different system using a sink connector, but for privacy reasons we don't want to persist the salary and ssn values.

The JSON version of this message looks as follows:
{
    "Id": 1,
    "Database": "contoso",
    "Operation": "Insert",
    "Document": "{\"name\": \"john doe\", \"salary\": 100000, \"ssn\": \"111-22-3333\"}"
}


The mask-json-field SMT allows you to manipulate JSON documents embedded inside Kafka messages.

Here is an example of using the transform in a FileStreamSink connector.


    
{
    "name": "json-mask-test",
    "connector.class": "FileStreamSink",
    "file": "/tmp/file-sink.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": 1,
    "topics": "json-topic",
    "transforms": "mask_ssn,mask_salary",
    "transforms.mask_ssn.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn.REPLACEMENT_FIELD_PATH": "ssn",
    "transforms.mask_ssn.REPLACEMENT_VALUE_STRING": "xxx-xx-xxxx",
    "transforms.mask_salary.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_salary.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_salary.REPLACEMENT_FIELD_PATH": "salary",
    "transforms.mask_salary.REPLACEMENT_VALUE_INT": 0,
    "errors.tolerance": "all"
}



With this transform, the output message will be:
 
Field Name   Field Value
----------   -----------
Id           1
Database     contoso
Operation    Insert
Document     {"name": "john doe", "salary": 0, "ssn": "xxx-xx-xxxx"}

The raw version of the output will be as follows:
{
    "Id": 1,
    "Database": "contoso",
    "Operation": "Insert",
    "Document": "{\"name\": \"john doe\", \"salary\": 0, \"ssn\": \"xxx-xx-xxxx\"}"
}
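
A quick deployment note: the flat JSON shown above bundles name with the rest of the configuration. If you register the connector through the Kafka Connect REST API's POST /connectors endpoint instead, everything except name is nested under a config object; an abbreviated sketch:

{
    "name": "json-mask-test",
    "config": {
        "connector.class": "FileStreamSink",
        "topics": "json-topic",
        "transforms": "mask_ssn,mask_salary"
    }
}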


This transform also works on arrays. For example, assume you have an SSN value stored as an array of strings:

["111", "22", "3333"]

and you want to transform it to:

["xxx", "xx", "3333"]

Then the connector spec will look like this:



{
    "name": "json-mask-test",
    "connector.class": "FileStreamSink",
    "file": "/tmp/file-sink.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": 1,
    "topics": "json-topic",
    "transforms": "mask_salary,mask_ssn_1,mask_ssn_2",
    "transforms.mask_salary.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_salary.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_salary.REPLACEMENT_FIELD_PATH": "salary",
    "transforms.mask_salary.REPLACEMENT_VALUE_INT": 0,
    "transforms.mask_ssn_1.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn_1.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn_1.REPLACEMENT_FIELD_PATH": "/ssn/0",
    "transforms.mask_ssn_1.REPLACEMENT_VALUE_STRING": "xxx",
    "transforms.mask_ssn_2.type": "io.github.ferozed.kafka.connect.transforms.MaskJsonField$Value",
    "transforms.mask_ssn_2.CONNECT_FIELD_NAME": "Document",
    "transforms.mask_ssn_2.REPLACEMENT_FIELD_PATH": "/ssn/1",
    "transforms.mask_ssn_2.REPLACEMENT_VALUE_STRING": "xx",
    "errors.tolerance": "all"
}
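
Assuming the embedded document stores ssn as the array shown above, the raw output message would look something like this (the salary masking from the first transform also applies):

{
    "Id": 1,
    "Database": "contoso",
    "Operation": "Insert",
    "Document": "{\"name\": \"john doe\", \"salary\": 0, \"ssn\": [\"xxx\", \"xx\", \"3333\"]}"
}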


There are many more features that this transform supports. See the README.md in the source repository.

Conclusion


Kafka Connect and its Single Message Transforms feature provide a robust and scalable framework for data integration within the Kafka ecosystem. The mask-json-field SMT gives organizations an extensible mechanism for masking sensitive fields in their data, helping them comply with data protection regulations.



2 comments:

  1. This is awesome! Will it work with sink connectors?

     Reply: Thanks for your feedback! Yes, this SMT can be used from both source and sink connectors.