全部版块 我的主页
论坛 数据科学与人工智能 数据分析与数据科学 数据分析与数据挖掘
1193 0
2020-08-14
10分钟内完成Apache Flink
什么是Flink?
Apache Flink是一个开源流处理框架。它被 Uber,ResearchGate,Zalando等许多公司广泛使用。从根本上讲,这一切都是关于处理来自外部源的流数据的。它可以与最新的消息传递框架(例如Apache Kafka,Apache NiFi,Amazon Kinesis Streams,RabbitMQ)一起运行。
总览  
让我们探索一个使用Apache Flink进行流处理的简单Scala示例。我们将从JSON中提取来自Apache Kafka的传感器数据,对其进行解析,过滤,计算传感器在最近5秒钟内经过的距离,然后将处理后的数据发送回Kafka到另一个主题。
我们需要从Kafka获取数据-我们将创建一个简单的基于python的Kafka生产者。该代码在附录中。
版本:
Apache Kafka 1.1.0 ,Apache Flink 1.4.2 ,Python 3.6 ,Kafka-python 1.4.2 ,SBT 1.1.0
深入研究代码
现在,让我们从Flink程序的框架开始。每个Apache Flink程序都需要一个执行环境。在我们的例子中,它是一个 StreamExecutionEnvironment。
val env = StreamExecutionEnvironment.getExecutionEnvironment
然后,我们需要创建一个Kafka消费者。我们将使用 FlinkKafkaConsumer010 ,但它与我们正在使用的Kafka 1.1.0兼容。我们还将需要一个Properties 对象来设置引导服务器参数。
val kafkaProperties = new Properties()kafkaProperties.setProperty(“ bootstrap.servers”,“ localhost:9092”)val kafkaConsumer = new FlinkKafkaConsumer010(
“ sensors”,
  new SimpleStringSchema,
  kafkaProperties

现在,当我们有一个消费者时,我们将创建一个流。
val kafkaStream = env.addSource(kafkaConsumer)
现在,我们需要一种解析JSON字符串的方法。由于Scala没有内置功能,因此我们将使用Play Framework。  首先,我们需要一个case类将json字符串解析为。
为简单起见,我们将使用从JSON字符串到JsonMessage的自动转换。要转换流中的元素,我们需要使用.map转换。map转换仅将单个元素作为输入并提供单个输出。我们还必须过滤未能解析的元素。过滤器仅保留满足特定条件的元素,然后获取其值。
Json.fromJson [JsonMessage](Json.parse(entry))).filter(_。isInstanceOf [JsSuccess [JsonMessage]]).map(_。get)
现在我们要丢弃状态代码错误的所有条目。这意味着我们需要再次过滤流。
要在控制台中打印消息,您可以将 它们打印() 。
现在,我们可以通过消息的ID来键入消息。我们将使用keyBy方法。
窗口化将流分成指定持续时间的片段。 我们将流分成10秒的窗口。
现在我们开始计算。首先-我们使用fold方法将单个键的元素合并到List中,初始值为空List。fold函数只是返回添加了另一个元素的新List。
现在我们需要计算每个传感器经过的距离。我们将使用从RosettaCode借来的Haversine公式,并对其 进行一些修改以计算两点之间的距离。我们还需要一个函数,用于计算点对之间的距离之和。我们将其称为reduceListOfJson。
半正矢(elem._1,elem._2))//计算距离之间的两个点.fold(OD)(_ + _)的列表Tuple2(数据(0 //总和元素).ID,距离)}
VAL减少=折叠.map(reduceListOfJson(_))
减少.print()。setParallelism(1)
现在,我们可以将结果写入另一个Kafka主题。因此,我们需要一个生产者。
现在,我们需要将过滤后的案例类转换回JSON。
Json.toJson(elem)).map(Json.stringify(_))
现在,我们可以为数据添加一个接收器。
现在,我们终于可以执行我们的代码了。
我们将按照  Kafka QuickStart文档中所述的先进行的设置来启动Kafka 。
终端命令:
sbtrun
./bin/zookeeper-server-start.sh /config/zookeeper.properties./bin/kafka-server-start /config/server.properties
bin \ windows \ zookeeper-server-start.bat config \ zookeeper.properties
bin \ Windows \ kafka-server-start.bat config \ server.properties
我们将直接在SBT(Scala构建工具)中运行该示例。
终端命令:
输出看起来像这样-(id,distance_passed):
[info](93
[信息](92
[信息](66
[信息](50
[信息](40
[信息](33
[信息](30
[信息](28
[信息](24
[信息](14
Python Consumer接收的数据如下所示:
ConsumerRecord(topic ='distances',partition = 0,offset = 30411,timestamp = 1524566590000,timestamp_type = 0,key = None,value = b'[9
完整的代码
haversine(elem._1,elem._2))//计算两点之间的距离.fold(0D)(_ + _)//列表Tuple2(data(0).id,distance)的
  总和} 案例类JsonMessage( lat:double,long:Double,status:字符串,id:Int)
  隐式val jsonMessageReads = Json.reads [JsonMessage]
  def main(args:Array [String]){
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaProperties = new Properties()
    kafkaProperties.setProperty(“ bootstrap.servers”,“ localhost:9092”)
    val kafkaConsumer = new FlinkKafkaConsumer010(
      “ sensors”,
      new SimpleStringSchema,
      kafkaProperties
    )
    val kafkaStream = env.addSource(kafkaConsumer)
    val jsonStream = kafkaStream
      .map(entry => Json.fromJson [JsonMessage](Json.parse(entry))).
      filter(_。isInstanceOf [JsSuccess [JsonMessage]])
      . map (_。get)
    valfilteredJsonStream = jsonStream。 filter(_。status ==“ ok”)
    val keyedJson = FilteredJsonStream.keyBy(_。id)
    val keyedWindowed = keyedJson.timeWindow(Time.seconds(10))
    valfolded = keyedWindowed.fold(ini??tialValue = List [JsonMessage]( ))(_:+ _).filter(_。nonEmpty)
    val减少=折叠.map(reduceListOfJson(_))
    减少.print()。setParallelism(1)
    val kafkaProducer =新的FlinkKafkaProducer010 [String](
      “距离”
      新的SimpleStringSchema,
      kafkaProperties
    )
    reduce.map(elem => Json.toJson(elem))。map(Json.stringify(_))。addSink(kafkaProducer)
    env.execute(“ KafkaExample”)
  }
}
Python Kafka制作人
导入json 导入请求导入时间导入numpy 类message():    def __init __(self,identifier):        self.long = numpy.random.uniform(-180.0,180.0)        self.lat = numpy.random.uniform(-90.0,90.0 )        self.id =标识符    def next_point(self):                 返回{#新点将以            “ long” 开头:self.long + numpy.random.uniform(-0.001,0.001),            “ lat”:self.lat + numpy.random.uniform(-0.001,0.001),            “状态”:numpy.random.choice(['ok','failed'],p =(            0.9,0.1 )),“ id”:self.id                     }     def __repr __(自己):        返回str({
            “ long”:self.long,            “ lat”:self.lat,            “ id”:self.id         })messages = [message(i)for range(0,500             )中的i]#创建500条消息producer = KafkaProducer(    bootstrap_servers = [“本地主机:9092”],    value_serializer = lambda v:json.dumps(v).encode('utf-8')),而True:    对于消息中的x:        producer.send('sensors',x.next_point ())    time.sleep(1)
Python Kafka使用者
消费者=消费者中味精的KafkaConsumer('geopoints_clean'):    print(msg)
结论
在本文中,我们简要介绍了Apache Flink是什么。我们探索了一个示例管道,其中包括从Apache Kafka获取数据,执行一些数据清理和聚合,然后将处理后的数据发送到另一个Kafka主题。

关注 CDA人工智能学院 ,回复“录播”获取更多人工智能精选直播视频!


二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

相关推荐
栏目导航
热门文章
推荐文章

说点什么

分享

扫码加好友,拉您进群
各岗位、行业、专业交流群