PixieDust支持流数据
随着物联网设备(物联网)的兴起,能够分析和可视化实时数据流变得越来越重要。例如,您可以让机器中的温度计之类的传感器或诸如起搏器之类的便携式医疗设备不断将数据流传输到Kafka之类的流服务中。通过为PixieApp和display()框架提供简单的集成API,PixieDust使得在Jupyter Notebook内部处理实时数据更加容易。
在可视化级别,PixieDust使用Bokeh支持 来进行有效的数据源更新,以将流数据绘制到实时图表中(请注意,目前仅支持折线图和散点图,但将来会添加更多)。display()框架还支持使用Mapbox渲染引擎对流数据进行地理空间可视化。
要激活流可视化,您需要使用从StreamingDataAdapter继承的类,该类是PixieDust API的一部分,是一个抽象类。此类充当流数据源与可视化框架之间的通用桥梁。
注意:我建议您花一些时间在 此处查看StreamingDataAdapter的代码。
下图显示了StreamingDataAdapter数据结构如何适合display()框架:
StreamingDataAdapter体系结构
在实现StreamingDataAdapter的子类时,必须重写基类提供的doGetNextData()方法,该方法将被重复调用以获取新数据以更新可视化。您还可以选择重写getMetadata()方法,以将上下文传递给渲染引擎(我们稍后将使用此方法来配置Mapbox渲染)。
doGetNextData()的抽象实现
如下所示:
@abstractmethod
def doGetNextData(self):
“”“从基础流返回下一批数据。
可接受的返回值为:
[x,y):代表x和y轴的list / numpy数组的元组
熊猫数据框
y:代表y轴的list / numpy数组。在这种情况下,x轴将自动创建
熊猫系列:类似于#3
json
Geojson
具有有效负载的url(json / geojson)
“”
通过
前面的文档字符串解释了允许从doGetNextData()返回的数据的不同类型。
例如,我们要在地图上实时显示虚拟无人机在地球上徘徊的位置。它的当前位置由此处的REST服务提供。
有效负载正在使用Geojson [ii],例如:
{
“几何”:{
“ type”:“ Point”,
“坐标”:[
-93.824908715741202,10.875051131034805
]
},
“ type”:“功能”,
“属性”:{}
}
为了实时呈现我们的无人机位置,我们创建了一个从StreamingDataAdapter继承的DroneStreamingAdapter类,并简单地在doGetNextData()方法[iii]中返回了无人机位置服务URL,如以下代码所示:
来自pixiedust.display.streaming导入*
类DroneStreamingAdapter(StreamingDataAdapter):
def getMetadata(self):
iconImage =“火箭15”
返回{
“ layout”:{“ icon-image”:iconImage,“ icon-size”:1.5},
“ type”:“符号”
}
def doGetNextData(self):
返回“ https://wanderdrone.appspot.com/”
适配器= DroneStreamingAdapter()
显示适配器)
在getMetadata()方法中,我们返回Mapbox特定的样式属性(如此处 所述,该文件使用火箭Maki图标 作为无人机的符号)。
只需几行代码,我们就可以创建无人机位置的实时地理空间可视化结果,结果如下:
无人机的实时地理空间映射
注意:您可以在此位置的PixieDust存储库中找到此示例的完整笔记本。
向您的PixieApp添加流功能
在下一个示例中,我们将展示如何使用PixieDust提供的MessageHubStreamingApp PixieApp可视化来自Apache Kafka数据源的流数据。
注意: MessageHubStreamingApp与名为Message Hub的IBM Cloud Kafka服务一起使用,但是可以轻松地使其适应任何其他Kafka服务。
通过此PixieApp,用户可以选择与服务实例相关联的Kafka主题,并实时显示事件。假设来自选定主题的事件有效负载使用JSON格式,则它将呈现从对事件数据进行采样中推断出的架构。然后,用户可以选择一个特定的字段(必须为数字),并显示一个实时图表,显示该字段随时间变化的平均值。
实时可视化流数据
提供流功能所需的关键PixieApp属性是pd_refresh_rate,它以指定的间隔执行特定的内核请求(拉模型)。在前面的应用程序中,我们使用它来更新实时图表,如showChart路由[iv]返回的以下HTML片段所示:
@route(topic =“ *”,streampreview =“ *”,schemaX =“ *”)
def showChart(self,schemaX):
self.schemaX = schemaX
self.avgChannelData = self.streamingData.getStreamingChannel(self.computeAverages)
返回“”
<div class =“ well” style =“ text-align:center”>
<div style =“ font-size:x-large”> {{this.schemaX}}(平均值)的实时图表。</ div>
</ div>
<div pd_refresh_rate =“ 1000” pd_entity =“ avgChannelData”> </ div>
“”
前面的div通过pd_entity属性绑定到avgChannelData实体,并负责创建每秒更新一次的实时图表(pd_refresh_rate = 1000 ms)。反过来,通过调用getStreamingChannel()创建avgChannelData实体,该调用传递给自身。computeAverage函数负责更新所有流数据的平均值。重要的是要注意avgChannelData是一个从StreamingDataAdapter继承的类,因此可以传递给display()框架以构建实时图表。
难题的最后一部分是让PixieApp返回display()框架所需的displayHandler。这是通过如下重写newDisplayHandler()方法[v]来完成的:
def newDisplayHandler(自我,选项,实体):
如果self.streamingDisplay为None:
self.streamingDisplay = LineChartStreamingDisplay(选项,实体)
其他:
self.streamingDisplay.options =选项
返回self.streamingDisplay
在前面的代码中,我们使用它来创建pixiedust.display.streaming.bokeh包(https://github.com/ibm-watson-data-lab/pixiedust/blob/master/ pixied ...),传递avgChannelData实体。
如果要查看该应用程序的运行情况,则需要在IBM Cloud上创建Message Hub服务实例,并使用其凭证在笔记本中使用以下代码调用此PixieApp:
从pixiedust.apps.messageHub导入*
MessageHubStreamingApp()。run(
凭证= {
“用户名”:“ XXXX”,
“密码”:“ XXXX”,
“ api_key”:“ XXXX”,
“ prod”:正确
}
)
概要
我们已经介绍了有关如何使用PixieDust display()API创建自己的自定义可视化效果的详细信息。
题库