Details you need to know about Apache Parquet
Parquet is a columnar file format that supports nested data. Many data systems support this format because of its performance advantages.
File format
The first thing we should know is that Apache Parquet is a binary encoding, like Apache Thrift and Protocol Buffers, and is not human-readable; it is very different from textual formats like JSON, XML and CSV.
In order to identify the beginning and end of a Parquet file, it uses a magic number (4 special bytes) as a separator. Following the first magic number there are several row groups and then the footer. FileMetaData is placed in the footer, because metadata is written after the data is written. The row groups hold the actual data.
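As a quick sanity check of this layout, here is a minimal sketch using only the plain JDK (no Parquet libraries; the input path is whatever file you point it at). It verifies both magic numbers and reads the footer length, which is stored as a 4-byte little-endian integer right before the trailing magic.

```scala
// Minimal layout check: leading "PAR1", trailing "PAR1", and the 4-byte
// little-endian footer length stored just before the trailing magic.
import java.io.RandomAccessFile
import java.nio.{ByteBuffer, ByteOrder}

object InspectParquetLayout {
  def main(args: Array[String]): Unit = {
    val raf = new RandomAccessFile(args(0), "r") // e.g. /tmp/example.parquet (placeholder)
    try {
      val head = new Array[Byte](4)
      raf.readFully(head)                        // leading magic number
      raf.seek(raf.length() - 8)
      val tail = new Array[Byte](8)
      raf.readFully(tail)                        // footer length + trailing magic number
      val footerLen = ByteBuffer.wrap(tail, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt
      val magicOk = new String(head, "US-ASCII") == "PAR1" &&
        new String(tail, 4, 4, "US-ASCII") == "PAR1"
      println(s"magic ok: $magicOk, footer (FileMetaData) length: $footerLen bytes")
    } finally raf.close()
  }
}
```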
There are three types of metadata: file metadata, column (chunk) metadata and page header metadata. All thrift structures are serialized using the TCompactProtocol.
TCompactProtocol is one of the binary encodings provided by the Apache Thrift project.
Some important concepts are listed below.
Block (hdfs block): This means a block in hdfs and the meaning is unchanged for describing this file format. The file format is designed to work well on top of hdfs.
File: A hdfs file that must include the metadata for the file. It does not need to actually contain the data.
Row group: A logical horizontal partitioning of the data into rows. There is no physical structure that is guaranteed for a row group. A row group consists of a column chunk for each column in the dataset.
Column chunk: A chunk of the data for a particular column. These live in a particular row group and are guaranteed to be contiguous in the file.
Page: Column chunks are divided up into pages. A page is conceptually an indivisible unit (in terms of compression and encoding). There can be multiple page types which are interleaved in a column chunk.
Hierarchically, a file consists of one or more row groups. A row group contains exactly one column chunk per column. Column chunks contain one or more pages.
Metadata
Let’s dig deeper into the metadata.
The definition file: Parquet thrift definition
FileMetaData
The full definition lives in the thrift file linked above; the struct carries the format version, the schema (a list of SchemaElement), num_rows, the list of row groups, optional key/value metadata and a created_by string identifying the writer.
The field ‘num_rows’ is very useful when a reader wants to count the data. For instance, when Spark SQL counts a partitioned table, Spark just sums the ‘num_rows’ of each Parquet file belonging to the filtered partitions, as the sketch below shows.
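Below is a hedged sketch of that "sum the row counts from the footers" idea, assuming the parquet-hadoop library (org.apache.parquet:parquet-hadoop) is on the classpath. It reads only the footers and sums the per-row-group row counts, which add up to each file's num_rows; no data pages are touched. The object name is made up for illustration.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

object CountFromFooters {
  // Sum row counts using footer metadata only; no data pages are read.
  def countRows(paths: Seq[String], conf: Configuration = new Configuration()): Long =
    paths.map { p =>
      // readFooter is deprecated in recent parquet-mr releases but still widely available.
      val footer = ParquetFileReader.readFooter(conf, new Path(p))
      footer.getBlocks.asScala.map(_.getRowCount).sum
    }.sum

  def main(args: Array[String]): Unit =
    println(s"total rows: ${countRows(args.toSeq)}")
}
```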
The ‘schema’ is the most important part of this metadata. It is defined as a list of SchemaElement, which means each field is represented by a SchemaElement.
SchemaElement
The SchemaElement structure carries, among other fields, the element’s name, its primitive type, the repetition_type (required, optional or repeated), num_children for nested groups, and the logicalType annotation.
The data type of each data column is determined by both the ‘type’ and the ‘logicalType’ fields.
The ‘type’ field supports several primitive types:
- BOOLEAN: 1 bit boolean
- INT32: 32 bit signed int
- INT64: 64 bit signed int
- INT96: 96 bit signed int
- FLOAT: IEEE 32-bit floating point value
- DOUBLE: IEEE 64-bit floating point value
- BYTE_ARRAY: arbitrarily long byte array
- FIXED_LEN_BYTE_ARRAY: fixed length byte array
Logical types are types built on top of the primitive types; the ‘logicalType’ field tells the reader which logical type the primitive data should be interpreted as.
Logical types are used to extend the types that parquet can be used to store, by specifying how the primitive types should be interpreted. This keeps the set of primitive types to a minimum and reuses parquet’s efficient encodings. For example, strings are stored as byte arrays (binary) with a UTF8 annotation.
Many types are supported by ‘logicalType’, such as STRING, ENUM, UUID, INT, DECIMAL, DATE, TIME, TIMESTAMP, INTERVAL, JSON, BSON, LIST and MAP.
The INT annotation can be used to specify the maximum number of bits in the stored value. The annotation has two parameters, bit width and signedness, for example: INT(8, true), INT(16, true), INT(32, true), INT(64, true).
For more details about logical types, see Parquet Logical Type Definitions.
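To see these annotations come out of a real write, here is a small sketch, assuming a spark-shell session where spark is already defined and using a placeholder output path; the comments describe the typical mapping Spark applies when writing.

```scala
import spark.implicits._

val df = Seq(
  ("alice", BigDecimal("12.34"), java.sql.Date.valueOf("2020-01-01"))
).toDF("name", "amount", "day")

df.write.mode("overwrite").parquet("/tmp/logical_types_demo") // placeholder path

// Inspecting the result (e.g. with `parquet-tools schema`) shows logical types
// annotating primitives, roughly:
//   name   -> binary, annotated as STRING (UTF8)
//   amount -> DECIMAL annotation over an int or fixed-length byte array, depending on precision
//   day    -> int32, annotated as DATE
```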
ColumnMetaData
The ColumnMetaData struct records, among other things, the column’s primitive type, its path_in_schema, the set of encodings used, the compression codec, num_values, the total compressed and uncompressed sizes, the offsets of the data and dictionary pages, and optional statistics.
Each column chunk has exactly one ColumnMetaData, so one ColumnMetaData defines how one column’s data is stored in one row group.
Some fields worth noticing:
- encodings: just for validation; each column’s actual encoding is defined in the DataPageHeader (see below);
- codec: the compression algorithm used, such as SNAPPY, GZIP, LZO and so on;
- statistics: statistics for the column within this row group. Some useful fields are shown below; they are very useful for distinct counts and for filtering. With ‘max_value’ and ‘min_value’, Spark SQL can do filter push-down by skipping row groups that cannot match.
The Statistics struct holds null_count, distinct_count and the min_value/max_value of the chunk (plus the older min/max fields kept for backward compatibility).
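As an illustration, this hedged sketch (again assuming parquet-hadoop on the classpath, with a placeholder path) prints the per-row-group, per-column statistics and encodings straight from the footer, which is exactly the information a reader uses to decide whether a row group can be skipped.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

val footer = ParquetFileReader.readFooter(new Configuration(), new Path("/tmp/example.parquet"))
footer.getBlocks.asScala.zipWithIndex.foreach { case (rowGroup, i) =>
  println(s"row group $i: ${rowGroup.getRowCount} rows")
  rowGroup.getColumns.asScala.foreach { col =>
    val stats = col.getStatistics // may be null for files written without statistics
    println(s"  ${col.getPath} codec=${col.getCodec} encodings=${col.getEncodings}" +
      (if (stats != null) s" nulls=${stats.getNumNulls} min=${stats.genericGetMin} max=${stats.genericGetMax}" else ""))
  }
}
```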
PageHeader
PageHeader acts like a parent of DataPageHeader, IndexPageHeader and DictionaryPageHeader and contains their common fields.
Each data page has a DataPageHeader; let’s look into it below.
DataPageHeader
The DataPageHeader records num_values, the encoding used for the values, the encodings of the definition and repetition levels, and optional page-level statistics.
- encoding: how the primitive values are laid out as bytes; for example, PLAIN stores each value back to back in its plain representation (4 bytes per INT32), while PLAIN_DICTIONARY stores indices into a dictionary page.
For more details see Parquet encoding definitions.
- statistics: statistics information for this page
ps: The comment on ‘statistics’ is wrong; it should be ‘optional statistics for the data in this page’. I have fixed that in my PR.
How does encoding work with compression?
First the data is encoded; then the encoded output is compressed with a generic compression algorithm (such as Snappy) specified in ColumnMetaData.
Here is another way to describe it:
Encoding: happens more at the application level, where the data representation is changed. Encoding can also minimize space usage, which gives a kind of compression.
Compression: in general, the technique of reducing the storage needed for the given bytes, regardless of whether the underlying data is already encoded.
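For completeness, here is how the compression step is typically chosen from Spark: a sketch assuming a spark-shell session with spark and a DataFrame df already in scope, and a placeholder output path. The encoding, by contrast, is chosen by the Parquet writer itself.

```scala
// Choose the codec recorded in ColumnMetaData.codec for everything this session writes:
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

// Or override it for a single write:
df.write
  .option("compression", "gzip")
  .parquet("/tmp/events_gzip") // placeholder path
```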
Parquet-tools
Parquet-tools is a convenient official tool for playing with Parquet files.
If you use macOS and Homebrew, just install it with brew install parquet-tools.
To show metadata: parquet-tools meta yourfile.parquet
Or run the jar with Hadoop: hadoop jar ./parquet-tools-<VERSION>.jar <command>
You’ll get something like this:
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
More details: Parquet Tools
Use Parquet with Spark
You can find some guide here: Spark SQL Guide > Parquet Files.
With Parquet files, Spark can apply several optimizations.
- Column projection: the idea behind this feature is simple: read only the data for the columns that the query needs and skip the rest. Column-oriented data formats like Parquet can implement this feature quite naturally.
- Predicate push down: another feature of Spark and Parquet that can improve query performance by reducing the amount of data read from Parquet files. Predicate push down works by evaluating the filtering predicates in the query against the metadata stored in the Parquet files. Both are shown in the sketch after this list.
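A quick sketch of both optimizations, assuming a spark-shell session where spark is defined; the path and column names are made up for illustration.

```scala
import spark.implicits._

val events = spark.read.parquet("/warehouse/events") // placeholder path

// Only the column chunks for user_id and event_date are read (column projection),
// and row groups whose min/max statistics rule out the predicate are skipped (predicate push down).
val recent = events
  .filter($"event_date" >= "2020-01-01")
  .select("user_id")

recent.explain() // look for a "PushedFilters: [...]" entry in the Parquet scan node

// Push down is on by default; it is controlled by this setting:
println(spark.conf.get("spark.sql.parquet.filterPushdown"))
```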
Is predicate push down available for nested fields?
Yes. If the field schema is user(id, name), the data will be stored in two columns, user.id and user.name, and both id and name have their own statistics in the metadata.
Use Parquet with HDFS
When using Parquet with HDFS, you should pay attention to the row group size.
Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
The row group size is controlled by a config named parquet.block.size; the default value is 128MB. You can set it in Spark as below:
sc.hadoopConfiguration.setInt("parquet.block.size", 256*1024*1024)
or
df.write.option("parquet.block.size", 256*1024*1024)
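Putting the recommendation above into practice, here is a hedged sketch, assuming a spark-shell session with spark, sc and a DataFrame df in scope; the 256MB target and the output path are just examples to adapt to your HDFS block size.

```scala
// Align the Parquet row group size with the HDFS block size, aiming for
// one row group per HDFS block as recommended above.
val targetBytes = 256 * 1024 * 1024 // example target; match your dfs.blocksize

sc.hadoopConfiguration.setInt("parquet.block.size", targetBytes)
sc.hadoopConfiguration.setLong("dfs.blocksize", targetBytes.toLong)

df.write
  .option("parquet.block.size", targetBytes.toString) // per-write override of the same setting
  .parquet("/warehouse/events_repacked") // placeholder path
```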