全部版块 我的主页
论坛 提问 悬赏 求职 新闻 读书 功能一区 学道会
282 0
2019-11-26
# 以下为数据中心的两个数据传感器检测到的两个机架的温度数据
/*
file1.json:
{\"rack\":\"rack1\",\"temperature\":99.5,\"ts\":\"2017-06-02T08:01:01\"}
{\"rack\":\"rack1\",\"temperature\":100.5,\"ts\":\"2017-06-02T08:06:02\"}
{\"rack\":\"rack1\",\"temperature\":101.0,\"ts\":\"2017-06-02T08:11:03\"}
{\"rack\":\"rack1\",\"temperature\":102.0,\"ts\":\"2017-06-02T08:16:04\"}

file2.json:
{\"rack\":\"rack2\",\"temperature\":99.5,\"ts\":\"2017-06-02T08:01:02\"}
{\"rack\":\"rack2\",\"temperature\":105.5,\"ts\":\"2017-06-02T08:06:04\"}
{\"rack\":\"rack2\",\"temperature\":104.0,\"ts\":\"2017-06-02T08:11:06\"}
{\"rack\":\"rack2\",\"temperature\":108.0,\"ts\":\"2017-06-02T08:16:08\"}
*/

# 所有计算机机架在一个滑动窗口上的平均温度
# 每5分钟,统计一下最近十分钟的平均温度
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 指定一个Schema(模式)
fields = [StructField(\"rack\", StringType(), False),
          StructField(\"temperature\", DoubleType(), False),
          StructField(\"ts\", TimestampType(), False)]
iotDataSchema = StructType(fields)

# 读取温度数据
dataPath = \"/data/spark_demo/streaming/iot-input\"
iotSSDF = spark.readStream.schema(iotDataSchema).json(dataPath)

# group by一个滑动窗口,并在temperature列上求平均值
iotAvgDF = iotSSDF.groupBy(window(col(\"ts\"), \"10 minutes\", \"5 minutes\")).agg(avg(\"temperature\").alias(\"avg_temp\"))

# 将数据写出到memory data sink,使用查询名称iot
iotMemorySQ = iotAvgDF.writeStream.format(\"memory\").queryName(\"iot\").outputMode(\"complete\").start()

# 显示数据,以start时间排序
spark.sql(\"select * from iot\").orderBy(col(\"window.start\")).show(truncate=False)

# 停止该流查询
iotMemorySQ.stop()

----------------------------------------------------------------------------------------------
重构:找出是哪些机架的温度在上升
# 每个机架在一个滑动窗口上的平均温度
# group by一个滑动窗口和rack列
iotAvgByRackDF = iotSSDF.groupBy(window(col(\"ts\"), \"10 minutes\", \"5 minutes\"), col(\"rack\"))
.agg(avg(\"temperature\").alias(\"avg_temp\"))

# 写出到memory data sink,使用iot_rack查询名称
iotByRackConsoleSQ = iotAvgByRackDF.writeStream.format(\"memory\").queryName(\"iot_rack\").outputMode(\"complete\").start()

spark.sql(\"select * from iot_rack\").orderBy(col(\"rack\"), col(\"window.start\")).show(truncate=False)
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

栏目导航
热门文章
推荐文章

说点什么

分享

扫码加好友,拉您进群
各岗位、行业、专业交流群