A dive into Apache Spark Parquet Reader for small size files

Python -> C API -> Unix file descriptor in user space -> kernel space -> hard disk driver -> physical location
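As a rough illustration of that path from the Python side (a minimal sketch; the file name is just a placeholder), every read on a Parquet file ultimately goes through a user-space file descriptor and a read(2) system call that the kernel and the disk driver resolve to physical blocks:

import os

fd = os.open("example.parquet", os.O_RDONLY)   # user-space file descriptor (just an int)
magic = os.read(fd, 4)                         # read(2) syscall -> kernel -> disk driver -> physical blocks
print(fd, magic)                               # Parquet files begin (and end) with the magic bytes b'PAR1'
os.close(fd)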

Parquet Format

Source: https://databricks.com/glossary/what-is-parquet
Take a few minutes to read it…

Parquet file in Spark

- spark.sql.files.maxPartitionBytes: 128 MB. The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
- spark.sql.parquet.enableVectorizedReader: Enables vectorized Parquet decoding.
- spark.sql.parquet.filterPushdown: Enables Parquet filter push-down optimization when set to true.
- spark.sql.parquet.mergeSchema: When true, the Parquet data source merges schemas collected from all data files; otherwise the schema is picked from the summary file, or a random data file if no summary file is available.
- spark.sql.parquet.compression.codec: Sets the compression codec used when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo.
- spark.sql.parquet.cacheMetadata: Turns on caching of Parquet schema metadata. Can speed up querying of static data.
- spark.sql.files.openCostInBytes: The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimate it; then the partitions with small files will be faster than partitions with bigger files (which are scheduled first). This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
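These options can be set on the session before reading (a minimal sketch; the app name and S3 path are placeholders, and the values shown are simply the defaults discussed above):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("parquet-small-files-demo")                              # placeholder name
         .config("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)   # 128 MB
         .config("spark.sql.files.openCostInBytes", 4 * 1024 * 1024)       # 4 MB
         .config("spark.sql.parquet.enableVectorizedReader", "true")
         .config("spark.sql.parquet.filterPushdown", "true")
         .getOrCreate())

df = spark.read.parquet("s3://my-bucket/path/to/parquet/")   # placeholder path
print(df.rdd.getNumPartitions())                             # how many partitions the reader produced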
spark.default.parallelism (default: Total No. of CPU cores)
spark.sql.files.maxPartitionBytes (default: 128 MB)
spark.sql.files.openCostInBytes (default: 4 MB)
maxSplitBytes = Minimum(maxPartitionBytes, Maximum(openCostInBytes, bytesPerCore))
bytesPerCore = (Sum of all data file sizes, or size of a given file) / default.parallelism
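The same formula can be written as a small helper to reason about split sizes offline (a sketch only; the names and defaults mirror the formula and configs above):

def max_split_bytes(total_bytes, default_parallelism,
                    max_partition_bytes=128 * 1024 * 1024,   # spark.sql.files.maxPartitionBytes
                    open_cost_in_bytes=4 * 1024 * 1024):     # spark.sql.files.openCostInBytes
    # maxSplitBytes = min(maxPartitionBytes, max(openCostInBytes, bytesPerCore))
    bytes_per_core = total_bytes / default_parallelism
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))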

Demo Time

maxSplitBytes = Minimum(maxPartitionBytes, Maximum(openCostInBytes, bytesPerCore))
bytesPerCore = (Sum of all data file sizes, or size of a given file) / default.parallelism

# say one of the files is 4282731 bytes in size
bytesPerCore = 4282731 / 30 ~= 0.13 MB

Therefore, maxSplitBytes = min(128 MB, max(4 MB, 0.13 MB)) = 4 MB
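To see why this hurts with many tiny files, here is a rough sketch of how the reader then packs file splits into partitions (it loosely mirrors the bin-packing in Spark's FilePartition logic, assuming each file is smaller than maxSplitBytes so it is not split further; the 30 file sizes below are made up):

def pack_into_partitions(file_sizes, max_split, open_cost=4 * 1024 * 1024):
    partitions, current, current_size = [], [], 0
    for size in sorted(file_sizes, reverse=True):        # largest files first
        if current and current_size + size > max_split:  # current partition is "full"
            partitions.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size + open_cost                 # every file also pays the open cost
    if current:
        partitions.append(current)
    return partitions

print(len(pack_into_partitions([143_000] * 30, max_split=4 * 1024 * 1024)))   # -> 30

In other words, once maxSplitBytes is pulled down near openCostInBytes, each small file tends to end up in its own partition.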
# Code snippet to get the number of records in each partition
from pyspark.sql.functions import spark_partition_id, asc

df.withColumn("partitionId", spark_partition_id()) \
  .groupBy("partitionId").count() \
  .orderBy(asc("count")).show(350, False)

# ------------------------------------------------------------------
# boto3 code to get an S3 file's size:
import boto3
from botocore.exceptions import ClientError

s3_resource = boto3.resource('s3')

def s3_file_size(s3_url):
    s3_bucket = s3_url.split("/")[2]
    key = "/".join(s3_url.split("/")[3:])
    file_size = 0
    try:
        obj = s3_resource.ObjectSummary(s3_bucket, key)
        file_size = obj.size
        # Flag files larger than 4 MB (the default openCostInBytes)
        if int(file_size) / 1024 / 1024 > 4:
            print(f"\ns3://{s3_bucket}/{key}", int(file_size))
    except ClientError:
        # Object not found
        file_size = 0
    return file_size
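For example (hypothetical bucket and key):

print(s3_file_size("s3://my-bucket/path/part-00000.parquet"))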
