What if I wanted to submit remote PySpark jobs to AWS EMR without worrying about library dependency versions?

Mageswaran D · Dec 8, 2021

Typically, a Spark cluster is used to run ETL or streaming jobs, where everything is managed by developers for developers, and end users only access the processed data.

I worked in such a setup for nearly two years, and I could replicate it with ease.

But how often do we get to do exactly what we are already experienced in? I am no exception here!

My new task was to prepare a Spark cluster to run end-user jobs triggered from a web UI. Yes, you read that right: not the typical spark-submit or Airflow operators. I had to come up with a plan to run Spark jobs from a web UI, read the results when they are ready, and show them in the UI.

Basically, opening up the entire cluster so end users can run jobs against the available datasets with custom configurations at the click of a button. A fancy way to start Spark jobs, isn't it? :)

Two things to take care of:

  1. Remote job submission
  2. Manage Python library dependencies

Obviously, the solution is to submit Spark jobs over a REST API endpoint. It can be the YARN REST endpoint or the famous Apache Livy.

So the next thing to handle is the application's third-party library dependencies. In a typical ETL pipeline, all the requirements are captured in a requirements file and installed during the bootstrap action on the EMR nodes.
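For context, a cluster with such a bootstrap action can be created with boto3 along these lines. This is a minimal sketch: the bucket, script, and instance names are placeholders, and the bootstrap script itself would simply pip-install the requirements file.

# Sketch: launch an EMR cluster whose bootstrap action installs the
# Python dependencies on every node (all names are placeholders).
import boto3

emr = boto3.client("emr", region_name="us-west-2")

emr.run_job_flow(
    Name="etl-cluster",
    ReleaseLabel="emr-6.4.0",
    Applications=[{"Name": "Spark"}, {"Name": "Livy"}],
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    BootstrapActions=[
        {
            "Name": "install-python-deps",
            # The script would run e.g.: sudo pip3 install -r requirements.txt
            "ScriptBootstrapAction": {
                "Path": "s3://my-bucket/bootstrap/install_deps.sh"
            },
        }
    ],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)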

Usually I end up creating a cluster for each job category, or re-creating the cluster for every update the code undergoes, and that is not a big deal, since I can spin up a new cluster for each ETL batch run.

It becomes problematic when I want to use the same cluster to run different jobs with different dependencies and versions. It is a nightmare to manage different clusters for different projects.

So what is the alternative? Docker containers. Pack what I need into an image and throw it at the runtime.

Ok, how do I run Spark on Docker containers? Do I need to adopt Kubernetes instead of YARN? Luckily, with EMR I can live with YARN-managed Docker containers until the real need for Kubernetes arrives.

YARN Docker Container

Welcome to YARN Docker support.

In one sentence: instead of plain JVM application containers, YARN now launches Docker containers.

There is a must-read blog from AWS on setting up YARN Docker containers on EMR.

I would say adding Docker support to YARN is a blessing for this kind of scenario, as it made it easy to architect the whole pipeline.

AWS has a base image, amazoncorretto:8, which comes preloaded with Java and can be used to run Spark jobs.

Example Dockerfile:

FROM amazoncorretto:8

# Base OS packages and build tools
RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development

# Python 3 runtime (note: the dev package is python3-devel on Amazon Linux)
RUN yum list python3*
RUN yum -y install python3 python3-devel python3-pip python3-virtualenv
RUN python -V
RUN python3 -V

# Tell PySpark which interpreter to use inside the container
ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

# Job-specific third-party dependencies go here
RUN pip3 install --upgrade pip
RUN pip3 install numpy pandas

# Sanity-check the imports at build time
RUN python3 -c "import numpy as np"

Ok, now I can use the Dockerfile to package what I need into an image. Next, where do I push and version the image? Dockerhub!

Yes, Dockerhub is good if I use open-source tools, but what if I have to use proprietary code? Use AWS ECR?

Nah… my client uses Dockerhub with private repos. Okay, next hurdle: how do I pass the registry credentials to YARN? Without them, the Docker image pull fails.

Create a config.json, something like this (better to involve DevOps on this to get the auth token):

cat ~/.docker/config.json
{
  "auths": {
    "https://index.docker.io/v1/": {
      "auth": "123456789abcdefghijklm=="
    }
  }
}
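The auth value is just the base64 encoding of username:password (or username:token). Normally docker login generates this file for you, but it can also be produced by hand:

import base64

# Hypothetical credentials -- substitute your Dockerhub user and token.
print(base64.b64encode(b"my-user:my-token").decode())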

Now copy the config file to HDFS and point these settings at its path:

--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG="hdfs:///path/to/hadoop/config.json"
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG="hdfs:///path/to/hadoop/config.json"

How do we make Spark use the Docker container? Use the following YARN Docker Spark configuration:

--conf spark.executorEnv.PYSPARK_PYTHON=python3
--conf spark.executorEnv.PYSPARK_DRIVER_PYTHON=python3
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=python3
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3
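When we switch to Livy below, the same settings (plus the registry credentials from earlier) can be carried as a plain conf map in the batch request. A sketch, with a hypothetical image name standing in for $DOCKER_IMAGE_NAME:

# Spark conf that makes both the AM (driver) and executors run in Docker.
DOCKER_IMAGE_NAME = "myorg/pyspark-job:latest"  # hypothetical image
CLIENT_CONFIG = "hdfs:///path/to/hadoop/config.json"

docker_conf = {
    "spark.executorEnv.PYSPARK_PYTHON": "python3",
    "spark.executorEnv.PYSPARK_DRIVER_PYTHON": "python3",
    "spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
    "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": DOCKER_IMAGE_NAME,
    "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG": CLIENT_CONFIG,
    "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
    "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": DOCKER_IMAGE_NAME,
    "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG": CLIENT_CONFIG,
    "spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON": "python3",
    "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "python3",
}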

Livy

Livy enables programmatic, fault-tolerant, multi-tenant submission of Spark jobs from web/mobile apps (no Spark client needed). So, multiple users can interact with your Spark cluster concurrently and reliably.

Submitting to a Livy URL hosted on EMR first requires the URL to be exposed to the outside world or to a VPN. Let's leave that for the DevOps team to take care of!

Check the EMR documentation for Livy port details (Livy listens on port 8998 by default).

Tunnel the Livy port to the local machine as follows to reach the REST endpoint. In production, the URL and port need to be mapped to a VPN.

export USER=hadoop
export EMR_SPARK_MASTER=ip-99-33-222-111.us-west-2.compute.internal
sudo ssh -i ~/emrkey.pem -N -L 8998:$EMR_SPARK_MASTER:8998 hadoop@$EMR_SPARK_MASTER

Check the Livy docs for the full REST API reference.

Test the connection with curl:

export LIVY_URL=localhost:8998
curl $LIVY_URL/sessions/ | python -m json.tool
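The same check from Python, assuming the tunnel above is up:

import requests

LIVY_URL = "http://localhost:8998"
print(requests.get(LIVY_URL + "/sessions").json())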

Move the pi.py example to S3 to test-run it with Livy.
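For reference, pi.py can be (more or less) the classic Monte-Carlo example that ships with Spark:

# pi.py -- the stock Monte-Carlo pi estimate from the Spark examples,
# enough to verify the Docker/Livy plumbing end to end.
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions) \
                              .map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))
    spark.stop()

Then submit it as a Livy batch: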

curl \
  -X POST \
  --data '{
    "name": "UDCTestLivy1",
    "file": "s3://users-dev-bucket/mageswaran/livy/pi.py",
    "driverMemory": "4G",
    "driverCores": 4,
    "executorMemory": "32G",
    "executorCores": 5,
    "numExecutors": 2,
    "conf": {"spark.scheduler.mode": "FAIR"}
  }' \
  -H "Content-Type: application/json" \
  $LIVY_URL/batches

Check the YARN UI for logs, and if everything works, we are set to use Python!

python3 livy_Spark.py \
--url http://$LIVY_URL \
--name SparkDockerLivyTest \
--file s3://bucket/mageswaran/livy/main.py \
--pyFiles s3://bucket/mageswaran/livy/project.zip \
--args '--config_file s3://bucket/mageswaran/livy/job1_config.json'

Note: project.zip contains the zipped version of your source packages, which needs to be distributed to the cluster.
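livy_Spark.py itself is not shown above. A minimal sketch of such a wrapper using the requests library could look like this; the argument names match the invocation above, while everything else (helper names, polling interval) is an assumption:

# livy_Spark.py -- minimal Livy batch submitter (a sketch, not the
# exact script invoked above).
import argparse
import json
import shlex
import time

import requests


def submit(url, payload):
    """POST a batch to Livy and return the batch id."""
    resp = requests.post(
        url + "/batches",
        data=json.dumps(payload),
        headers={"Content-Type": "application/json"},
    )
    resp.raise_for_status()
    return resp.json()["id"]


def wait(url, batch_id, poll_secs=10):
    """Poll the batch state until Livy reports a terminal state."""
    while True:
        state = requests.get("%s/batches/%s/state" % (url, batch_id)).json()["state"]
        if state in ("success", "dead", "killed"):
            return state
        time.sleep(poll_secs)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", required=True)
    parser.add_argument("--name", required=True)
    parser.add_argument("--file", required=True)    # main .py on S3
    parser.add_argument("--pyFiles", default=None)  # project.zip on S3
    parser.add_argument("--args", default="")       # args for the job itself
    args = parser.parse_args()

    payload = {"name": args.name, "file": args.file}
    if args.pyFiles:
        payload["pyFiles"] = [args.pyFiles]
    if args.args:
        payload["args"] = shlex.split(args.args)

    batch_id = submit(args.url, payload)
    print("Batch", batch_id, "finished:", wait(args.url, batch_id))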

The next step is to integrate this into the backend services and re-route some of the jobs to the Spark cluster!

