10分钟内完成Apache Flink
什么是Flink?
Apache Flink是一个开源流处理框架。它被 Uber,ResearchGate,Zalando等许多公司广泛使用。从根本上讲,这一切都是关于处理来自外部源的流数据的。它可以与最新的消息传递框架(例如Apache Kafka,Apache NiFi,Amazon Kinesis Streams,RabbitMQ)一起运行。
总览
让我们探索一个使用Apache Flink进行流处理的简单Scala示例。我们将从JSON中提取来自Apache Kafka的传感器数据,对其进行解析,过滤,计算传感器在最近5秒钟内经过的距离,然后将处理后的数据发送回Kafka到另一个主题。
我们需要从Kafka获取数据-我们将创建一个简单的基于python的Kafka生产者。该代码在附录中。
版本:
Apache Kafka 1.1.0 ,Apache Flink 1.4.2 ,Python 3.6 ,Kafka-python 1.4.2 ,SBT 1.1.0
深入研究代码
现在,让我们从Flink程序的框架开始。每个Apache Flink程序都需要一个执行环境。在我们的例子中,它是一个 StreamExecutionEnvironment。
val env = StreamExecutionEnvironment.getExecutionEnvironment
然后,我们需要创建一个Kafka消费者。我们将使用 FlinkKafkaConsumer010 ,但它与我们正在使用的Kafka 1.1.0兼容。我们还将需要一个Properties 对象来设置引导服务器参数。
val kafkaProperties = new Properties()kafkaProperties.setProperty(“ bootstrap.servers”,“ localhost:9092”)val kafkaConsumer = new FlinkKafkaConsumer010(
“ sensors”,
new SimpleStringSchema,
kafkaProperties
)
现在,当我们有一个消费者时,我们将创建一个流。
val kafkaStream = env.addSource(kafkaConsumer)
现在,我们需要一种解析JSON字符串的方法。由于Scala没有内置功能,因此我们将使用Play Framework。 首先,我们需要一个case类将json字符串解析为。
为简单起见,我们将使用从JSON字符串到JsonMessage的自动转换。要转换流中的元素,我们需要使用.map转换。map转换仅将单个元素作为输入并提供单个输出。我们还必须过滤未能解析的元素。过滤器仅保留满足特定条件的元素,然后获取其值。
Json.fromJson [JsonMessage](Json.parse(entry))).filter(_。isInstanceOf [JsSuccess [JsonMessage]]).map(_。get)
现在我们要丢弃状态代码错误的所有条目。这意味着我们需要再次过滤流。
要在控制台中打印消息,您可以将 它们打印() 。
现在,我们可以通过消息的ID来键入消息。我们将使用keyBy方法。
窗口化将流分成指定持续时间的片段。 我们将流分成10秒的窗口。
现在我们开始计算。首先-我们使用fold方法将单个键的元素合并到List中,初始值为空List。fold函数只是返回添加了另一个元素的新List。
现在我们需要计算每个传感器经过的距离。我们将使用从RosettaCode借来的Haversine公式,并对其 进行一些修改以计算两点之间的距离。我们还需要一个函数,用于计算点对之间的距离之和。我们将其称为reduceListOfJson。
半正矢(elem._1,elem._2))//计算距离之间的两个点.fold(OD)(_ + _)的列表Tuple2(数据(0 //总和元素).ID,距离)}
VAL减少=折叠.map(reduceListOfJson(_))
减少.print()。setParallelism(1)
现在,我们可以将结果写入另一个Kafka主题。因此,我们需要一个生产者。
现在,我们需要将过滤后的案例类转换回JSON。
Json.toJson(elem)).map(Json.stringify(_))
现在,我们可以为数据添加一个接收器。
现在,我们终于可以执行我们的代码了。
我们将按照 Kafka QuickStart文档中所述的先进行的设置来启动Kafka 。
终端命令:
sbtrun
./bin/zookeeper-server-start.sh /config/zookeeper.properties./bin/kafka-server-start /config/server.properties
bin \ windows \ zookeeper-server-start.bat config \ zookeeper.properties
bin \ Windows \ kafka-server-start.bat config \ server.properties
我们将直接在SBT(Scala构建工具)中运行该示例。
终端命令:
输出看起来像这样-(id,distance_passed):
[info](93
[信息](92
[信息](66
[信息](50
[信息](40
[信息](33
[信息](30
[信息](28
[信息](24
[信息](14
Python Consumer接收的数据如下所示:
ConsumerRecord(topic ='distances',partition = 0,offset = 30411,timestamp = 1524566590000,timestamp_type = 0,key = None,value = b'[9
完整的代码
haversine(elem._1,elem._2))//计算两点之间的距离.fold(0D)(_ + _)//列表Tuple2(data(0).id,distance)的
总和} 案例类JsonMessage( lat:double,long:Double,status:字符串,id:Int)
隐式val jsonMessageReads = Json.reads [JsonMessage]
def main(args:Array [String]){
val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaProperties = new Properties()
kafkaProperties.setProperty(“ bootstrap.servers”,“ localhost:9092”)
val kafkaConsumer = new FlinkKafkaConsumer010(
“ sensors”,
new SimpleStringSchema,
kafkaProperties
)
val kafkaStream = env.addSource(kafkaConsumer)
val jsonStream = kafkaStream
.map(entry => Json.fromJson [JsonMessage](Json.parse(entry))).
filter(_。isInstanceOf [JsSuccess [JsonMessage]])
. map (_。get)
valfilteredJsonStream = jsonStream。 filter(_。status ==“ ok”)
val keyedJson = FilteredJsonStream.keyBy(_。id)
val keyedWindowed = keyedJson.timeWindow(Time.seconds(10))
valfolded = keyedWindowed.fold(ini??tialValue = List [JsonMessage]( ))(_:+ _).filter(_。nonEmpty)
val减少=折叠.map(reduceListOfJson(_))
减少.print()。setParallelism(1)
val kafkaProducer =新的FlinkKafkaProducer010 [String](
“距离”
新的SimpleStringSchema,
kafkaProperties
)
reduce.map(elem => Json.toJson(elem))。map(Json.stringify(_))。addSink(kafkaProducer)
env.execute(“ KafkaExample”)
}
}
Python Kafka制作人
导入json 导入请求导入时间导入numpy 类message(): def __init __(self,identifier): self.long = numpy.random.uniform(-180.0,180.0) self.lat = numpy.random.uniform(-90.0,90.0 ) self.id =标识符 def next_point(self): 返回{#新点将以 “ long” 开头:self.long + numpy.random.uniform(-0.001,0.001), “ lat”:self.lat + numpy.random.uniform(-0.001,0.001), “状态”:numpy.random.choice(['ok','failed'],p =( 0.9,0.1 )),“ id”:self.id } def __repr __(自己): 返回str({
“ long”:self.long, “ lat”:self.lat, “ id”:self.id })messages = [message(i)for range(0,500 )中的i]#创建500条消息producer = KafkaProducer( bootstrap_servers = [“本地主机:9092”], value_serializer = lambda v:json.dumps(v).encode('utf-8')),而True: 对于消息中的x: producer.send('sensors',x.next_point ()) time.sleep(1)
Python Kafka使用者
消费者=消费者中味精的KafkaConsumer('geopoints_clean'): print(msg)
结论
在本文中,我们简要介绍了Apache Flink是什么。我们探索了一个示例管道,其中包括从Apache Kafka获取数据,执行一些数据清理和聚合,然后将处理后的数据发送到另一个Kafka主题。

关注 CDA人工智能学院 ,回复“录播”获取更多人工智能精选直播视频!