Big Data Play Ground for Engineers: Dump Twitter Stream into Kafka topic

Mageswaran D
4 min read · Apr 17, 2020

This is part of the series called Big Data Playground for Engineers and the content page is here!

Git:

A fully functional code base and use case examples are up and running.

Repo: https://github.com/gyan42/spark-streaming-playground

Website: https://gyan42.github.io/spark-streaming-playground/build/html/index.html

How can anyone ignore the Twitter tweet stream when they start learning streaming?

Example Illustration

Why do we need Kafka? In brief, it is a distributed streaming platform. Imagine we wanted to do some sort of NLP task on the tweet text: in a matter of time, a single machine would be overwhelmed by the incoming text data, hitting its computing limits. Kafka helps by buffering the stream data before it gets processed, either by Kafka itself or by an external computing platform like Apache Spark Streaming. From there we can do all the crazy stuff (classification, text analysis, sentiment analysis, etc.) without worrying about losing data!
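To make this concrete, here is a minimal sketch of a downstream consumer reading tweets off the topic with the kafka-python package (the topic name "twitter_stream" and the local broker address are assumptions for illustration):

from kafka import KafkaConsumer

# Hypothetical topic name; assumes a Kafka broker running locally
consumer = KafkaConsumer("twitter_stream",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")

for message in consumer:
    tweet_json = message.value.decode("utf-8")
    # Downstream NLP (classification, sentiment analysis, ...) can now
    # consume at its own pace without losing data
    print(tweet_json)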

Why the tweet stream is interesting:

  • Live stream of happenings around the world
  • The data is textual, giving us the opportunity to come up with quite a few use cases
  • Geo locations are available
  • User information (masked to some level!) is available, which can be used for building a social graph
  • Last but not least, the infamous #hashtags

Like any other web application, Twitter requires API keys to access their tweet stream.

Let's get our keys for our playground use cases…

Creating Your Own Credentials for Twitter APIs

Twitter exposes REST endpoints which can be accessed with approved credentials through programming-language packages.

In order to get tweets from Twitter, you need to register on TwitterApps by clicking on "Create new app", then fill in the form below and click "Create your Twitter app", select the purpose, and opt for an individual developer account.

Then fill out the form and wait for the team to approve your app.

After you get it approved, go to Apps -> Your App

Make a note of your consumer key & secret and access token & secret.
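With those four values in hand, authenticating from Python is straightforward. A minimal sketch with Tweepy 3.x (replace the placeholder strings with your own credentials):

from tweepy import OAuthHandler

# Placeholders; use the consumer key/secret and access token/secret noted above
auth = OAuthHandler("CONSUMER_KEY", "CONSUMER_SECRET")
auth.set_access_token("ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")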

Twitter Stream with Python

There is no better alternative than Tweepy for Python.

To begin, get a good understanding of the tweet JSON schema…
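For a quick feel of the schema, here is a sketch of pulling out the fields this series cares about (a hypothetical handler; 'data' is the raw JSON string a stream listener receives):

import json

def handle(data):
    # 'data' is the raw JSON payload a stream listener receives
    tweet = json.loads(data)
    print(tweet["text"])                  # the tweet text
    print(tweet["user"]["screen_name"])   # user information
    print(tweet["entities"]["hashtags"])  # the hashtags
    print(tweet.get("coordinates"))       # geo location, often null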

Before continuing, there is a catch with the Twitter API: we need to specify the filter words we are interested in.

# Create a stream whose listener forwards tweets to the given Kafka topic
twitter_stream = Stream(auth, TweetsListener(kafka_addr=self._kafka_addr, topic=kafka_topic, is_ai=is_ai))
# https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
# https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters
# Only English tweets matching the tracked keywords are delivered
twitter_stream.filter(track=keywords, languages=["en"])

So the stream is not actually the full data that the world talks about; it is a subset of the actual data, at least in the free version.

This forces us to supply some keywords of interest to the Tweet API endpoint in order to listen to the stream. One way to make the stream more generic is to filter for stop words.
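For instance, tracking a handful of common English stop words (an illustrative list; any frequent words would do) casts a wide net over the stream, reusing the twitter_stream object from the snippet above:

# Tracking common stop words approximates a generic, unfiltered English stream
stop_words = ["the", "a", "to", "and", "is", "in", "it", "you"]
twitter_stream.filter(track=stop_words, languages=["en"])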

Let's build a Python app that listens to two streams and displays the text from each stream in a different color.

One stream tracks specific tweets of a certain interest, and the other is generic, listening to a wider range of tweets. For example, come up with two sets of keywords:

  1. Tweets that talk about AI/ML/DL
  2. Tweets that are generic, or that merely resemble AI/ML/DL, i.e. false positives
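Two such keyword sets might look like this (illustrative lists, not the exact ones from the repo):

# Keywords that likely indicate AI/ML/DL content
ai_keywords = ["machinelearning", "deeplearning", "datascience", "neuralnetworks"]
# Generic keywords that merely resemble AI/ML/DL, i.e. false positives
generic_keywords = ["ai", "ml", "intelligence", "learning"]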

Then create a Tweepy stream listener class that can listen on the stream and print the tweets.
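A bare-bones version of such a listener with Tweepy 3.x (the repo's listener takes kafka_addr, topic and is_ai as shown earlier; this sketch just prints, with a color tag to tell the two streams apart):

from tweepy.streaming import StreamListener

class TweetsListener(StreamListener):
    def __init__(self, color="green"):
        super().__init__()
        self._color = color  # distinguishes the two streams when printing

    def on_data(self, data):
        print(f"[{self._color}]", data)  # 'data' is the raw tweet JSON string
        return True  # keep the stream alive

    def on_error(self, status_code):
        return status_code != 420  # back off when Twitter rate-limits us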

Since we want to listen to two different streams, let's make use of Python threading (see the sketch after this list), enabling us:

  • To start two streams
  • To keep listening to tweets even when there is a network issue, by reconnecting
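A sketch of that wiring (auth, TweetsListener, ai_keywords and generic_keywords come from the snippets above; the reconnect logic here is a simple retry loop, not the repo's exact implementation):

import threading
from tweepy import Stream

def run_stream(keywords, color):
    while True:  # reconnect whenever the connection drops
        try:
            stream = Stream(auth, TweetsListener(color=color))
            stream.filter(track=keywords, languages=["en"])
        except Exception as e:
            print("Stream dropped, reconnecting:", e)

threading.Thread(target=run_stream, args=(ai_keywords, "green"), daemon=True).start()
threading.Thread(target=run_stream, args=(generic_keywords, "red"), daemon=True).start()
threading.Event().wait()  # keep the main thread alive while the daemons stream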

Code:

With this in place, the next step would be dumping the data into a Kafka topic for distributed handling of the data!

Now that accessing the Twitter stream has been mastered, let's add some spice to it by dumping the data into a Kafka topic.

All we have to do is update the Tweepy listener with the Kafka producer details:

from kafka import KafkaProducer

self._kafka_producer = KafkaProducer(bootstrap_servers='localhost:9092')

and in the data callback, add:

self._kafka_producer.send("kafka_topic", data.encode('utf-8')).get(timeout=10)
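Putting the two pieces together, a Kafka-enabled listener might look like this (a sketch with an assumed topic name; the actual implementation is in the repo linked below):

from kafka import KafkaProducer
from tweepy.streaming import StreamListener

class TweetsListener(StreamListener):
    def __init__(self, kafka_addr="localhost:9092", topic="kafka_topic"):
        super().__init__()
        self._kafka_producer = KafkaProducer(bootstrap_servers=kafka_addr)
        self._topic = topic

    def on_data(self, data):
        # The blocking .get() surfaces send failures instead of dropping them silently
        self._kafka_producer.send(self._topic, data.encode("utf-8")).get(timeout=10)
        return True

    def on_error(self, status_code):
        return status_code != 420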

Check the following code link for the actual implementation:
