#
# Spark Streaming Wordcount - netcat client
#
# Spark example code from:
# https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/network_wordcount.py
#
"""
Counts words in UTF-8 encoded, newline-delimited text received from the network every second.
Usage: network_wordcount.py <hostname> <port>
<hostname> and <port> describe the TCP server that Spark Streaming connects to in order to receive data.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
#
#
# Feed tweets from the Twitter Streaming API into Spark Streaming through an RDD queue for live processing
#
#
"""
Feed tweets from the Twitter Streaming API into Spark Streaming through an RDD queue for live processing.
Create a queue of RDDs that will be mapped/reduced one at a time at
1-second intervals.
To run this example use
'$ bin/spark-submit examples/AN_Spark/AN_Spark_Code/twitterstreaming.py'
"""
#
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import twitter
import dateutil.parser
import json
#
#
# Connect the Twitter stream to Spark Streaming via an RDD queue
#
class Tweet(dict):
    """Flattened, JSON-serialisable view of one raw Twitter status message.

    Populated keys: ``timestamp``, ``text``, ``hashtags``, ``geo``, ``id``,
    ``screen_name`` and ``user_id``.  The result is an empty dict when
    *tweet_in* is falsy or is a deletion notice (has a ``'delete'`` key).
    """

    def __init__(self, tweet_in):
        """Build the flattened tweet from the decoded status dict *tweet_in*.

        Raises:
            KeyError: if a non-delete message lacks one of the fields read
                below.  NOTE(review): other stream control messages (not
                just deletions) may hit this -- confirm against the stream.
        """
        # Start empty; dict.__init__'s optional argument is an initial
        # mapping, so ``self`` must not be passed to it.
        super(Tweet, self).__init__()
        if not tweet_in or 'delete' in tweet_in:
            return
        # Parse created_at and drop the UTC offset without converting
        # (presumably always +0000 for Twitter), giving a naive ISO string.
        self['timestamp'] = dateutil.parser.parse(
            tweet_in[u'created_at']).replace(tzinfo=None).isoformat()
        # Text fields stay as text rather than UTF-8 bytes: json.dumps cannot
        # serialise bytes on Python 3, and on Python 2 it emits the same
        # ASCII-escaped JSON for either form.
        self['text'] = tweet_in['text']
        self['hashtags'] = [tag['text']
                            for tag in tweet_in['entities']['hashtags']]
        self['geo'] = tweet_in['geo']['coordinates'] if tweet_in['geo'] else None
        self['id'] = tweet_in['id']
        self['screen_name'] = tweet_in['user']['screen_name']
        self['user_id'] = tweet_in['user']['id']
def get_next_tweet(twitter_stream):
    """Open a sample stream and return the next real tweet as a JSON string.

    Args:
        twitter_stream: object whose ``statuses.sample(block=True)`` yields
            decoded tweet dicts (presumably a ``twitter.TwitterStream`` --
            confirm against the caller).

    Returns:
        ``json.dumps`` of the ``Tweet`` built from the first message that is
        non-empty and is not a deletion notice.

    Raises:
        StopIteration: if the stream ends before such a message arrives.

    NOTE(review): a new sample stream is opened on every call (presumably a
    new HTTP connection each time); callers fetching many tweets may want
    to reuse a single stream instead.
    """
    stream = iter(twitter_stream.statuses.sample(block=True))
    tweet_in = None
    # Skip empty messages and deletion notices until a tweet arrives.
    while not tweet_in or 'delete' in tweet_in:
        # The next() builtin works on Python 2.6+ and 3; .next() is Py2-only.
        tweet_in = next(stream)
    return json.dumps(Tweet(tweet_in))
def process_rdd_queue(twitter_stream, num_batches=3):
    """Build a list of single-tweet RDDs to be pushed to a QueueInputDStream.

    Each RDD holds one JSON-encoded tweet fetched with ``get_next_tweet``.
    The returned list is suitable for ``ssc.queueStream``.

    Args:
        twitter_stream: passed through unchanged to ``get_next_tweet``.
        num_batches: how many RDDs (one tweet each) to create; default 3.

    Returns:
        The list of RDDs, in fetch order.

    NOTE(review): relies on a module-level ``ssc`` StreamingContext that is
    not defined in the visible part of this file.
    """
    rdd_queue = []
    for _ in range(num_batches):
        # NOTE(review): 5 partitions for a single element leaves most of
        # them empty; kept as-is to preserve the partitioning.
        rdd_queue.append(
            ssc.sparkContext.parallelize([get_next_tweet(twitter_stream)], 5))
    return rdd_queue
#
# kafka producer
#
#
import time
from kafka.common import LeaderNotAvailableError
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from datetime import datetime
def print_response(response=None):
    """Print the error code and offset of the first Kafka produce response.

    Prints nothing when *response* is None or empty.
    """
    if not response:
        return
    first = response[0]
    print('Error: {0}'.format(first.error))
    print('Offset: {0}'.format(first.offset))
def main():
    """Send five timestamped test messages to the 'test' Kafka topic.

    Connects to a broker at localhost:9092 and waits 5 seconds. It then
    sends one message per second, printing the first produce response for
    each. A send that raises LeaderNotAvailableError is retried once after
    a one-second pause, and the loop then continues with the remaining
    messages. The client is closed on exit, even on error.
    """
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    topic = 'test'
    try:
        time.sleep(5)
        for _ in range(5):
            time.sleep(1)
            # Take one timestamp so both rendered parts agree.
            now = datetime.now()
            msg = 'This is a message sent from the kafka producer: ' \
                + str(now.time()) + ' -- ' \
                + str(now.strftime("%A, %d %B %Y %I:%M%p"))
            try:
                print_response(producer.send_messages(topic, msg))
            except LeaderNotAvailableError:
                # https://github.com/mumrah/kafka-python/issues/249
                # Presumably the topic leader is not elected yet (e.g. a
                # freshly auto-created topic); wait and retry this message.
                time.sleep(1)
                print_response(producer.send_messages(topic, msg))
    finally:
        kafka.close()