1. Regular Join:灵活性最高但资源消耗较大的连接方式
Regular Join 是最常见的一种 Join 类型,语法形式如下:
SELECT * FROM Orders INNER JOIN Product ON Orders.product_id = Product.id;
虽然在写法上与离线 SQL 几乎一致,但在 Flink 的流处理环境中,其执行机制存在显著差异。
1.1 流场景下 Regular Join 的性能开销
由于流式数据是无界的,Regular Join 需要满足这样的语义:
为实现这一目标,Flink 必须将两个参与 Join 的表的全部数据都保存在状态(State)中。这导致:
因此,官方文档多次提醒用户:必须根据业务逻辑合理设置 State TTL(Time-To-Live),否则极易因状态膨胀引发内存溢出(OOM)问题。
1.2 支持的等值连接类型
Flink 中的 Regular Join 当前支持以下几种连接模式:
但对 Join 条件有严格限制:
Orders.product_id = Product.id;a.col = b.col
a.x > b.y
常见的合法写法包括:
-- 内连接 SELECT * FROM Orders INNER JOIN Product ON Orders.product_id = Product.id; -- 左外连接 SELECT * FROM Orders LEFT JOIN Product ON Orders.product_id = Product.id; -- 右外连接 SELECT * FROM Orders RIGHT JOIN Product ON Orders.product_id = Product.id; -- 全外连接 SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.product_id = Product.id;
1.3 多表连接建议使用 MultiJoin 优化
当涉及多个表进行链式连接时,若按顺序逐个执行 Join,容易造成中间状态急剧膨胀。
Flink 提供了 MultiJoin 算子来合并和优化多表连接操作。实践建议如下:
A JOIN B JOIN C JOIN D
2. Interval Join:基于时间窗口的事件关联机制
Interval Join 主要用于解决“在一定时间范围内关联两个事件”的需求。典型应用场景包括:
示例语句:
SELECT * FROM Orders o, Shipments s WHERE o.id = s.order_id AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time;
该查询的含义是:
[ship_time - 4 小时, ship_time]
2.1 Interval Join 的使用限制
相比 Regular Join,Interval Join 更加受限但也更高效:
合法的时间条件示例如下:
ltime = rtimeltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTEltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND得益于时间字段的单调递增特性(或结合 Watermark 处理乱序数据),Flink 可以:
简而言之,Interval Join 相当于一种带有时间边界约束的 Join 操作,在保障正确性的同时显著降低了状态存储的压力。
3. Temporal Join:实现事实流的“时点维度填充”
Temporal Join(时态连接)是构建 Flink 流式数据仓库的核心功能之一。其实质是:
在处理不断变化的维度表时,若需在特定时间点还原出“当时的快照版本”并将其与事实数据流进行关联(Join),这一需求在实际业务中十分普遍。例如:
同一条订单或行为记录,在当天查看和隔日查看时,所关联的维度信息可能存在差异——这正是时态 Join 发挥作用的场景。
Flink 提供了两种类型的时态 Join 操作,分别基于不同的时间语义:
一个典型的使用案例是“订单金额 + 汇率”转换:
订单金额应按照下单那一刻的汇率换算为美元(USD),而非采用后续更新后的最新汇率。
核心 SQL 示例(DDL 已省略):
SELECT order_id, price, orders.currency, conversion_rate, order_time FROM orders LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time ON orders.currency = currency_rates.currency;
语义说明:
order_timecurrency update_timeorder_time 去查找该时刻对应的汇率版本;注意事项:
orders.currency = currency_rates.currency这种 Join 方式更贴近“Lookup”语义:
每当一条事实数据进入流中,即刻使用此时维度表中的最新可用数据来填充相关信息。
典型语法示例:
SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
主要特点包括:
Lookup Join 可视为“处理时间时态 Join”与“外部维表连接器”的结合体。
常见应用场景包括:
DDL 定义示例:
CREATE TEMPORARY TABLE Customers ( id INT, name STRING, country STRING, zip STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', 'table-name' = 'customers' );
查询语句:
SELECT o.order_id, o.total, c.country, c.zip FROM Orders AS o JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
关键要素解析:
proc_timejdbcFOR SYSTEM_TIME AS OF o.proc_timeLookup Join 的典型特征:
在实际业务建模中,常出现“单行数据内嵌数组或 Map 结构”的情形,例如:
UNNEST 展示了如何将这类嵌套结构展开为扁平化的多行记录,UNNEST 正是解决此类问题的核心工具。
示例一:常量数组展开
SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat']);
示例二:基于表字段的数组展开
SELECT order_id, product_name FROM Orders
5.2 WITH ORDINALITY:同时获取元素下标
在使用 UNNEST 展开结构时,可以通过添加 WITH ORDINALITY 来一并返回元素的序号。例如:
SELECT *
FROM (VALUES ('order_1'), ('order_2'))
CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat'])
WITH ORDINALITY AS t(product_name, index);
需要注意的是:
以 Map 为例,进行展开操作:
SELECT *
FROM
(VALUES('order_1'))
CROSS JOIN UNNEST(MAP['shirt', 2, 'pants', 1, 'hat', 1]) WITH ORDINALITY;
当处理 multiset 时,若某元素的出现次数(multiplicity)为 2,则会展开为两行相同记录。
index
表函数 Join 结合 LATERAL 关键字,其核心语义是:
将左表的每一行作为输入参数,调用一次表函数,将其返回的多行结果与该行进行连接。
LATERAL TABLE
如果表函数执行后未返回任何结果,则对应的左表行将被过滤掉:
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res);
即使表函数无返回结果,左表记录依然保留,右侧字段填充 NULL:
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE;
NULL
常见应用场景包括:
根据不同的业务需求,应选用最匹配的 Join 方式。以下是典型场景与推荐方案对照:
| 场景诉求 | 推荐 Join 类型 |
|---|---|
| 任意两个动态表全量关联,语义最贴近离线 SQL | Regular Join |
| 两个事件流基于 ID 和时间区间进行匹配 | Interval Join |
| 事实表回放某一时刻维度表的历史版本 | Event Time Temporal Join |
| 事实表实时补全当前最新的维度信息 | Processing Time Temporal Join / Lookup Join |
| 将数组、Map 或 multiset 展平为多行 | UNNEST +(可选)WITH ORDINALITY |
| 每行触发一个 UDTF 并将其多行结果 Join | LATERAL TABLE / 表函数 Join |
一条关键实战经验提醒:
Regular Join 和无限制的多表 Join 是生产环境中事故高发区。
在流式计算中必须重点考虑以下因素:
本文从语义逻辑、资源代价和典型应用三个维度,系统梳理了 Flink SQL 中各类 Join 的特性:
扫码加好友,拉您进群



收藏
