1、Spark Structured Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。
2、打开一个终端窗口,启动带有端口号9999的套接字服务器:
$ nc -lk 9999
(注:如果没有nc的话,先安装它: $ sudo yum install nc)
3、启动PyCharm,新建一个Python文件,编辑内容如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession.builder.appName(\"ssc_socket_wc\").getOrCreate()
# 设置日志级别
spark.sparkContext.setLogLevel(\'WARN\')
# 创建DataFrame,代表来自localhost:9999的输入流(从Socket数据源读取流数据)
lines = spark.readStream \\
.format(\"socket\") \\
.option(\"host\", \"192.168.190.145\") \\
.option(\"port\", 9999) \\
.load()
# 将单词分割为行(DataFrame具有默认的value列)
words = lines.select(explode(split(lines.value, \" \")).alias(\"word\"))
# 生成运行单词计数
wordCounts = words.groupBy(\"word\").count()
# 开始运行查询,并将运行时计数输出到控制台
# 可以指定三种输出模式:append, complete, update
query = wordCounts \\
.writeStream \\
.format(\"console\") \\
.outputMode(\"complete\") \\
.start()
query.awaitTermination()
4、运行该python流程序
5、再回到nc窗口,在nc窗口中随意输入一些语句,语句中的单词以空格分隔
监听窗口就会自动获得单词数据流信息,并打印出词频统计信息
---------------------------------------------------------------------------
在Spark集群上执行过程(以下均在Linux下操作)
1)先启动nc服务器
$ nc -lk 9999
2)另打开一个终端,执行以下命令提交流作业程序:
$ spark-submit --master local
ssc_socket_wc.py
3)在nc服务器一方,随意输入一些语句,单词之间以空格分割
good good study
day day up
4)回到流程序运行窗口,查看输出