Avro Logical Types with Parquet, Hive and Spark
In this post, I am going to talk about the fidelity of types between Avro and big data technologies like Parquet, Hive and Spark.
Specifically, we will discuss which logical types maintain fidelity when the data is eventually written as Parquet.
We will also include querying that data using Hive and Spark.
Avro
Apache Avro is a data serialization system. It defines a schema definition language, and a serialization framework to serialize data. It also supports schema evolution.
Avro schemas are defined in JSON. The Avro project provides tools to translate a schema into classes in popular programming languages.
Logical Types
Avro supports logical types. A logical type is a higher-level representation of a primitive type.
For example, a UUID could be represented as the primitive type string.
Similarly, a java.time.Instant object could be represented as the primitive type long if the timestamp is written in epoch milliseconds.
In an Avro schema, a field with a logical type is defined as follows:
{"name": "timestamp_millis","type": {"type": "long","logicalType": "timestamp-millis"},"doc": "An avro timestamp-millis logical type"}
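To make the annotation concrete, here is a minimal Python sketch (standard library only, not the Avro library) that parses a field definition like the one above and reports its primitive type and its logical type. The helper name `describe_field` is hypothetical and exists only for this illustration.

```python
import json

# A field definition in the same shape as the example above.
FIELD_JSON = """
{
  "name": "timestamp_millis",
  "type": {"type": "long", "logicalType": "timestamp-millis"},
  "doc": "An avro timestamp-millis logical type"
}
"""


def describe_field(field):
    """Return (name, primitive type, logical type or None) for an Avro field dict."""
    field_type = field["type"]
    if isinstance(field_type, dict):
        # Annotated type: the primitive sits under "type",
        # the annotation under "logicalType".
        return field["name"], field_type["type"], field_type.get("logicalType")
    # Plain primitive such as "long" or "string": no logical type attached.
    return field["name"], field_type, None


print(describe_field(json.loads(FIELD_JSON)))
```

The point of the sketch is that the logical type is only an annotation: a reader that ignores `logicalType` still sees a valid `long`.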
The following table lists the supported logical types, and the corresponding higher level and primitive types they translate to.
Version | Logical Type | Java Type | Schema Spec |
1.9+ | date | java.time.LocalDate | "type": { "type": "int", "logicalType": "date" } |
1.9+ | time-millis | java.time.LocalTime | "type": { "type": "int", "logicalType": "time-millis" } |
1.9+ | time-micros | java.time.LocalTime | "type": { "type": "long", "logicalType": "time-micros" } |
1.9+ | timestamp-millis | java.time.Instant | "type": { "type": "long", "logicalType": "timestamp-millis" } |
1.9+ | timestamp-micros | java.time.Instant | "type": { "type": "long", "logicalType": "timestamp-micros" } |
1.10+ | local-timestamp-millis | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-millis" } |
1.10+ | local-timestamp-micros | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-micros" } |
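As a rough illustration of what these mappings mean for the stored values, the sketch below decodes each primitive into a higher-level value using Python's `datetime` module. It follows the encodings in the Avro specification (days since the Unix epoch for date, time units after midnight for the time types, time units since the epoch for the timestamp types). It is a Python analogue of the java.time mapping in the table, not code from the Avro library, and the names `DECODERS` and `decode` are hypothetical.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)  # instant on the UTC timeline
EPOCH_LOCAL = datetime(1970, 1, 1)  # naive: no time zone attached


def time_from_micros(micros):
    """Convert microseconds after midnight to a time of day."""
    return (datetime.min + timedelta(microseconds=micros)).time()


# Logical type name -> function that lifts the primitive value.
DECODERS = {
    "date": lambda v: EPOCH_DATE + timedelta(days=v),
    "time-millis": lambda v: time_from_micros(v * 1000),
    "time-micros": time_from_micros,
    "timestamp-millis": lambda v: EPOCH_UTC + timedelta(milliseconds=v),
    "timestamp-micros": lambda v: EPOCH_UTC + timedelta(microseconds=v),
    "local-timestamp-millis": lambda v: EPOCH_LOCAL + timedelta(milliseconds=v),
    "local-timestamp-micros": lambda v: EPOCH_LOCAL + timedelta(microseconds=v),
}


def decode(logical_type, value):
    """Decode a primitive int/long according to its logical type."""
    return DECODERS[logical_type](value)


print(decode("date", 18993))
print(decode("time-millis", 3_723_004))
print(decode("timestamp-millis", 1_000))
```

A system that drops the annotation hands back only the raw integer, and the caller then has to know which of these decoders applies.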
Logical Type Example
A field named event_timestamp with the logical type timestamp-millis is defined as follows in an Avro schema:
{"name": "event_timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
When a Java POJO is generated from this schema, the `event_timestamp` field will look like this:
java.time.Instant event_timestamp;
Instead of:
long event_timestamp;
The advantage of logical types in Avro is that you can work with higher-level types in your code, while the serialized form is still one of the mapped primitive types.
Now that we have an understanding of what logical types are and how they work, let's see how they work when used with other systems.
To test this, I am going to use Hive and Spark. We will use Apache Parquet as the file format. Parquet is a very common file format used in Big Data applications. It supports efficient column-level querying of data.
Test Bed
To test this out, I created a test platform. I used Confluent's Quickstart to set up a local Kafka ecosystem consisting of a broker and Kafka Connect. An S3 sink connector archives the messages in the topic to an S3 path in Parquet format.
Next, I created a Hive table on top of the Parquet data. I am using Amazon Elastic MapReduce for my Hive metastore.
This is the Avro Schema that has all the possible logical types supported by Avro.
The Kafka producer in the Java app was configured as follows:
"value.serializer" = io.confluent.kafka.serializers.KafkaAvroSerializer.class
"auto.register.schemas" = "false"
"value.subject.name.strategy" = io.confluent.kafka.serializers.subject.RecordNameStrategy.class
The Kafka Connect S3 sink was configured as follows:
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
The KafkaAvroSerializer provides an efficient way to transfer Avro data over Kafka topics.
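For context, the sink properties listed above would sit inside a larger connector definition. The sketch below assembles one in Python as a JSON payload of the kind the Kafka Connect REST API accepts. Only the four converter and format properties come from this post. The connector name, topic, bucket, region and flush size are placeholder assumptions, and the `connector.class` and `storage.class` values are the usual ones for the Confluent S3 sink rather than values taken from my setup.

```python
import json

# Properties quoted in this post.
converter_settings = {
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.value.subject.name.strategy":
        "io.confluent.kafka.serializers.subject.TopicNameStrategy",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
}

# Assumed placeholders: adjust every value here to your own environment.
placeholder_settings = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "topics": "my-topic",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-west-2",
    "flush.size": "1000",
}

connector = {
    "name": "logical-type-s3-sink",  # hypothetical connector name
    "config": {**placeholder_settings, **converter_settings},
}

payload = json.dumps(connector, indent=2)
print(payload)
```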
The Hive Table was created in two steps.
The first step was to create a temporary Hive table using the provided Avro schema and org.apache.hadoop.hive.serde2.avro.AvroSerDe.
Next, we create the final table using this temporary table as a template.
The following HQL creates the table.
DROP TABLE IF EXISTS tmp.logical_type_test_avro;
CREATE EXTERNAL TABLE IF NOT EXISTS tmp.logical_type_test_avro
  PARTITIONED BY (`process_date` string, `process_hour` string)
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
  TBLPROPERTIES {{ avroSchemaLiteral }};
DROP TABLE IF EXISTS warehouse.logical_type_test;
CREATE EXTERNAL TABLE IF NOT EXISTS warehouse.logical_type_test
  LIKE tmp.logical_type_test_avro
  ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS PARQUET
  LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
  TBLPROPERTIES {{ tblProperties }};
DROP TABLE tmp.logical_type_test_avro;
Here is the output of the "SHOW CREATE TABLE warehouse.logical_type_test;" Command.
Parquet File Schema
Let's look at the schema of the Parquet file:
message com.ferozed.LogicalType {
  required binary id (STRING);
  optional binary environment (STRING);
  optional binary sender (STRING);
  optional int64 send_epoch_time_millis;
  required int32 logical_date (DATE);
  required int32 logical_time_millis (TIME(MILLIS,true));
  required int64 logical_time_micros;
  required int64 logical_timestamp_millis (TIMESTAMP(MILLIS,true));
  required int64 logical_timestamp_micros;
  required int64 logical_local_timestamp_millis;
  required int64 logical_local_timestamp_micros;
}
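One way to see at a glance which fields kept a logical annotation is to scan the schema text. The following is a minimal Python sketch that does this with a regular expression over a copy of the schema above. It is a quick inspection aid written for this flat schema only, not a general Parquet schema parser, and the names `FIELD_RE` and `annotated_fields` are hypothetical.

```python
import re

PARQUET_SCHEMA = """
message com.ferozed.LogicalType {
  required binary id (STRING);
  optional binary environment (STRING);
  optional binary sender (STRING);
  optional int64 send_epoch_time_millis;
  required int32 logical_date (DATE);
  required int32 logical_time_millis (TIME(MILLIS,true));
  required int64 logical_time_micros;
  required int64 logical_timestamp_millis (TIMESTAMP(MILLIS,true));
  required int64 logical_timestamp_micros;
  required int64 logical_local_timestamp_millis;
  required int64 logical_local_timestamp_micros;
}
"""

# repetition, physical type, field name, optional "(ANNOTATION)" before the ";"
FIELD_RE = re.compile(
    r"(required|optional)\s+(\w+)\s+(\w+)(?:\s+\(([^;]+)\))?\s*;"
)


def annotated_fields(schema_text):
    """Map each field name to (physical type, annotation or None)."""
    result = {}
    for _, physical, name, annotation in FIELD_RE.findall(schema_text):
        result[name] = (physical, annotation or None)
    return result


fields = annotated_fields(PARQUET_SCHEMA)
for name, (physical, annotation) in fields.items():
    if name.startswith("logical_"):
        status = annotation if annotation else "no annotation"
        print(f"{name}: {physical} ({status})")
```

For the schema above, this shows annotations on only three of the seven `logical_` fields; the others are stored as bare int64 values.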
Now let us look at the actual schema of the hive table.
Name | Type |
id | string |
environment | string |
sender | string |
send_epoch_time_millis | bigint |
logical_date | date |
logical_time_millis | int |
logical_time_micros | bigint |
logical_timestamp_millis | timestamp |
logical_timestamp_micros | bigint |
logical_local_timestamp_millis | bigint |
logical_local_timestamp_micros | bigint |
The Kafka producer wrote an Avro message into the topic using the provided schema.
Then the Kafka Connect S3 sink read those messages and converted them to Parquet.
Here is how the types were translated.
Field | Avro | Parquet | Hive |
logical_date | date | int32 (DATE) | date |
logical_time_millis | time-millis | int32 (TIME(MILLIS,true)) | int |
logical_time_micros | time-micros | int64 | bigint |
logical_timestamp_millis | timestamp-millis | int64 (TIMESTAMP(MILLIS,true)) | timestamp |
logical_timestamp_micros | timestamp-micros | int64 | bigint |
logical_local_timestamp_millis | local-timestamp-millis | int64 | bigint |
logical_local_timestamp_micros | local-timestamp-micros | int64 | bigint |
Let's see if we can query the data. I am using the Hive CLI that is part of Amazon EMR.
$ select * from warehouse.logical_type_test limit 1;
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
Next, instead of querying all columns at once, I queried each column individually to see which one was causing the problem. It turned out that the logical_timestamp_millis column was the one failing, because it was typed in Hive as a timestamp.
The other columns could be queried without errors.
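The column-by-column check is easy to script. Here is a small Python sketch that generates one query per column for the table above. The column list is copied from the Hive schema shown earlier, and the helper name `per_column_queries` is hypothetical. Running the statements (through the Hive CLI or any other client) is left out.

```python
# Columns of warehouse.logical_type_test, as listed in the Hive schema.
COLUMNS = [
    "id",
    "environment",
    "sender",
    "send_epoch_time_millis",
    "logical_date",
    "logical_time_millis",
    "logical_time_micros",
    "logical_timestamp_millis",
    "logical_timestamp_micros",
    "logical_local_timestamp_millis",
    "logical_local_timestamp_micros",
]


def per_column_queries(table, columns):
    """Build one single-column SELECT per column so that a failure
    can be attributed to a specific column."""
    return [f"SELECT {column} FROM {table} LIMIT 1;" for column in columns]


for query in per_column_queries("warehouse.logical_type_test", COLUMNS):
    print(query)
```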
Field | Avro Type | Hive Type | Query Result Type |
logical_date | date | date | Pass - LogicalType |
logical_time_millis | time-millis | int | Fail - int |
logical_time_micros | time-micros | bigint | Fail - bigint |
logical_timestamp_millis | timestamp-millis | timestamp | Fail - Mapping Error |
logical_timestamp_micros | timestamp-micros | bigint | Fail - bigint |
logical_local_timestamp_millis | local-timestamp-millis | bigint | Fail - bigint |
logical_local_timestamp_micros | local-timestamp-micros | bigint | Fail - bigint |
Summary of results with Hive
In my results, only the Avro logical type date translated correctly and could be queried successfully in Hive.
The logical type timestamp-millis translated to a Hive timestamp type, but the query failed.
The rest of the logical types did not translate at all; they appear in Hive as plain int or bigint columns.
Summary of results with Parquet
Only the following logical types were translated to higher-level Parquet types: date, time-millis and timestamp-millis.
The rest of the logical types were written as plain int64 values without a Parquet annotation.
Results with Spark
I tried to use Spark to read the S3 files.
Spark failed with the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o65.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-100-106-230-65.us-west-2.compute.internal, executor 1): org.apache.spark.sql.AnalysisException: Parquet type not yet supported: INT32 (TIME_MILLIS);
  at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.typeNotImplemented$1(ParquetSchemaConverter.scala:105)
Summary
The Avro logical types did not translate fully into Parquet and Hive, and Spark could not read the resulting files at all because of the time-millis column.
If you are using Kafka only for microservice communication, it should be fine to use all of these logical types.
But if the Kafka data is going to be archived, be careful of these differences.