Big Data Play Ground For Engineers: Full Stack Data Science Engineering

Mageswaran D
12 min read · Apr 19, 2020

This is part of the series called Big Data Playground for Engineers and the content page is here!

Git:

A fully functional code base and use case examples are up and running.

Repo: https://github.com/gyan42/spark-streaming-playground

Website: https://gyan42.github.io/spark-streaming-playground/build/html/index.html

It would be good to read up on the theory of building production-ready Machine Learning systems through the following links, since we are going to see how to materialize that theory. As part of this series I show how to tie together the common tutorials we come across on the web to build an end-to-end, real-time pipeline.

The example use case and the code used to materialize the theory are meant for educational purposes on a single local machine; there are certainly many improvements to be considered for a production scenario. Nevertheless, I think this will be a good eye opener for many, exposing you to an end-to-end data science pipeline, as it was for me.

Say a client, a startup company, wants to retweet all AI tweets that contain links, since the links may be a good source of information on the topic, covering everything from learning materials to current happenings. They also think doing so will attract attention to their company handle.

So they give the following requirements…

Requirements

  • Create a pipeline similar to the following illustration
  • Build a data set from the ground up for tweet classification (AI tweet or not) using the live streaming data. We know the filter words in the Twitter stream already do this, but we want our system to generalize beyond simple keywords and understand the tweet at the language level
  • Dump the raw tweet data into a table in the Postgresql DB (called streamingdb)
  • Have configuration parameters for a table name prefix for the raw tweet data and a version number as the table name suffix when dumping the tables into the DB, as this helps to run multiple experiments on different sets of data
  • Use semi-supervised labelling methods to tag the data set, for example with frameworks like https://www.snorkel.org/
  • Build a UI tool to annotate the semi-supervised tagged data and create a golden data set for ML training
  • Design a framework to build Deep Learning models using Tensorflow, and include a naive Deep Learning / neural network model as an example, to train and evaluate it on the golden/semi-supervised data set
  • Provide scalable deployment of the DL model for text classification in real time
  • Build a simple Web UI to highlight the web links in the tweets and store the URLs

What is Streaming Pipeline

A data pipeline is software that enables the smooth, automated flow of information from one point to another. This software prevents many of the common problems that the enterprise experienced: information corruption, bottlenecks, conflict between data sources, and the generation of duplicate entries.

Streaming data pipelines, by extension, are data pipelines that handle millions of events at scale, in real time. As a result, you can collect, analyze, and store large amounts of information. That capability allows for applications, analytics, and reporting in real time.

It consists of…

  • Data Source like Twitter, Website Clicks, ATM card transaction etc.,
  • Distributed Streaming platform like Apache Kafka, Apache Flume, AWS Kinesis etc.,
  • Distributed Processing/Computing Platforms like Apache Spark
  • Databases both Relational and Non Relational like Postgresql, MySQL, HBase, Cassandra etc.,
  • Storage System like HDFS, AWS S3, GFS, Alluxio etc.,
  • Machine Learning frameworks like Scikit Learn, PyTorch, TensorFlow, Spark ML, and managed ML services from AWS, Google, Microsoft etc.,
  • Analytics dashboards with BI tools like Tableau, Qlik Sense etc., or in-house built tools
Simple Streaming Pipeline

1. Problem Definition

A streaming pipeline to collect data, prepare the data, annotate the data, build a classification model on the data, deploy the model, and classify the streamed data in real time.

2. Data Ingestion

Data collection.

Managing data source is the first and foremost concern of the Streaming pipeline.

  • How to keep the connection alive with the data source all the time?
  • In the event of a network failure, what kind of actions should be performed to re-establish the connection?
  • Can a single machine handle all the data from the data source? If not, what is the mechanism for listening to parts of the streaming data, thus enabling distributed listening on the data source?

For example, if the connection with the Twitter streaming API is dropped, we re-establish the connection and start listening for data again.
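A minimal reconnection sketch, assuming the Tweepy 3.x API that was current when this project was written; the listener class and keyword handling here are illustrative, not the repo's actual code:

import time
import tweepy

class TweetListener(tweepy.StreamListener):
    def on_error(self, status_code):
        # Returning False drops the connection; the outer loop re-establishes it.
        # Back off on HTTP 420 (rate limiting) instead of reconnecting immediately.
        return status_code != 420

def listen_forever(auth, keywords):
    while True:
        try:
            stream = tweepy.Stream(auth=auth, listener=TweetListener())
            stream.filter(track=keywords)  # blocks until the stream drops
        except Exception as e:
            print("Stream dropped ({}), reconnecting in 10 seconds...".format(e))
            time.sleep(10)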

The free Twitter streaming API samples roughly 1% to 40% of tweets for the given filter words (https://brightplanet.com/2013/06/25/twitter-firehose-vs-twitter-api-whats-the-difference-and-why-should-you-care/).
So how do we handle full-scale, real-time tweets with services like Gnip Firehose (https://support.gnip.com/apis/firehose/overview.html)?

For free streaming data sources, check this list @

Apache Kafka is used as the pub/sub framework here; the Kafka topics are created with:

/opt/binaries/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic ai_tweets_topic
/opt/binaries/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic mix_tweets_topic

These topics are used as a medium to post and retrieve the data by different pipeline components.

Example Scenario

Let's go with the free streaming API provided by Twitter. Our client is a startup and doesn't want to pay for Twitter Firehose, but insists on having the provision to switch to it when they become a super hit!

Check my post on how to get started with the Twitter stream in Python @

Hope you got your hands dirty with the Tweepy APIs!

As the client noted, filter keywords are mandatory with the Twitter streaming APIs; for more details check the above link.

For our use case of AI tweet classification, we have set up our Tweepy stream API with two sets of filter keywords:

  • One with keywords that talk about AI/ML/DL/Big Data etc.,
  • The other with more generic, false-positive-prone keywords like machine, learning, artificial, intelligence etc.,
  • Two Python threads are set up, each listening for its set of keywords separately. Needless to say, when the tweets from these two streams are collected as one, duplicates are expected, since both come from the same source
  • Apache Kafka was selected for its distributed pub/sub capability

On receiving data, we dump each tweet into a Kafka topic using the Kafka Producer.
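A hedged sketch of that hand-off, using kafka-python's KafkaProducer and a Tweepy listener; the class name and topic wiring here are illustrative, not the repo's exact code:

import tweepy
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

class KafkaDumpListener(tweepy.StreamListener):
    def __init__(self, topic):
        super(KafkaDumpListener, self).__init__()
        self.topic = topic  # e.g. ai_tweets_topic or mix_tweets_topic

    def on_data(self, raw_data):
        # raw_data is the tweet JSON string exactly as delivered by the stream
        producer.send(self.topic, raw_data.encode("utf-8"))
        return True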

ai_tweets_topic is the first Kafka topic in our example, which contains more AI related tweets filtered at Twitter stream API level, based on the provided keywords. This is used only to collect more relevant AI tweets for our classification model data preparation.

mix_tweets_topic is the second Kafka topic in our example, which contains more general tweets along with AI tweets, and is the one considered as the source further down the stream.

With that in place, we have built the data source APIs, ready to be consumed by the downstream pipeline components.

Check out the “Data Collection” section in the how-to-run page of the example.

3 & 4. Data Preparation / Data Segregation

Data exploration, data transformation and feature engineering.

Split subsets of data to train the model and further validate how it performs against new data.

Now that the data source is ready, we need to build a consumer component to read the data and create a data set for the classification task.

Let's configure Spark Structured Streaming to read the data from the Kafka topic.
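A minimal sketch of that configuration, assuming the spark-sql-kafka package is on the classpath; the topic and broker values match the commands shown earlier, everything else is illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tweet-dataset-builder").getOrCreate()

raw_stream_df = (spark.readStream
                 .format("kafka")
                 .option("kafka.bootstrap.servers", "localhost:9092")
                 .option("subscribe", "mix_tweets_topic")
                 .option("startingOffsets", "latest")
                 .load())

# Kafka delivers the payload as bytes in the `value` column
tweet_json_df = raw_stream_df.selectExpr("CAST(value AS STRING) AS value")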

The data received follows Twitter's general schema, so it has to be validated against a common schema for our use, which is done by defining a PySpark DataFrame schema.
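An illustrative subset of such a schema, covering only the fields used in this post; the repo defines a much fuller version:

from pyspark.sql.types import StructType, StructField, StringType, BooleanType

tweet_schema = StructType([
    StructField("id_str", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("text", StringType(), True),
    StructField("truncated", BooleanType(), True),
    StructField("extended_tweet", StructType([
        StructField("full_text", StringType(), True)
    ]), True),
    StructField("retweeted_status", StructType([
        StructField("extended_tweet", StructType([
            StructField("full_text", StringType(), True)
        ]), True)
    ]), True),
])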

The JSON data received from the Twitter stream may be a direct tweet by the user or a retweet. Care has to be taken while extracting the full text, to avoid truncated text in the tweet data.

A UDF was put in place to do the job.
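A hedged sketch of such a UDF: prefer the retweet's extended text, then the tweet's own extended text, and fall back to the plain text field (the actual repo UDF may differ in details):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def pick_full_text(text, extended_tweet, retweeted_status):
    # Struct columns arrive here as Row objects (or None when absent)
    if retweeted_status is not None and retweeted_status.extended_tweet is not None:
        return retweeted_status.extended_tweet.full_text
    if extended_tweet is not None:
        return extended_tweet.full_text
    return text

pick_full_text_udf = udf(pick_full_text, StringType())
# applied as: df.withColumn("tweet_text", pick_full_text_udf("text", "extended_tweet", "retweeted_status"))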

The received JSON data is first converted to a DataFrame and aliased as temp. The required fields are then selected and cleaned for downstream modules.

This data is then dumped into a Postgresql DB table with the prefix “raw_tweet_dataset” and the version id/run id as a suffix.

The table name is versioned by appending an integer suffix, for example:

raw_tweet_dataset_0
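One way this could look with Structured Streaming's foreachBatch and a JDBC sink; run_id, the credentials and the cleaned_tweets_df DataFrame are placeholders, and the Postgresql JDBC driver is assumed to be on the Spark classpath:

run_id = 0  # supplied by the trigger script for each experiment
table_name = "raw_tweet_dataset_{}".format(run_id)

def write_to_postgres(batch_df, batch_id):
    (batch_df.write
     .format("jdbc")
     .option("url", "jdbc:postgresql://localhost:5432/streamingdb")
     .option("dbtable", table_name)
     .option("user", "postgres")        # illustrative credentials
     .option("password", "postgres")
     .mode("append")
     .save())

# cleaned_tweets_df: the selected and cleaned streaming DataFrame from the step above (placeholder name)
query = cleaned_tweets_df.writeStream.foreachBatch(write_to_postgres).start()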

We checked with our client why this prefix and id are needed: why can't we hard-code the name? That would make the developer's life easier, right?

Client response…

Often in a Machine Learning environment, we come up with names for our experiments that are mostly tied to the data set used. Consider the prefix to be a data set / experiment name that we want to use when we try a different data set from the live stream. In our observation, ML/DL experimentation is directly tied to the data set used.

Also, for a given data set version, there can be different data preprocessing routines and models trained on it; however, once the data set changes, everything down the line has to start using the new data set while reusing their respective components as part of the pipeline.

In our case, the month and day of the week have a significant impact on collecting AI tweets. World-level AI events, meetups, discussions, new inventions etc. trigger lots of tweets on the topic, as do times when students are busy with their academic work or when a new disruptive tech hits the market. So we wanted the provision of a dataset name prefix and a version, in case we happen to collect data within the same week or so.

Data thus prepared is versioned as per the requirement, so that downstream modules like ML models can keep track of the dataset used.

The data is not preprocessed or feature engineered, as we are going to use Deep Learning models down the line.

For the purpose of text classification, we collect 25,000 tweets that contain one of the AI keywords and another 25,000 on mixed general keywords along with AI keywords.

This version number is controlled by the trigger script: for each run the user has to give a run id / version, which is then used along with the table prefix to dump the data into the Postgresql DB table.

The raw_tweet_dataset_0 table is then divided as follows and annotated for the classification models to be trained on.

~ Unknown counts

Check out the “Data Preparation” section in the how-to-run page of the example.

This creates two columns: “slabel”, produced with the Snorkel labeller, and “label”, created in a naive way by assigning 1 if the text contains AI keywords and 0 otherwise, which is later improved with the annotation tool.

The “label” column is prepared in a naive way to validate the Snorkel labelling functions; in the absence of such a shortcut, we would have to rely on manual annotation for our starting ground-truth labels.
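An illustrative sketch of the two routes on a toy DataFrame: a naive keyword rule for the “label” column and a single Snorkel labeling function for “slabel”. The keyword list is a placeholder, not the repo's actual one:

import pandas as pd
from snorkel.labeling import labeling_function, PandasLFApplier

AI, NOT_AI, ABSTAIN = 1, 0, -1
AI_KEYWORDS = ["machine learning", "deep learning", "artificial intelligence", "#ai"]

def naive_label(text):
    return AI if any(k in text.lower() for k in AI_KEYWORDS) else NOT_AI

@labeling_function()
def lf_ai_keywords(row):
    return AI if any(k in row.text.lower() for k in AI_KEYWORDS) else ABSTAIN

df = pd.DataFrame({"text": ["New deep learning paper out!", "My washing machine broke"]})
df["label"] = df["text"].apply(naive_label)
df["slabel"] = PandasLFApplier([lf_ai_keywords]).apply(df)[:, 0]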

Data Annotation

  • Snorkel is used for semi-supervised label tagging. Check out my post for more info @
  • A naive Flask-based annotation tool was built to assist manual labeling

Check out the “Evaluate the Snorkel” section in the how-to-run page of the example to evaluate the Snorkel labelling function performance against manually annotated data.

The data in the Postgresql DB can then be downloaded to disk for model training, by running the data preparation script with the `download` mode option.

5. & 6. Model Training and Evaluation

Luckily our client just wants a framework to try out TensorFlow DL models, which can eventually be scaled when the need arises.

So, let's consider a simple TensorFlow RNN-based text classification model, which is explained in detail as part of the TensorFlow RNN text classification tutorial @

It's more or less the same model, with the extra functionality of saving and loading model files in HDFS.
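For reference, a hedged outline of the kind of model the tutorial builds (Embedding, then a bidirectional LSTM, then dense layers); the vocabulary size and layer widths here are placeholders and the text vectorization step is omitted:

import tensorflow as tf

VOCAB_SIZE, EMBED_DIM = 20000, 64

model = tf.keras.Sequential([
    tf.keras.layers.Embedding(VOCAB_SIZE, EMBED_DIM),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64)),
    tf.keras.layers.Dense(64, activation="relu"),
    tf.keras.layers.Dense(1),  # logits for the binary AI / not-AI decision
])

model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              optimizer=tf.keras.optimizers.Adam(1e-4),
              metrics=["accuracy"])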

The configuration parameters need to be adjusted for the input data and output model directory.

A special note to client:

With respect to the model output directory path, change only the version number for a different run on the same data set. This is done so that TensorFlow Serving can pick up the latest version under the model store root directory, or query the appropriate model version.
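In code terms, that boils down to exporting each run under an integer version directory; a sketch, assuming the model object from the section above and the MODEL_DIR path used below:

import os
import tensorflow as tf

export_base = "/home/mageswarand/ssp/model/raw_tweet_dataset_0/naive_text_classifier/exported"
version = 1  # bump only this for a new run on the same data set
tf.saved_model.save(model, os.path.join(export_base, str(version)))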

Model Description:

Model Evaluation:

7. Model Deployment

TensorFlow Serving was chosen to deploy the models and serve them over a REST endpoint.

A good example is @

We do the same steps: export the model path and start the TensorFlow Serving server.

# give appropriate model path that needs to be served
export MODEL_DIR=/home/mageswarand/ssp/model/raw_tweet_dataset_0/naive_text_classifier/exported/
# test the model
saved_model_cli show --dir ${MODEL_DIR}/1/ --all
# start the serving server
tensorflow_model_server \
--rest_api_port=8501 \
--model_name="naive_text_clf" \
--model_base_path="${MODEL_DIR}"

Make sure there are no errors.

Okay, good, the model is deployed, but how do we use it on streaming data?

To tackle that, a Spark UDF was created which in turn talks to the TensorFlow Serving server over the REST endpoint.
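A hedged sketch of such a UDF using the REST predict endpoint started above; the exact request payload and response indexing depend on the exported model signature, and the text vectorization applied at training time is omitted here:

import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

TF_SERVING_URL = "http://localhost:8501/v1/models/naive_text_clf:predict"

def predict_ai_probability(text):
    # One HTTP call per row; the payload shape here assumes a single text input
    payload = {"instances": [[text]]}
    response = requests.post(TF_SERVING_URL, json=payload, timeout=5)
    return float(response.json()["predictions"][0][0])

predict_udf = udf(predict_ai_probability, FloatType())

One request per row is simple but slow; batching rows per partition (or a pandas UDF) would be a natural next step, which ties into the scaling warning below.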

Warning to client: (possible extension of the project)

Well, this may hit a performance bottleneck sooner or later, as DL prediction takes considerable time. Model serving can be scaled using Docker and Kubernetes as mentioned @

Spark Structured Streaming with DL Model

The Spark UDF is applied to the streaming text, and predictions are made with TensorFlow Serving over the REST endpoint.

8. Performance Monitoring

This is a vast topic in itself!

As part of our bundle, we have a simple Flask application that displays the text with its prediction probability.

A suggestion to client:

One possibility here: once this system is deployed and people are using our great app, by introducing an extra feature asking users to tag AI tweets, or by treating the tweets they click on as AI tweets, we can collect more positive records. This way we get more positive AI-tweet examples for our model training.

Humans are the monitoring tool as of now! :)

Now sit back and think of the places/websites where you have given feedback :) your feedback is actually training data for their system!

Stay tuned for more updates!
