A dive into Apache Spark Parquet Reader for small size files
While working part of my current project, I was asked how Spark reads Parquet files and how does it achieves the parallelisation. Very simple question indeed.
Part of my current project I had to load lot of small files and do some filtering and aggregations to get counts for given user config.
With existing Pandas + Kubernetes setup, the solution is to list the files for the user config and then load them in parallel for each time range split, process them, merge them and get the counts with all nuts and bolts in place.
When Spark was brought into picture it seems to process faster than the existing solution in the range of 2X (with test config, however it is subjective to user config). The very first question that was brought up was, how Spark reads parquet data ?
I have worked with Spark for years now, tasks like reading Parquet files are natural and it doesn’t make me to question them. Did you? Come on!
How many time we questioned ourself on how file open works in python? We may dig in if someone ask it in part of an interview 😉 If my memory goes correct, it involves more or like:
Python -> C API -> Unix File descriptor in user Space -> Kernel Space -> Hard disk Driver -> Physical location
And to be frank I was blank on the spot, what “HOW” ? there is an API and bunch of configs we use them to load data and Spark loads it for us.
For the team knowing “HOW” part is important for them to make a decision on how Spark gonna handle their work load on peak hours? Moving a solution to Spark simply doesn’t guarantee its worthiness unless all edge cases are evaluated and sometimes the basic ones too!
With lots and lots of small files in the picture, it doesn’t matter how big the cluster gonna be, the read operation is going to be a big bottle neck or a show stopper, indeed. Imagine what happens if the user config leads to reading 1000 files, 10,000 files or even 100,000 files? How Spark reads the file and creates partitions when multiple files are loaded ?
Join with me to investigate them!
If you have patience to read thesis paper, you may hit this link to read “Predicate Pushdown in Parquet and Apache Spark”
I have been working with Parquet for say 5 years now and all I know is it is columnar format good for Big Data! No idea on how or why part of it? even though I wanted to do this exploration for long time now, there was no bottle neck I faced till this point as I was dealing large files most of the time.
Googling for Parquet format lead me to official Git repo @ https://github.com/apache/parquet-format and I spend few mins to read and understand the high level and few follow ups form Google search.
From official Git…
In Parquet, a data set comprising of rows and columns is partition into one or multiple files. Each individual file contains one or multiple horizontal partitions of rows called row groups (by default 128MB in size). Each row group subsequently contains a column chunk (i.e. vertical partition) for each of the columns of the horizontal partition. Within a column chunk, the actual values are stored in data pages (by default storing 1MB of values). Each data page stores either a fixed or variable amount of (compressed) values, depending on the data type of the column values and the encoding scheme used.
- Data is stored in columns instead of rows, helping aggregation queries to run faster, by selecting only the columns needs for the query
- Parquet is binary format, thus enabling storing encoded data types for effective serialising and deserialising of the data on the disk (compression)
- Support fo nested schema
- Metadata is stored after writing the data at the footer capturing the summary statistics of the column chunks.
Parquet is binary format, so to do any experiment on raw files needs considerable amount of code to read without any library support or we have to use some community projects like Apache Arrow for quick experiments.
Since our focus is with Spark, I am gonna skip this and move on, with one task for you…
Can we read Parquet files in chunks if so how to read them (any library is fine) ? Please do post in the comments when you come up with one
For example metadata can be read with arrow like :
Some good reads on similar topic…
Spark: Understand the Basic of Pushed Filter and Partition Filter Using Parquet File
Pushed Filter and Partition Filter are techniques that are used by spark to reduce the amount of data that are loaded…
Parquet file in Spark
Reading file in Spark is straight forward, knowing all the configurations is pain, and even more pain is when new configuration gets added and we don’t know its existence.
- spark.sql.files.maxPartitionBytes: 128MB (The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.)- spark.sql.parquet.enableVectorizedReader: Enables vectorized parquet decoding.- spark.sql.parquet.filterPushdown : Enables Parquet filter push-down optimization when set to true.- spark.sql.parquet.mergeSchema : When true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available.- spark.sql.parquet.compression.codec: ets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo.- spark.sql.parquet.cacheMetadata: Turns on caching of Parquet schema metadata. Can speed up querying of static data.- spark.sql.files.openCostInBytes: The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first). This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
- Partitions in Spark are determined by type of file system and then by file type
- In HDFS files are written in blocks so when reading each block will be read as a partition, neat and simple!
- In S3, as it is object store where there is no such information available to partition the data, so Spark depends on some user configs
Following two links gives good information on how the number of partitions are determined while reading data files
Building Partitions For Processing Data Files in Apache Spark
Continuing an earlier story on determining the number of partitions at critical transformations, this story would…
FileSourceScanExec supports bucket pruning so it only scans the bucket files required for a query. Pushed Filters…
The main configs that determines the number of partitions while reading Parquet files are:
spark.default.parallelism (default: Total No. of CPU cores)
spark.sql.files.maxPartitionBytes (default: 128 MB)
spark.sql.files.openCostInBytes (default: 4 MB)
And the following calculation:
maxSplitBytes = Minimum(maxPartitionBytes, Maximum(openCostInBytes, bytesPerCore))bytesPerCore = (Sum of all data file sizes or size of a given file) / default.parallelism
Spark Scala Source Code Reference @ https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
Lets do two experiments
- Loading 128 files all in one shot as list of files for the read API: Where number of partitions is controlled by number of cores, file sizes and maxPartitionBytes
- Load 288 files one by one : Based on individual file size, `maxPartitionBytes` and `openCostInBytes` partitions are created
Cluster Info: 6 executors with 5 cores each
If we load 128 S3 files which are less than 10MB each and a total of 392MB, with
spark.read.parquet(paths), the 128 files will be distributed across the nodes, with each core creating one partition, compressing 4 or so files into one partition.
You could see a count operation @ row number 0: reads 128 files into 31 partitions @ row number 1(need to investigate why one extra partition, may be sometime later!). We do two such count operation so you can see second read and partitioning @ row number 7
If we load 288 S3 files with file size ranging form 0.5MB to 4.3MB one after another, with 12 file sizes are greater than 4 MB marginally, the total number of partitions will be 300 @ row number 13.
maxSplitBytes = Minimum(maxPartitionBytes, Maximum(openCostInBytes, bytesPerCore))bytesPerCore = (Sum of all data file sizes or size of a given file ) / default.parallelism# say one of the file size is 4282731 bytesbytesPerCore = 4282731 / 30 ~= 0.13MBTherefore, maxSplitBytes = min(128MB, max(4MB, 0.13MB))
So for those 12 files whose file sizes are greater than 4MB ends up creating two partitions, totalling 288 + 12 = 300 partitions
When I changed `spark.sql.files.openCostInBytes` to 5MB, the number of partitions dropped to 288 with one partition for each file.
So next time when you see more number of partitions for small files, you know what to tweak around in Spark.
With big files, maxPartitionBytes would be used as the value for partitioning Parquet files.
Once the Parquet file small or big read into Spark Dataframe with certain partitions, rest of the processing is assured by Spark engine.
Code snippets that were used while playing around loading small files in PySpark.
# Code snippet to get number of records in each partitionfrom pyspark.sql.functions import spark_partition_id, asc, descdf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().orderBy(asc("count")).show(False, 350)# ------------------------------------------------------------------# boto3 code to get S3 file size:import boto3
from botocore.errorfactory import ClientErrors3 = boto3.client('s3')s3_resource = boto3.resource('s3')def s3_file_size(s3_url):
s3_bucket = s3_url.split("/")
key = "/".join(s3_url.split("/")[3:])
file_size = 0
object = s3_resource.ObjectSummary(s3_bucket, key)
file_size = object.size
if int(file_size)/1024/1024 > 4:
# Not found
file_size = 0
Thanks, for reading!