Avro Logical Types with Parquet, Hive and Spark
Specifically, we will discuss which logical types maintain fidelity when the data is eventually written as Parquet. We will also cover querying that data with Hive and Spark.
Avro
Logical Types
{"name": "timestamp_millis","type": {"type": "long","logicalType": "timestamp_millis"},"doc": "An avro timestamp_millis logical type"}
| Version | Logical Type | Java Type | Schema Spec |
| --- | --- | --- | --- |
| 1.9+ | date | java.time.LocalDate | "type": { "type": "int", "logicalType": "date" } |
| 1.9+ | time-millis | java.time.LocalTime | "type": { "type": "int", "logicalType": "time-millis" } |
| 1.9+ | time-micros | java.time.LocalTime | "type": { "type": "long", "logicalType": "time-micros" } |
| 1.9+ | timestamp-millis | java.time.Instant | "type": { "type": "long", "logicalType": "timestamp-millis" } |
| 1.9+ | timestamp-micros | java.time.Instant | "type": { "type": "long", "logicalType": "timestamp-micros" } |
| 1.10+ | local-timestamp-millis | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-millis" } |
| 1.10+ | local-timestamp-micros | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-micros" } |
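To see how these annotations surface in the Avro Java API, here is a minimal sketch (assuming Avro 1.9+ on the classpath; the record and field names are made up for illustration) that parses a schema and prints the logical type attached to each field:

```java
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

public class InspectLogicalTypes {
    public static void main(String[] args) {
        // A hypothetical record schema with two logical-type fields
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
          + "{\"name\":\"logical_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
          + "{\"name\":\"logical_timestamp_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");

        for (Schema.Field field : schema.getFields()) {
            LogicalType logicalType = field.schema().getLogicalType();
            // Prints the underlying primitive type and the logical annotation, if any
            System.out.printf("%s: %s (%s)%n",
                field.name(),
                field.schema().getType(),
                logicalType == null ? "no logical type" : logicalType.getName());
        }
    }
}
```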
Logical Type Example
"event_timestamp":{"type": {"type": "long","logicalType": "timestamp-millis" }}}
When the logical type is honored by Avro code generation (1.9+), the field is exposed as a java.time type:

java.time.Instant event_timestamp;

When it is not, the field is just the underlying primitive:

long event_timestamp;
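To make the long-versus-Instant distinction concrete, here is a minimal sketch (hypothetical schema and names, assuming Avro 1.9+) that uses the built-in timestamp-millis conversion to encode an Instant into the raw long that actually goes on the wire:

```java
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.time.Instant;

public class TimestampMillisExample {
    public static void main(String[] args) {
        // A hypothetical one-field record schema using timestamp-millis
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
          + "{\"name\":\"event_timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");
        Schema fieldSchema = schema.getField("event_timestamp").schema();

        // The conversion maps java.time.Instant to the underlying long (epoch millis) and back
        TimeConversions.TimestampMillisConversion conversion =
            new TimeConversions.TimestampMillisConversion();

        Instant now = Instant.now();
        long encoded = conversion.toLong(now, fieldSchema, fieldSchema.getLogicalType());

        // On the wire the field is just a long; the logical type is only schema metadata
        GenericRecord record = new GenericData.Record(schema);
        record.put("event_timestamp", encoded);
        System.out.println(record + " -> " +
            conversion.fromLong(encoded, fieldSchema, fieldSchema.getLogicalType()));
    }
}
```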
Test Bed
"value.serializer" = io.confluent.kafka.serializers.KafkaAvroSerializer.class
"auto.register.schemas" = "false"
"value.subject.name.strategy" = io.confluent.kafka.serializers.subject.RecordNameStrategy.class
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy"
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
The KafkaAvroSerializer stores only a small schema ID with each message and keeps the full schema in the Schema Registry, which makes it an efficient way to transfer Avro data over Kafka topics.
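For reference, a producer wired up with these settings looks roughly like the following sketch (broker address, topic name, and the record-building helper are illustrative placeholders, not the exact test code):

```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Instant;
import java.util.Properties;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker and registry addresses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");
        // Match the test-bed settings: no auto-registration, record-name subjects
        props.put("auto.register.schemas", "false");
        props.put("value.subject.name.strategy", RecordNameStrategy.class.getName());

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("logical-type-test-test-topic", "some-key", buildRecord()));
        }
    }

    // Builds a record carrying a timestamp-millis field (schema as in the Logical Type Example)
    private static GenericRecord buildRecord() {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
          + "{\"name\":\"event_timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("event_timestamp", Instant.now().toEpochMilli());
        return record;
    }
}
```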
The first step was to create a temporary Hive table from the provided Avro schema using org.apache.hadoop.hive.serde2.avro.AvroSerDe.
Next, we create the final table using this temp table as a template.
The following Hive HQL creates the tables.
DROP TABLE IF EXISTS tmp.logical_type_test_avro;
CREATE EXTERNAL TABLE IF NOT EXISTS tmp.logical_type_test_avro
PARTITIONED BY( `process_date` string, `process_hour` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
TBLPROPERTIES {{ avroSchemaLiteral }};
DROP TABLE IF EXISTS warehouse.logical_type_test;
CREATE EXTERNAL TABLE IF NOT EXISTS warehouse.logical_type_test LIKE tmp.logical_type_test_avro
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS PARQUET LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
TBLPROPERTIES {{ tblProperties }};
DROP TABLE tmp.logical_type_test_avro;
Parquet File Schema
message com.ferozed.LogicalType {
  required binary id (STRING);
  optional binary environment (STRING);
  optional binary sender (STRING);
  optional int64 send_epoch_time_millis;
  required int32 logical_date (DATE);
  required int32 logical_time_millis (TIME(MILLIS,true));
  required int64 logical_time_micros;
  required int64 logical_timestamp_millis (TIMESTAMP(MILLIS,true));
  required int64 logical_timestamp_micros;
  required int64 logical_local_timestamp_millis;
  required int64 logical_local_timestamp_micros;
}
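As a side note, the footer schema above can also be dumped programmatically; here is a minimal sketch using the parquet-hadoop API (the file path is a placeholder for one of the objects the S3 sink wrote):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class PrintParquetSchema {
    public static void main(String[] args) throws Exception {
        // Placeholder path: any Parquet file written by the S3 sink connector
        Path path = new Path("s3a://my-bucket/logical-type-test/test/logical-type-test-test-topic/part-00000.parquet");
        Configuration conf = new Configuration();

        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
            // The footer carries the full Parquet message type, including logical annotations
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            System.out.println(schema);
        }
    }
}
```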
Here is the output of the "SHOW CREATE TABLE warehouse.logical_type_test;" command.

| Name | Type |
| --- | --- |
| id | string |
| environment | string |
| sender | string |
| send_epoch_time_millis | bigint |
| logical_date | date |
| logical_time_millis | int |
| logical_time_micros | bigint |
| logical_timestamp_millis | timestamp |
| logical_timestamp_micros | bigint |
| logical_local_timestamp_millis | bigint |
| logical_local_timestamp_micros | bigint |
The Kafka Connect S3 sink then read those messages and converted them to Parquet.
Here is how the types were translated at each stage.
| Field | Avro | Parquet | Hive |
| --- | --- | --- | --- |
| logical_date | date | int32 (DATE) | date |
| logical_time_millis | time-millis | int32 (TIME(MILLIS,true)) | int |
| logical_time_micros | time-micros | int64 | bigint |
| logical_timestamp_millis | timestamp-millis | int64 (TIMESTAMP(MILLIS,true)) | timestamp |
| logical_timestamp_micros | timestamp-micros | int64 | bigint |
| logical_local_timestamp_millis | local-timestamp-millis | int64 | bigint |
| logical_local_timestamp_micros | local-timestamp-micros | int64 | bigint |
Querying the final table in Hive then failed on the timestamp column:

$ select * from warehouse.logical_type_test limit 1;
Failed with exception java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
| Field | Avro Type | Hive Type | Query Result Type |
| --- | --- | --- | --- |
| logical_date | date | DATE | PASS - LogicalType |
| logical_time_millis | time-millis | int | FAIL - int |
| logical_time_micros | time-micros | bigint | FAIL - bigint |
| logical_timestamp_millis | timestamp-millis | timestamp | FAIL - Mapping Error |
| logical_timestamp_micros | timestamp-micros | bigint | FAIL - bigint |
| logical_local_timestamp_millis | local-timestamp-millis | bigint | FAIL - bigint |
| logical_local_timestamp_micros | local-timestamp-micros | bigint | FAIL - bigint |
Summary of results with Hive
From my results, only the Avro date logical type translated correctly and was successfully queried in Hive.
The timestamp-millis logical type translated to a Hive timestamp column, but querying it failed with the cast exception shown above.
The rest of the logical types did not translate at all; they surfaced as plain int or bigint columns.
Summary of results with Parquet
Only the following logical types were translated to annotated Parquet types: date (DATE), time-millis (TIME(MILLIS,true)), and timestamp-millis (TIMESTAMP(MILLIS,true)).
The rest of the logical types (time-micros, timestamp-micros, local-timestamp-millis, local-timestamp-micros) were written as plain int64 values with no logical annotation.
Results with Spark
Reading the same Parquet output with Spark also failed, this time on the TIME_MILLIS column:

py4j.protocol.Py4JJavaError: An error occurred while calling o65.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-100-106-230-65.us-west-2.compute.internal, executor 1): org.apache.spark.sql.AnalysisException: Parquet type not yet supported: INT32 (TIME_MILLIS);
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.typeNotImplemented$1(ParquetSchemaConverter.scala:105)
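The failing read itself was nothing exotic. The original run was through PySpark (hence the py4j trace); a roughly equivalent sketch in Java, with a placeholder path, would be:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadLogicalTypeParquet {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("logical-type-test")
            .getOrCreate();

        // Fails at schema-conversion time: this Spark version has no mapping
        // for the INT32 (TIME_MILLIS) column written by the S3 sink
        Dataset<Row> df = spark.read()
            .parquet("s3://my-bucket/logical-type-test/test/logical-type-test-test-topic/");
        df.show(1);
    }
}
```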
Summary
The Avro logical types did not translate fully into Parquet and Hive.
If you are using Kafka only for micro-service communication, it should be fine to use any of these logical types, since producers and consumers work directly against the Avro schema.
But if the Kafka data is going to be archived as Parquet and queried with Hive or Spark, be careful of these differences.