全部版块 我的主页
论坛 数据科学与人工智能 大数据分析 Hadoop论坛
1054 1
2024-10-18
一、Flink SQL Gateway概述
Flink Sql Gateway是Flink集群的“任务网关”,支持以rest api的形式提交查询、插入、删除等任务。
Sql Gateway其实内部说白了就是一个SessionManager会话管理器以及一个SqlGatewayEndpoint网络服务器。
其中SqlGatewayEndpoint是基于runtime.rest.RestServerEndpoint实现的Netty服务器,通过统一的架构设计,实现了多种handler,如创建会话、提交任务、任务状态查询、任务取消、拉取数据等。所有handler使用公用的SessionManager进行会话管理。SessionManager中维护了一个sessionId和Session的Map,考虑到并发问题,底层采用ConcurrentHashMap作为并发存储。

二、Flink chcekpoint作为flink中最重要的部分,是flink精准一次性的重要保证,可以这么说flink之所以这么成功和她的checkpoint机制是离不开的。

之前大概学习了一下flink的checkpoint源码,但是还是有点晕乎乎的,甚至有点不理解我们作业中设置的checkpoint配置flink是如何读取到的,并且他是如何往下传播的。

1.代码中的checkpoint配置
这次我详细屡了一下,方便我们更好理解checkpoint,下面我们先看代码中我们一般是如何配置flink checkpoint的:
// TODO 1. 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // TODO 2. 状态后端设置
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );

        中间的业务代码省略

        // TODO 3. 启动任务
        env.execute();

上面这个是官网的SocketWindowWordCount程序示例,它首先从命令行中获取socket连接的host和port,然后获取执行环境、从socket连接中读取数据、解析和转换数据,最后输出结果数据。
每个Flink程序都包含以下几个相同的基本部分:
获得一个execution environment,
加载/创建初始数据,
指定此数据的转换,
指定放置计算结果的位置,
触发程序执行

Configuration作为配置容器,几乎所有的构建需要从配置类获取配置项,这里不显示关联关系

1. 用户命令行执行kubernates-session.sh,主入口是KubernetesSessionCli main

2.ClusterClientServiceLoader SPI机制载入ClusterClientFactory,k8s环境下实现类是KubernetesClusterClientFactory

3. ClusterClientFactory是ClusterClient工厂,首先创建ClusterDescriptor集群描述,该类负责部署集群,最终返回ClusterClient,k8s环境下实现类是
KubernetesClusterDescriptor

4.KubernetesClusterDescriptor新建集群规格ClusterSpecification,该类不是针对k8s,定义了flink master和任务管理器的内存容量等技术参数,通用于容器类的集群;
KubernetesSessionClusterEntrypoint类型设置为ENTRY_POINT_CLASS参数,后面《部署集群》使用到

5.KubernetesClusterDescriptor的deploySessionCluster部署k8s集群,参看《部署集群》用例,最后返回ClusterClient,用于后续提交作业和其他集群操作

三、累加器(Accumulator)
累加器是实现了 加法运算 功能和 合并运算(合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。
Flink 的累加器均实现了 Accumulator 接口,包括如下 2 个方法用于支持加法运算和合并最终结果:
add(V value):执行加法运算,将值 V 累加到当前 UDF 的累加器中
merge(Accumulator<V, R> other):执行合并操作,将累加器 other 与当前累加器合并
累加器的使用方法如下:
Step 1|在需要使用累加器的 UDF 中创建一个累加器对象(此处以计数器为例)
private IntCounter numLines = new IntCounter();
1
Step 2|在富函数的 open() 方法中注册累加器对象,在注册时需要定义累加器名称用于查询结果
getRuntimeContext().addAccumulator("num-lines", this.numLines);
1
Step 3|在 UDF 的任何地方(包括 open() 和 close() 方法中)使用累加器
this.numLines.add(1);
1
Step 4|最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中。
myJobExecutionResult.getAccumulatorResult("num-lines");
1
单个作业的所有累加器 共享一个命名空间,因此可以在不同算子(operator)的不同 UDF 中使用同一个累加器,Flink 会合并将所有具有相同名称的累加器。

四、编译源码
执行如下命令:
mvn clean install -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true
-Dmaven.test.skip:跳过测试代码
-Dmaven.javadoc.skip:跳过 javadoc 检查
-Dcheckstyle.skip:跳过代码风格检查
maven 编译的时候跳过这些检查,这样可以减少很多时间,还可能会减少错误的发生。

五、flink的图结构
在Yarn模式下不使用StreamGraph,而是用OptimizedPlan生成JobGraph。StreamGraph适用于其他情况,比如本地执行。
flink的图结构主要有JobGraph和ExecutionGraph。
JobGraph
JobGraph表示一个被 JobManager 接收的底层的Flink dataflow program。所有上层API代码都会转化为JobGraphs。抽象来说,JobGraph是一张由 vertices 和 intermediate results 组成的DAG图。现在 iterations (feedback edges)已经不会被编译到 JobGraph 了,而是去到了一些建立了反馈管道的 vertices 中。JobGraph规定了 job 层面上的配置,而其所包含的 vertex 和 intermediate result 定义了具体算子的特征和中间结果。
ExecutionGraph
协调数据流的分布式执行的核心数据结构。它保持每个并行任务,每个中间流以及它们之间的通信的表示。它主要由以下三个部分组成:
ExecutionJobVertex:对应 JobGraph 的 vertex,通常是一个算子,如map、join。它持有一组并行子任务的聚合状态aggregated state。由 JobVertex 确定。
ExecutionVertex:表示一组并行任务中的其中一个子任务。数量由并行度而定。由 ExecutionJobVertex 和
Execution:执行 ExecutionVertex ,一个 ExecutionVertex 可能有多个 Execution 来应对失败或重新计算。由 ExecutionAttemptID 确定。JM 和 TM 之间关于 task 的部署和更新都是根据 ExecutionAttemptID 来通知。
operator算子:一般operator的操作是通过反射获取所传入的function的返回对象,通过transform创建经过该function处理后得到的流实例。在返回生成的流实例之前,flink还会对转换进行登记,即.addOperator(resultTransform)。
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

全部回复
2024-10-18 17:55:07
学习地址:https://pan.baidu.com/s/1wpgRUxmQXOp46GoyitZ21w 提取码: p2sm
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

相关推荐
栏目导航
热门文章
推荐文章

说点什么

分享

扫码加好友,拉您进群
各岗位、行业、专业交流群