A powerful Big Data trio: Spark, Parquet and Avro

Note: A cleaner, more efficient way to handle Avro objects in Spark can be seen in this gist


I love open-source projects that play nicely with others; no one likes to be locked into a single data processing framework or programming language. Mature open-source projects build software with integration and openness in mind to allow engineers to attack Big Data problems from a number of different angles using the most appropriate tool for the job. This post explains how to combine Spark, Parquet and Avro to create a fast, flexible and scalable data analysis system.

If you already know what Spark, Parquet and Avro are, you can skip the blockquotes in this section or just jump ahead to the sample application in the next section.

Spark is an open source cluster computing system that aims to make data analytics fast — both fast to run and fast to write.

To run programs faster, Spark provides primitives for in-memory cluster computing: your job can load data into memory and query it repeatedly much more quickly than with disk-based systems like Hadoop MapReduce.

To make programming faster, Spark provides clean, concise APIs in Scala, Java and Python.
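
As a quick illustration of the in-memory reuse described above, here is a minimal sketch (assuming sc is a SparkContext); the input path and data are hypothetical and not part of the sample application.

// Cache a dataset once, then run several queries against it without
// re-reading it from disk. The HDFS path is hypothetical.
val lines = sc.textFile("hdfs://namenode/data/amino-acids.txt").cache()
val basicCount = lines.filter(_.contains("BASIC")).count() // the first action reads from HDFS and populates the cache
val total = lines.count()                                  // later actions are served from memory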

Spark can read and write data stored in Apache Hadoop using Hadoop {Input,Output}Formats. This post shows how to use the Parquet {Input,Output}Formats to create and read Parquet files with Spark. For MapReduce-style jobs, Spark is often an order of magnitude or more faster than Hadoop.

Parquet is a columnar storage format for Hadoop.

We created Parquet to make the advantages of compressed, efficient columnar data representation available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model, or programming language.

Parquet is a format that can be processed by a number of different systems: Shark, Impala, Hive, Pig, Scrooge and others. It also doesn’t lock you into a specific programming language since the format is defined using Thrift which supports numerous programming languages. For example, Impala is written in C++ while Hive is written in Java but they can easily interoperate on the same Parquet data.

Apache Avro is a data serialization system with rich data structures and a compact, fast, binary data format. With Avro, you can define a data schema and then read and write data in that format using a number of different programming languages. Avro is similar to Apache Thrift and Google Protobuf.
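
For example, the sketch below writes one record of the generated AminoAcid class (defined by the Avro IDL shown later in this post) to a plain Avro data file and reads it back, independently of Parquet; the file path is hypothetical.

import java.io.File
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}

// Write a record to an Avro data file (the path is hypothetical)
val avroFile = new File("/tmp/amino-acids.avro")
val writer = new DataFileWriter(new SpecificDatumWriter[AminoAcid](classOf[AminoAcid]))
writer.create(AminoAcid.SCHEMA$, avroFile)
writer.append(new AminoAcid(AminoAcidType.BASIC, "histidine", "his", 155.16f))
writer.close()

// Read the records back; the schema travels with the file
val reader = new DataFileReader(avroFile, new SpecificDatumReader[AminoAcid](classOf[AminoAcid]))
while (reader.hasNext) println(reader.next())
reader.close()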

Example Application using Spark, Parquet and Avro

Let’s go through a sample application that uses Spark, Parquet and Avro to read, write and filter a sample amino acid dataset. The application isn’t meant to do anything useful; it simply shows how these systems can be used together. It writes the essential amino acids to a Parquet file, reads them all back, and then re-reads them while filtering out the amino acids that are not basic, all using Spark.

For the impatient

You can run the application using the following commands (assuming you have a recent version of Maven installed):

$ git clone https://github.com/massie/spark-parquet-example
$ cd spark-parquet-example
$ mvn scala:run -DmainClass=com.zenfractal.SparkParquetExample

Maven will generate Java code from the Avro schema, then compile and run the sample application. Once you’re convinced that the application works, take a peek at the main Scala code. There really isn’t much else to the app.

For the detail-oriented

To add Spark, Parquet and Avro to your project, use the following Maven dependencies:

        <dependency>
            <groupId>org.spark-project</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>0.7.3</version>
        </dependency>
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>

The example application also uses the avro-maven-plugin to convert the Avro IDL into Java code. See the application pom.xml file for details.

The Avro schema explicitly describes the format of the data this sample application reads and writes.

enum AminoAcidType {
	ALIPHATIC,
	HYDROXYL,
	CYCLIC,
	AROMATIC,
	BASIC,
	ACIDIC
}

record AminoAcid {
	AminoAcidType type;
	string fullName;
	string abbreviation;
	float molecularWeight;
}

Note: If you keep the schema flat (without nesting), the Parquet files you create can be read by systems like Shark and Impala, which let you query Parquet files as tables using SQL-like syntax. The Parquet files created by this sample application could easily be queried with Shark, for example.

The main Scala code for the Spark job is short (only 76 lines) with comments that help explain what the application is doing at each step.

package com.zenfractal

import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat}
import spark.SparkContext
import spark.SparkContext._
import org.apache.hadoop.mapreduce.Job
import parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport, AvroReadSupport}
import parquet.filter.{RecordFilter, UnboundRecordFilter}
import java.lang.Iterable
import parquet.column.ColumnReader
import parquet.filter.ColumnRecordFilter._
import parquet.filter.ColumnPredicates._
import com.google.common.io.Files
import java.io.File

object SparkParquetExample {

  // This predicate will remove all amino acids that are not basic
  class BasicAminoAcidPredicate extends UnboundRecordFilter {
    def bind(readers: Iterable[ColumnReader]): RecordFilter = {
      column("type", equalTo(AminoAcidType.BASIC)).bind(readers)
    }
  }

  // Only prints non-null amino acids
  private def aminoAcidPrinter(tuple: Tuple2[Void, AminoAcid]) = {
    if (tuple._2 != null) println(tuple._2)
  }

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ParquetExample")
    val job = new Job()

    val tempDir = Files.createTempDir()
    val outputDir = new File(tempDir, "output").getAbsolutePath
    println(outputDir)

    val essentialAminoAcids = List(
      new AminoAcid(AminoAcidType.BASIC, "histidine", "his", 155.16f),
      new AminoAcid(AminoAcidType.ALIPHATIC, "isoleucine", "ile", 131.18f),
      new AminoAcid(AminoAcidType.ALIPHATIC, "leucine", "leu", 131.18f),
      new AminoAcid(AminoAcidType.BASIC, "lysine", "lys", 146.19f),
      new AminoAcid(AminoAcidType.HYDROXYL, "methionine", "met", 149.21f),
      new AminoAcid(AminoAcidType.AROMATIC, "phenylalanine", "phe", 165.19f),
      new AminoAcid(AminoAcidType.HYDROXYL, "threonine", "thr", 119.12f),
      new AminoAcid(AminoAcidType.AROMATIC, "tryptophan", "trp", 204.23f),
      new AminoAcid(AminoAcidType.ALIPHATIC, "valine", "val", 117.15f))


    // Configure the ParquetOutputFormat to use Avro as the serialization format
    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
    // You need to pass the schema to AvroParquet when you are writing objects, but not when you
    // are reading them. The schema is saved in the Parquet file for future readers to use.
    AvroParquetOutputFormat.setSchema(job, AminoAcid.SCHEMA$)
    // Create a PairRDD with all keys set to null and wrap each amino acid in a serializable object
    val rdd = sc.makeRDD(essentialAminoAcids.map(acid => (null, new SerializableAminoAcid(acid))))
    // Save the RDD to a Parquet file in our temporary output directory
    rdd.saveAsNewAPIHadoopFile(outputDir, classOf[Void], classOf[AminoAcid],
      classOf[ParquetOutputFormat[AminoAcid]], job.getConfiguration)

    // Read all the amino acids back to show that they were all saved to the Parquet file
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[AminoAcid]])
    val file = sc.newAPIHadoopFile(outputDir, classOf[ParquetInputFormat[AminoAcid]],
      classOf[Void], classOf[AminoAcid], job.getConfiguration)
    file.foreach(aminoAcidPrinter)

    // Set a predicate so that Parquet deserializes only the amino acids that are basic.
    // Non-basic amino acids will be returned as null.
    ParquetInputFormat.setUnboundRecordFilter(job, classOf[BasicAminoAcidPredicate])
    val filteredFile = sc.newAPIHadoopFile(outputDir, classOf[ParquetInputFormat[AminoAcid]],
      classOf[Void], classOf[AminoAcid], job.getConfiguration)
    filteredFile.foreach(aminoAcidPrinter)
  }


}

Typically, one of the most expensive parts of reading and writing data is (de)serialization. Parquet supports predicate push-down and schema projection to target specific columns of your data for filtering and reading, keeping the cost of deserialization to a minimum.

For example, in the code above, the BasicAminoAcidPredicate class is used to filter out all but the basic amino acids. This predicate examines the amino acid type field to see if it’s set to AminoAcidType.BASIC. When Parquet processes each record with this predicate, it will deserialize only the type field to check the record.

If the BasicAminoAcidPredicate returns false, the record is skipped and no further deserialization is performed. If the predicate returns true, either the entire record is deserialized or, when a projection schema is defined, only the fields in the projection are deserialized. Projection is configured on the read side: in parquet-avro, use the AvroParquetInputFormat.setRequestedProjection() method (AvroParquetOutputFormat.setSchema(), used above, sets the schema for writing).
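
As a rough sketch, assuming a version of parquet-avro that provides AvroParquetInputFormat.setRequestedProjection(), a projection that reads only the fullName column could be configured like this:

import org.apache.avro.Schema
import parquet.avro.AvroParquetInputFormat

// Hypothetical projection schema: only the fullName column is read and
// deserialized; all other columns in the Parquet file are skipped.
val projection = new Schema.Parser().parse(
  """{"type": "record", "name": "AminoAcid", "namespace": "com.zenfractal",
    |  "fields": [ {"name": "fullName", "type": "string"} ]}""".stripMargin)
AvroParquetInputFormat.setRequestedProjection(job, projection)
// Reads through ParquetInputFormat with this job configuration will now
// materialize only the projected column.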

I hope you found this post useful. Feel free to leave a comment.
