Saturday, December 4, 2021


Avro Logical Types with Parquet, Hive and Spark




In this post, I am going to talk about the fidelity of types between Avro and big data technologies like Parquet, Hive and Spark.

Specifically, we will discuss which logical types maintain fidelity when the data is eventually written as Parquet.

We will also look at querying that data with Hive and Spark.

Avro







Apache Avro is a data serialization system. It defines a schema definition language and a framework for serializing data, and it supports schema evolution.

Avro schemas are defined in JSON. The Avro project provides tools to translate a schema into classes in popular programming languages.


Logical Types



Avro supports logical types. A logical type is a higher-level representation of a primitive type.

For example, a higher-level UUID type can be represented as the primitive type string.

Similarly, a higher-level java.time.Instant can be represented as the primitive type long if the timestamp is written as epoch milliseconds.
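
As a plain-Java illustration of that mapping (not Avro-specific, just the epoch-millisecond conversion):

    // The long value that a timestamp-millis field serializes to...
    long epochMillis = java.time.Instant.parse("2021-12-04T00:00:00Z").toEpochMilli();

    // ...and the higher-level Instant an application works with.
    java.time.Instant restored = java.time.Instant.ofEpochMilli(epochMillis);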

In an Avro schema, a logical type is declared as follows:

    {
      "name": "timestamp_millis",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      "doc": "An Avro timestamp-millis logical type"
    }

The following table lists the supported logical types, along with the corresponding higher-level Java types and the primitive types they map to.


| Version | Logical Type | Java Type | Schema Spec |
|---------|--------------|-----------|-------------|
| 1.9+ | date | java.time.LocalDate | "type": { "type": "int", "logicalType": "date" } |
| 1.9+ | time-millis | java.time.LocalTime | "type": { "type": "int", "logicalType": "time-millis" } |
| 1.9+ | time-micros | java.time.LocalTime | "type": { "type": "long", "logicalType": "time-micros" } |
| 1.9+ | timestamp-millis | java.time.Instant | "type": { "type": "long", "logicalType": "timestamp-millis" } |
| 1.9+ | timestamp-micros | java.time.Instant | "type": { "type": "long", "logicalType": "timestamp-micros" } |
| 1.10+ | local-timestamp-millis | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-millis" } |
| 1.10+ | local-timestamp-micros | java.time.LocalDateTime | "type": { "type": "long", "logicalType": "local-timestamp-micros" } |



Logical Type Example



A field named event_timestamp with the logical type timestamp-millis is defined as follows in an Avro schema:

"event_timestamp":
{
    "type": { 
              "type": "long", 
              "logicalType": "timestamp-millis" }
    }
}

When a Java POJO is generated from this schema, the `event_timestamp` field will look like this:

java.time.Instant event_timestamp;

Instead of:

long event_timestamp;

The advantage of logical types in Avro is that your code can work with the higher-level type, while the data is serialized as the mapped primitive type.
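
As a rough sketch of what that looks like in code (assuming a hypothetical record class named Event generated from a schema containing the event_timestamp field above):

    // Hypothetical class generated by avro-tools; shown only to illustrate the point.
    Event event = Event.newBuilder()
            .setEventTimestamp(java.time.Instant.now())  // application code works with Instant
            .build();

    // On the wire, event_timestamp is encoded as a primitive long (epoch milliseconds).
    long wireValue = event.getEventTimestamp().toEpochMilli();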


Now that we have an understanding of what logical types are and how they work, let's see how they behave when used with other systems.

To test this, I am going to use Hive and Spark, with Apache Parquet as the file format. Parquet is a very common file format in big data applications, and it supports efficient column-level queries.

Test Bed 


In order to test this out, I created a test platform. I used Confluent's Quickstart to set up a local Kafka ecosystem consisting of a broker and Kafka Connect. An S3 sink connector archives the messages in the topic to an S3 path in Parquet format.

Next, I created a Hive table on top of the Parquet data. I am using Amazon Elastic MapReduce for my Hive metastore.



The producer uses an Avro schema that exercises all the logical types supported by Avro.
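
A sketch of that schema, reconstructed from the field names and types in the Parquet schema shown later in this post (the record name comes from the Parquet message name; field order, nullability and defaults are assumptions):

    {
      "type": "record",
      "name": "LogicalType",
      "namespace": "com.ferozed",
      "fields": [
        { "name": "id", "type": "string" },
        { "name": "environment", "type": ["null", "string"], "default": null },
        { "name": "sender", "type": ["null", "string"], "default": null },
        { "name": "send_epoch_time_millis", "type": ["null", "long"], "default": null },
        { "name": "logical_date", "type": { "type": "int", "logicalType": "date" } },
        { "name": "logical_time_millis", "type": { "type": "int", "logicalType": "time-millis" } },
        { "name": "logical_time_micros", "type": { "type": "long", "logicalType": "time-micros" } },
        { "name": "logical_timestamp_millis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
        { "name": "logical_timestamp_micros", "type": { "type": "long", "logicalType": "timestamp-micros" } },
        { "name": "logical_local_timestamp_millis", "type": { "type": "long", "logicalType": "local-timestamp-millis" } },
        { "name": "logical_local_timestamp_micros", "type": { "type": "long", "logicalType": "local-timestamp-micros" } }
      ]
    }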



The producer app was written in Java. I generated a POJO (Plain Old Java Object) from this schema using Avro Tools, and used the generated class to send random data into the topic.

The Kafka producer in the Java app was configured as follows:

    "value.serializer" = io.confluent.kafka.serializers.KafkaAvroSerializer.class
    "auto.register.schemas" = "false"
    "value.subject.name.strategy" = io.confluent.kafka.serializers.subject.RecordNameStrategy.class

The Kafka Connect S3 sink was configured as follows:

    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"

The KafkaAvroSerializer provides an efficient way to transfer Avro data over Kafka topics.
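
Putting those settings together, here is a minimal sketch of what the producer could look like. The topic name and bootstrap servers are assumptions (the topic is inferred from the S3 path used later), and LogicalType is the POJO generated by Avro Tools; the real producer filled every field with random data:

    import java.time.Instant;
    import java.util.Properties;
    import java.util.UUID;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import com.ferozed.LogicalType;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.subject.RecordNameStrategy;

    public class LogicalTypeProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            props.put("schema.registry.url", "http://localhost:8081");
            props.put("auto.register.schemas", "false");
            props.put("value.subject.name.strategy", RecordNameStrategy.class);

            try (KafkaProducer<String, LogicalType> producer = new KafkaProducer<>(props)) {
                // The generated setters accept java.time.* objects for the logical-type fields.
                LogicalType record = LogicalType.newBuilder()
                        .setId(UUID.randomUUID().toString())
                        .setLogicalTimestampMillis(Instant.now())
                        // ... set the remaining fields the same way ...
                        .build();

                // Topic name is an assumption, inferred from the S3 path.
                producer.send(new ProducerRecord<>("logical-type-test-test-topic",
                        record.getId().toString(), record));
                producer.flush();
            }
        }
    }

Because auto.register.schemas is false, the schema has to be registered ahead of time under the record-name subject (com.ferozed.LogicalType) for the serializer to find it.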

The Hive table was created in two steps.

The first step was to create a temporary Hive table using the provided Avro schema and org.apache.hadoop.hive.serde2.avro.AvroSerDe.

Next, we create the final table using this temp table as a template.

The following is the Hive HQL used to create the tables.
    DROP TABLE IF EXISTS tmp.logical_type_test_avro;
    
    CREATE EXTERNAL TABLE IF NOT EXISTS tmp.logical_type_test_avro
    PARTITIONED BY( `process_date` string, `process_hour` string)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
    TBLPROPERTIES {{ avroSchemaLiteral }};
    
    DROP TABLE IF EXISTS warehouse.logical_type_test;
    
    CREATE EXTERNAL TABLE IF NOT EXISTS warehouse.logical_type_test like tmp.logical_type_test_avro
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS PARQUET LOCATION 's3://my-bucket/logical-type-test/test/logical-type-test-test-topic'
    TBLPROPERTIES {{ tblProperties }};
    
    DROP TABLE tmp.logical_type_test_avro;
    


Here is the output of the "SHOW CREATE TABLE warehouse.logical_type_test;" command.

Parquet File Schema


Let's look at the schema of the Parquet file:

message com.ferozed.LogicalType {
  required binary id (STRING);
  optional binary environment (STRING);
  optional binary sender (STRING);
  optional int64 send_epoch_time_millis;
  required int32 logical_date (DATE);
  required int32 logical_time_millis (TIME(MILLIS,true));
  required int64 logical_time_micros;
  required int64 logical_timestamp_millis (TIMESTAMP(MILLIS,true));
  required int64 logical_timestamp_micros;
  required int64 logical_local_timestamp_millis;
  required int64 logical_local_timestamp_micros;
}

Now let us look at the actual schema of the Hive table.

| Name | Type |
|------|------|
| id | string |
| environment | string |
| sender | string |
| send_epoch_time_millis | bigint |
| logical_date | date |
| logical_time_millis | int |
| logical_time_micros | bigint |
| logical_timestamp_millis | timestamp |
| logical_timestamp_micros | bigint |
| logical_local_timestamp_millis | bigint |
| logical_local_timestamp_micros | bigint |

The Kafka producer wrote Avro messages into the topic using the provided schema.
Then the Kafka Connect S3 sink read those messages and converted them to Parquet.
Here is how the types got translated.

| Field | Avro | Parquet | Hive |
|-------|------|---------|------|
| logical_date | date | DATE | date |
| logical_time_millis | time-millis | TIME(MILLIS,true) | int |
| logical_time_micros | time-micros | int64 | bigint |
| logical_timestamp_millis | timestamp-millis | TIMESTAMP(MILLIS,true) | timestamp |
| logical_timestamp_micros | timestamp-micros | int64 | bigint |
| logical_local_timestamp_millis | local-timestamp-millis | int64 | bigint |
| logical_local_timestamp_micros | local-timestamp-micros | int64 | bigint |

Let's see if we can query the data. I am using the Hive CLI that is part of Amazon EMR.

$ select * from warehouse.logical_type_test limit 1;

Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable

Next, instead of querying all columns, I queried each column individually to see which one was causing the problem. It turned out that the logical_timestamp_millis column was the one failing, because it is typed in Hive as a timestamp.

The other columns queried fine.
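
The column-by-column check was simply a series of single-column queries, along these lines:

    -- Each of these returns data...
    SELECT logical_date FROM warehouse.logical_type_test LIMIT 1;
    SELECT logical_time_millis FROM warehouse.logical_type_test LIMIT 1;
    SELECT logical_time_micros FROM warehouse.logical_type_test LIMIT 1;

    -- ...but this one reproduces the LongWritable -> TimestampWritable cast error.
    SELECT logical_timestamp_millis FROM warehouse.logical_type_test LIMIT 1;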

Here is a table summarizing the fidelity of Avro logical-type fields to Hive tables.


| Field | Avro Type | Hive Type | Query Result Type |
|-------|-----------|-----------|-------------------|
| logical_date | date | DATE | PASS - LogicalType |
| logical_time_millis | time-millis | int | Fail - int |
| logical_time_micros | time-micros | bigint | Fail - bigint |
| logical_timestamp_millis | timestamp-millis | timestamp | Fail - Mapping Error |
| logical_timestamp_micros | timestamp-micros | bigint | Fail - bigint |
| logical_local_timestamp_millis | local-timestamp-millis | bigint | Fail - bigint |
| logical_local_timestamp_micros | local-timestamp-micros | bigint | Fail - bigint |

Summary of results with Hive



From my results, only the Avro logical type date translated correctly and could be queried successfully in Hive.

The timestamp-millis logical type translated to a Hive timestamp type, but failed at query time.

The rest of the logical types did not translate at all; they surfaced as plain int or bigint columns.

Summary of results with Parquet



Only the following logical types got translated to higher-level Parquet types:

  • date
  • time-millis
  • timestamp-millis

The rest of the logical types did not translate to higher-level Parquet types.

Results with Spark



I tried to use Spark to read the S3 files.
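
The read itself was just a plain Parquet load of the S3 location the sink wrote to. A minimal sketch of the equivalent read in Java (the original was run from PySpark, judging by the py4j trace below):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadLogicalTypeParquet {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("logical-type-test")
                    .getOrCreate();

            // Same S3 location the Kafka Connect S3 sink wrote to.
            Dataset<Row> df = spark.read()
                    .parquet("s3://my-bucket/logical-type-test/test/logical-type-test-test-topic/");

            df.printSchema();
            df.show(1);
        }
    }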

Spark failed with the following error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o65.parquet.
        : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-100-106-230-65.us-west-2.compute.internal, executor 1): org.apache.spark.sql.AnalysisException: Parquet type not yet supported: INT32 (TIME_MILLIS);
        	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.typeNotImplemented$1(ParquetSchemaConverter.scala:105)


Summary



The Avro logical types did not translate fully into Parquet and Hive.

If you are using Kafka for microservice communication, it should be fine to use all of these logical types.

But if the Kafka data is going to be archived as Parquet and queried with Hive or Spark, be careful of these differences.