Creating Hadoop and Impala friendly partitioned data with Kite SDK

When working with Hadoop and SQL-on-Hadoop systems like Impala, we have to think about a couple of important factors: how to serialize data for storage and processing, and how to partition the data.
The majority of Hadoop practitioners now agree that the most flexible and performant choice is a combination of the Avro and Parquet formats. So let’s dive into some details:

Data Serialization with Parquet and Avro.

Apache Avro is a language-neutral data serialization system. The project was created by Doug Cutting (the creator of Hadoop) to address the major downside of Hadoop Writables: lack of language portability. Having a data format that can be processed by many languages (currently C, C++, C#, Java, PHP, Python, and Ruby) makes it easier to share datasets with a wider audience than one tied to a single language. Avro data is described using a language-independent schema. Since Avro stores the schema in the header of each file, it’s self-describing, and Avro files can easily be read later, even from a different language than the one used to write the file. Avro also provides good native support for MapReduce, since Avro datafiles are compressible and splittable. Another important feature of Avro is support for schema evolution—the schema used to read a file does not need to match the schema used to write the file. This makes it possible to add new fields to a schema as requirements change.

Avro files are splittable and Avro supports Snappy and Deflate compression. Avro schemas are usually written in JSON.
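To make this concrete, here is a minimal sketch, using the plain Avro Java API, of writing a Snappy-compressed Avro datafile from a JSON schema. The schema, field names, and output path are made up for illustration:

    import java.io.File;

    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class AvroWriteSketch {
        public static void main(String[] args) throws Exception {
            // a minimal JSON schema, inlined here; record and field names are hypothetical
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"SimpleLog\",\"fields\":["
                  + "{\"name\":\"timestamp\",\"type\":\"long\"},"
                  + "{\"name\":\"message\",\"type\":\"string\"}]}");

            GenericRecord record = new GenericData.Record(schema);
            record.put("timestamp", System.currentTimeMillis());
            record.put("message", "application started");

            // the writer embeds the schema in the file header (self-describing)
            // and writes compressed, splittable blocks of records
            try (DataFileWriter<GenericRecord> writer =
                         new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
                writer.setCodec(CodecFactory.snappyCodec());
                writer.create(schema, new File("/tmp/simple-log.avro"));
                writer.append(record);
            }
        }
    }

Because the schema travels with the data, any consumer in any of the supported languages can read this file back without any additional metadata.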

In summary, Avro provides us with the following benefits:

  • Language portability
  • Schema evolution
  • Compressible and Splittable
  • Supported by almost all Hadoop ecosystem components.

As we can see, the Avro format is quite powerful. It’s good if your organization can set Avro as a standard for moving data around, to make everybody’s life easier. LinkedIn and Etsy, for example, chose Avro as their standard.

But Avro is a record-oriented format. Recently, a number of databases (for example, Vertica) have introduced columnar storage, which provides several benefits over earlier row-oriented systems:

  • Skips I/O and decompression (if applicable) on columns that are not a part of the query.
  • Works well for queries that only access a small subset of columns. If many columns are being accessed, then row-oriented is generally preferable.
  • Provides efficient compression; compression can be specified on a per-column level.
  • Compression on columns is generally very efficient because entropy within a column is lower than entropy within a block of rows.
  • Columnar storage is often well suited for data-warehousing type applications where users want to aggregate certain columns over a large collection of records.

One of the most popular columnar formats is Parquet, developed and open sourced by Twitter and Cloudera. Parquet stores full metadata at the end of each file, so Parquet files are self-documenting.

To summarize, Avro is a row-based storage format for Hadoop; Parquet is a column-based storage format for Hadoop.
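To make the column-skipping benefit concrete, here is a rough sketch of reading just two columns from a Parquet file through the parquet-avro integration. The file path, record name, and field names are assumptions for the example (the projection field names must match those in the file’s schema), and a reasonably recent parquet-avro is assumed on the classpath:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.avro.AvroReadSupport;
    import org.apache.parquet.hadoop.ParquetReader;

    public class ProjectionReadSketch {
        public static void main(String[] args) throws Exception {
            // projection schema listing only the columns we actually need
            Schema projection = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"SimpleLog\",\"fields\":["
                  + "{\"name\":\"timestamp\",\"type\":\"long\"},"
                  + "{\"name\":\"message\",\"type\":\"string\"}]}");

            // ask the Avro read support to materialize only the projected columns,
            // so Parquet can skip I/O and decompression for everything else
            Configuration conf = new Configuration();
            AvroReadSupport.setRequestedProjection(conf, projection);

            try (ParquetReader<GenericRecord> reader =
                         AvroParquetReader.<GenericRecord>builder(new Path("/tmp/data/logs.parquet"))
                                 .withConf(conf)
                                 .build()) {
                GenericRecord record;
                while ((record = reader.read()) != null) {
                    System.out.println(record.get("timestamp") + " " + record.get("message"));
                }
            }
        }
    }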

Which one to choose?

If your use case typically scans or retrieves all of the fields in a row in each query, Avro is usually the best choice. If your dataset has many columns and your use case typically involves working with a subset of those columns rather than entire records, Parquet is optimized for that kind of work. But the best part is the ability to store Avro records in Parquet files – have your cake and eat it too.
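As a sketch of that “best of both worlds” option, the parquet-avro module lets you write Avro records straight into a Snappy-compressed Parquet file. The schema, field names, and output path below are assumptions for illustration; Kite, shown later, wraps this same idea in a higher-level API:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class AvroToParquetSketch {
        public static void main(String[] args) throws Exception {
            // hypothetical Avro schema; in practice you would load it from an .avsc file
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"SimpleLog\",\"fields\":["
                  + "{\"name\":\"timestamp\",\"type\":\"long\"},"
                  + "{\"name\":\"message\",\"type\":\"string\"}]}");

            GenericRecord record = new GenericData.Record(schema);
            record.put("timestamp", System.currentTimeMillis());
            record.put("message", "application started");

            // write Avro records into a Snappy-compressed Parquet file
            try (ParquetWriter<GenericRecord> writer =
                         AvroParquetWriter.<GenericRecord>builder(new Path("/tmp/data/logs.parquet"))
                                 .withSchema(schema)
                                 .withCompressionCodec(CompressionCodecName.SNAPPY)
                                 .build()) {
                writer.write(record);
            }
        }
    }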

Data Partitioning.

More often than not, data gets queried in specific patterns. For example, data may be queried by some date range. To make processing with Hadoop and Impala faster, we want to partition the data. Partitioning a data set is a very common technique used to reduce the amount of I/O required when processing it. When dealing with large amounts of data, the savings brought by reducing I/O can be quite significant. Unlike traditional data warehouses, however, HDFS doesn’t store indexes on the data. This lack of indexes plays a large role in speeding up data ingest, but it means that every query has to read the entire data set even when processing only a small subset of the data (a pattern called a “full table scan”). When data sets grow very big and queries only require access to subsets of the data, a very good solution is to break up the data set into smaller sets, each such subset being called a partition.

Best-practice partition-by-date layout:

datasets/
└── ratings/
    └── year=2015/
        ├── month=01/
        │   ├── day=01/
        │   ├── …
        │   └── day=30/
        ├── month=02/
        │   ├── day=01/
        │   ├── …

Our end goal is to write partitioned data, encode it with Parquet/Avro, and compress it with Snappy or another compression codec. Sounds like a lot of work, but fear not: this is where the Kite SDK comes in. The goal of the Kite SDK is to make coding to Hadoop best practices easier.

Introducing Kite SDK – A data API for Hadoop:

The goal of the Kite SDK (formerly the Cloudera Development Kit): “Hadoop is daunting from the perspective of a developer who just needs to get some work done. You should be able to ramp up quickly on a few key technologies or concepts, rather than first learn everything about the 13-14 different technologies required to use Hadoop effectively. Things should just work together. Hadoop forces you to spend more time thinking about infrastructure than your business goals… Kite provides additional support for this infrastructure one level up in the stack so that best practices are codified in APIs that make sense to developers.”

Kite is immediately accessible to developers who know Java; other languages will follow. Using the Kite API, or the Kite CLI, it is extremely easy to create data sets that meet Hadoop best practices.

For our task at hand, all we have to do is the following:

import java.net.URI;

import org.apache.avro.generic.GenericData.Record;
import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.PartitionStrategy;

// schema for our records, loaded from the classpath
URI schemaURI = URI.create("resource:simple-log.avsc");

// create a partitioned, Snappy-compressed Parquet dataset for long-term storage
Dataset<Record> partitionedData = Datasets.create("dataset:file:/tmp/data/logs",
        new DatasetDescriptor.Builder()
                .format(Formats.PARQUET)
                .schemaUri(schemaURI)
                .compressionType(CompressionType.Snappy)
                .partitionStrategy(new PartitionStrategy.Builder()
                        .year("timestamp", "year")
                        .month("timestamp", "month")
                        .day("timestamp", "day")
                        .build())
                .build(), Record.class);

As you can see, using the Java fluent API and the builder pattern, the Kite SDK lets us describe our data set and how we want to store it. Then all we have to do is write the data into this dataset.

DatasetWriter<Record> writer = null;
try {
    writer = partitionedData.newWriter();
    // get your data here
    // .........
    // write it
    writer.write(record);
    writer.flush();
} finally {
    if (writer != null) {
        writer.close();
    }
}

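Reading the data back is just as short. A sketch, assuming the same dataset URI as above and an additional import of org.kitesdk.data.DatasetReader:

// load the dataset we created above; Kite resolves the partition directories for us
Dataset<Record> logs = Datasets.load("dataset:file:/tmp/data/logs", Record.class);

DatasetReader<Record> reader = null;
try {
    reader = logs.newReader();
    while (reader.hasNext()) {
        Record rec = reader.next();
        System.out.println(rec);
    }
} finally {
    if (reader != null) {
        reader.close();
    }
}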
As you can see, Kite makes working with Avro/Parquet-serialized, partitioned data simple. I hope this sparks your interest in Kite. I should note that the Kite project has very good documentation and examples. Please follow the links to learn more.
