A dive into the Apache Spark Parquet reader for small files

The path of a file read: Python -> C API -> Unix file descriptor in user space -> kernel space -> hard disk driver -> physical location on disk
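A minimal sketch of the first hops on that path (the file name is illustrative): os.open returns a user-space file descriptor, and os.read crosses into kernel space via the read(2) system call before the disk driver is involved.

import os

fd = os.open("part-00000.parquet", os.O_RDONLY)  # user-space file descriptor
magic = os.read(fd, 4)                           # syscall into kernel space, down to the disk driver
print(magic)                                     # Parquet files start with the 4-byte magic b"PAR1"
os.close(fd)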

Parquet Format

Source: https://databricks.com/glossary/what-is-parquet
Take a few minutes to read it. The key points:
  • Data is stored in columns instead of rows, which helps aggregation queries run faster by reading only the columns needed for the query
  • Parquet is a binary format, so data types can be stored encoded, enabling efficient serialisation and deserialisation of the data on disk (compression)
  • Support for nested schemas
  • Metadata capturing summary statistics of the column chunks is written in the footer, after the data (see the sketch below)
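The footer metadata and column pruning are easy to inspect with pyarrow. A minimal sketch, assuming pyarrow is installed and that the file name and column name below exist (both are illustrative):

import pyarrow.parquet as pq

pf = pq.ParquetFile("events.parquet")
print(pf.metadata)                                    # footer: schema, row groups, codecs
print(pf.metadata.row_group(0).column(0).statistics)  # per column-chunk min/max/null stats (None if not written)
table = pf.read(columns=["user_id"])                  # columnar read: only this column is scanned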

Parquet files in Spark

  • spark.sql.files.maxPartitionBytes: 128 MB. The maximum number of bytes to pack into a single partition when reading files. Effective only for file-based sources such as Parquet, JSON and ORC.
  • spark.sql.parquet.enableVectorizedReader: Enables vectorized Parquet decoding.
  • spark.sql.parquet.filterPushdown: Enables Parquet filter push-down optimization when set to true.
  • spark.sql.parquet.mergeSchema: When true, the Parquet data source merges schemas collected from all data files; otherwise the schema is picked from the summary file, or from a random data file if no summary file is available.
  • spark.sql.parquet.compression.codec: Sets the compression codec used when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo.
  • spark.sql.parquet.cacheMetadata: Turns on caching of Parquet schema metadata. Can speed up querying of static data.
  • spark.sql.files.openCostInBytes: The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. Used when packing multiple files into a partition. It is better to over-estimate; then partitions with small files will be faster than partitions with bigger files (which are scheduled first). Effective only for file-based sources such as Parquet, JSON and ORC. (A small SparkSession sketch follows this list.)
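These knobs are set on the SparkSession. A minimal sketch; the values shown are just the documented defaults and the S3 path is a placeholder:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("parquet-small-files")
    .config("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))  # 128 MB
    .config("spark.sql.files.openCostInBytes", str(4 * 1024 * 1024))      # 4 MB
    .config("spark.sql.parquet.enableVectorizedReader", "true")
    .config("spark.sql.parquet.filterPushdown", "true")
    .getOrCreate()
)
df = spark.read.parquet("s3://my-bucket/parquet-data/")  # placeholder path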
  • Partitions in Spark are determined first by the type of file system and then by the file type
  • In HDFS, files are written in blocks, so when reading, each block becomes a partition. Neat and simple!
  • In S3, which is an object store, no such block information is available to partition the data, so Spark falls back on a few user configs:
spark.default.parallelism (default: Total No. of CPU cores)
spark.sql.files.maxPartitionBytes (default: 128 MB)
spark.sql.files.openCostInBytes (default: 4 MB)
maxSplitBytes = Minimum(maxPartitionBytes, Maximum(openCostInBytes, bytesPerCore))
bytesPerCore = (Sum of all data file sizes, or size of a given file) / default.parallelism
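The same formula as a small Python helper, handy for predicting partition sizes before running a job. This is a sketch that mirrors the formula above, not Spark's internal implementation:

def max_split_bytes(file_sizes, parallelism,
                    max_partition_bytes=128 * 1024 * 1024,   # spark.sql.files.maxPartitionBytes
                    open_cost_in_bytes=4 * 1024 * 1024):     # spark.sql.files.openCostInBytes
    # bytesPerCore = sum of all data file sizes / default parallelism
    bytes_per_core = sum(file_sizes) / parallelism
    # maxSplitBytes = min(maxPartitionBytes, max(openCostInBytes, bytesPerCore))
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))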

Demo Time

  • Loading 128 files in one shot, as a list of files passed to the read API: the number of partitions is controlled by the number of cores, the file sizes and maxPartitionBytes
  • Loading 288 files one by one: partitions are created per file, based on the individual file size, `maxPartitionBytes` and `openCostInBytes` (both patterns are sketched just below)
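A minimal sketch of the two load patterns; the paths, the file count and an existing `spark` session are assumptions for illustration:

from functools import reduce
from pyspark.sql import DataFrame

files = [f"s3://my-bucket/data/part-{i:05d}.parquet" for i in range(128)]  # placeholder paths

# Case 1: one read call with the whole list; partitions depend on the total size,
# the number of cores and maxPartitionBytes
df_all = spark.read.parquet(*files)

# Case 2: one read call per file; each file's partitions depend on its own size,
# maxPartitionBytes and openCostInBytes
dfs = [spark.read.parquet(f) for f in files]
df_one_by_one = reduce(DataFrame.unionByName, dfs)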

Case 1: loading 128 files in one shot

Case 2: loading 288 files one by one

maxSplitBytes = Minimum(maxPartitionBytes, Maximum(openCostInBytes, bytesPerCore))
bytesPerCore = (Sum of all data file sizes, or size of a given file) / default.parallelism

# say one of the file sizes is 4282731 bytes
bytesPerCore = 4282731 / 30 ~= 0.13 MB
Therefore, maxSplitBytes = min(128 MB, max(4 MB, 0.13 MB)) = 4 MB
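The same arithmetic in plain Python; the split count at the end is a rough estimate, assuming Spark cuts a splittable file into chunks of maxSplitBytes:

import math

max_partition_bytes = 128 * 1024 * 1024   # 128 MB
open_cost_in_bytes = 4 * 1024 * 1024      # 4 MB
file_size = 4282731                       # bytes, ~4.08 MB
bytes_per_core = file_size / 30           # ~0.13 MB for 30 cores
max_split_bytes = min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))
print(max_split_bytes)                          # 4194304 bytes, i.e. 4 MB
print(math.ceil(file_size / max_split_bytes))   # ~2 splits for this one file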
# Code snippet to get the number of records in each partition
from pyspark.sql.functions import spark_partition_id, asc, desc

df.withColumn("partitionId", spark_partition_id()) \
  .groupBy("partitionId") \
  .count() \
  .orderBy(asc("count")) \
  .show(350, False)  # show(n, truncate): print up to 350 rows, untruncated

# ------------------------------------------------------------------
# boto3 code to get an S3 file size:
import boto3
from botocore.errorfactory import ClientError

s3 = boto3.client('s3')
s3_resource = boto3.resource('s3')

def s3_file_size(s3_url):
    s3_bucket = s3_url.split("/")[2]
    key = "/".join(s3_url.split("/")[3:])
    file_size = 0
    try:
        obj = s3_resource.ObjectSummary(s3_bucket, key)
        file_size = obj.size
        if int(file_size) / 1024 / 1024 > 4:  # flag files bigger than 4 MB (the openCostInBytes default)
            print(f"\ns3://{s3_bucket}/{key}", int(file_size))
    except ClientError:
        # Object not found
        file_size = 0
    return file_size
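A hypothetical call to the helper; the bucket and key are placeholders:

size_bytes = s3_file_size("s3://my-bucket/data/part-00000.snappy.parquet")
print(size_bytes / 1024 / 1024, "MB")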
