全部版块 我的主页
论坛 提问 悬赏 求职 新闻 读书 功能一区 学道会
1038 0
2019-11-25
1、Spark Structured Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。

2、打开一个终端窗口,启动带有端口号9999的套接字服务器:
    $ nc -lk 9999
   (注:如果没有nc的话,先安装它: $ sudo yum install nc)

3、启动PyCharm,新建一个Python文件,编辑内容如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder.appName(\"ssc_socket_wc\").getOrCreate()

# 设置日志级别
spark.sparkContext.setLogLevel(\'WARN\')

# 创建DataFrame,代表来自localhost:9999的输入流(从Socket数据源读取流数据)
lines = spark.readStream \\
            .format(\"socket\") \\
            .option(\"host\", \"192.168.190.145\") \\
            .option(\"port\", 9999) \\
            .load()

# 将单词分割为行(DataFrame具有默认的value列)
words = lines.select(explode(split(lines.value, \" \")).alias(\"word\"))

# 生成运行单词计数
wordCounts = words.groupBy(\"word\").count()

# 开始运行查询,并将运行时计数输出到控制台
# 可以指定三种输出模式:append, complete, update
query = wordCounts \\
    .writeStream \\
    .format(\"console\") \\
    .outputMode(\"complete\") \\   
    .start()

query.awaitTermination()

4、运行该python流程序

5、再回到nc窗口,在nc窗口中随意输入一些语句,语句中的单词以空格分隔
   监听窗口就会自动获得单词数据流信息,并打印出词频统计信息


---------------------------------------------------------------------------
在Spark集群上执行过程(以下均在Linux下操作)
1)先启动nc服务器
      $ nc -lk 9999

2)另打开一个终端,执行以下命令提交流作业程序:
      $ spark-submit --master local
  • ssc_socket_wc.py

    3)在nc服务器一方,随意输入一些语句,单词之间以空格分割
          good good study
          day day up

    4)回到流程序运行窗口,查看输出

  • 二维码

    扫码加我 拉你入群

    请注明:姓名-公司-职位

    以便审核进群资格,未注明则拒绝

    相关推荐
    栏目导航
    热门文章
    推荐文章

    说点什么

    分享

    扫码加好友,拉您进群
    各岗位、行业、专业交流群