实战Spark3 实时处理,掌握两套企业级处理方案
如何避免写重复代码:善用抽象和组合?
通过抽象和组合,可以写出更简洁、易懂、稳定的代码;类似于金字塔建造过程,我们总是可以将一层抽象堆叠在另一层之上,从而实现我们的目标。但是在日常的开发工作中,我们是如何实践的呢?本文将以作者在Akka项目中的社区贡献作为引子,分享我的经验。
事件
通常,为了简化我们对数据流的处理,我们可能会使用Java8中首次引入的Stream,或者Kotlin和Scala等编程语言中提供的更丰富的set库,或者使用reactive flow的相关三方库来简化我们的工作。尽管这些类库已经提供了丰富的操作符,但我们仍然会遇到它们没有为某些场景提供合适的操作符的情况。例如:
在直播场景中,某些类型的消息需要缓冲和聚合。一段时间内的多个赞合并为一个赞,处理N条消息时发送整条消息,保证整体扩散水平维持在一个稳定的水平。
在IOT场景下,接收终端设备上报的数据,返回当前数据和前一个值,或者后三个值,计算变化趋势。此时,我们可以使用反应流库中提供的:zipWithNext、zipWithPrevious、zipWithPreviousAndNext或sliding。
在建立聊天室的时候,如果用户输入bye,用户就会断开连接离开聊天室,那么这个时候我们可能会使用takeWhile。
假设我们有一组SQL,我们需要依次执行它们,合并它们的结果,处理后关闭相应的数据库连接。此时,我们可能会使用mapWithResource,使用(资源安全)。
当处理文件、写入数据库等时。要使用资源,我们需要打开一个文件或者获取一个数据库连接,写入数据,然后在处理后关闭相应的资源。这时候我们可能会用到foldResource。
假设数据需要分批打包,每三个元素一个,此时我们可能使用batch(3)。
假设我们需要用每个元素的下标来组合元素。这个时候,我们可能需要使用zipWithIndex。
假设我们需要缓存元素,缓存到满足指定条件,我们可能需要bufferUntil(谓词),Buffer While(预测)●
假设我们需要缓存元素,直到数据发生变化,并将相同的项目合并在一起,我们可能需要bufferUtilChanged。
假设我们需要对所有元素进行重复数据删除,或者删除连续的重复元素,我们可能需要使用distinct和distinctUntilChanged。
假设只需要返回前N个元素,可能需要使用limit(N),take(N),或者根据takeWhile和takeUntil的条件假设需要跳过前N个元素。我们可能需要使用skip(N)、drop(N)或dropWhile和dropUntil。
等等...
正如我们所看到的,上述每个操作符都有特定的语义。虽然看起来只是一个简单的方法,但是如果需要完全独立实现的话,肯定会相当困难。例如,当前版本的Reactor-core中没有直接提供zipWithNext、ZipwithRevision、zipWithPreviousAndNext,而与资源相关的Reactor-core中只有一个using。
分析
作为程序员,第一件事肯定是Ctrl+C,第二件事是Ctrl+V,第三件事是Commit & Push。然而,事情并没有那么简单。
困难在于:
反应流操作者需要完全实现反应流的规范,并通过默认测试套件的验证。
操作符需要尽可能的抽象和可组合。
无论是单线程还是并发,都有正确的行为和语义,有完整的单元测试覆盖。
运算符需要以尽可能高效的性能来实现。
比如以zipWithIndex为例,在Reactor-core中有FluxIndexFuseable(370行代码)和FluxIndex (296行代码)两种实现。并且清楚的处理了各种情况。其他运营商也差不多:3.4.23版。
FluxBuffer—— 575行代码
xflow predict-464行代码
FluxDistinct—— 609行代码
FluxDistinctFuseable—— 70行代码
FluxDistinctUntilChanged—— 337行代码
FluxUsing—— 583行代码
如果您想要实现一个zipWithNext自定义操作符,您应该有一个类似的工作负载。这样的工作强度,我个人认为无论是在代码审核还是后期维护都是很大的问题。
所以我觉得需要一个新的抽象来进一步抽象上面的操作。然后,在此之上,通过使用和组合其他操作,更容易实现自定义操作符;
让我们想想如何实现这些操作符~ ~
解决办法
以上都可以抽象为:
状态和线程安全。
状态是可变的,根据不同的状态,对输入应用不同的操作,产生不同的值。
可以早点结束,也可以选择性丢弃不符合条件的值。
有完整的生命周期。
最后,可以根据内部状态生成可选值,而不会丢失内部状态。
经过分析,可以表示为:状态+输入-(应用行为)->新状态+输出,可以通过添加onCraete和onComplete生命周期函数来完整表示。而提前结束等行为可以通过take while来组合。
实现。我们的方法被命名为statefulMap,声明如下:
公共状态地图(
供应商创建,
java.util.function.BiFunction f,
Java . util . function . function on complete){...}
复制代码
让我们来看看如何通过这个方法实现zipWithIndex:
实现zipWithIndex(索引)
Source.from(Arrays.asList("A "," B "," C "," D "))
.州地图(
()-> 0L,
(index,element) -> Pair.create(index + 1,Pair.create(element,index))。
indexon complete-> optional . empty())
.runForeach(System.out::println,system);
//打印
// Pair(A,0)
// Pair(B,1)
// Pair(C,2)
// Pair(D,3)
复制代码
还可以实现zipWithNext、zipWithPreviousAndNext。让我们看看如何实现更复杂的bufferUntilChanged。
实现bufferUntilChanged
Source.from(Arrays.asList("A "、" B "、" B "、" C "、" C "、" D "))
.州地图(
()-->(List)new linked List(),
(缓冲区,元素)-> {
if (buffer.size() > 0 &&(!buffer.get(0)。等于(元素))){
返回Pair.create(
new linked list(collections . singleton list(element)),
collections . unmodifieblelist(buffer));
}否则{
buffer.add(元素);
返回Pair.create(buffer,collections . empty list());
}
},
可选::ofNullable)
.filterNot(List::isEmpty)
.runForeach(System.out::println,system);
//打印
// [A]
// [B,B]
// [C,C,C]
// [D]
复制代码
以此类推,如何实现distinctUntilChanged?
实现distinctUntilChanged
Source.from(Arrays.asList("A "、" B "、" B "、" C "、" C "、" D "))
.州地图(
可选::空,
(lastElement,element) -> {
if(last element . is present()& & last element . get()。等于(元素)){
return Pair.create(lastElement,optional . empty());
}否则{
return pair . create(optional . of(element),optional . of(element));
}
},
listOnComplete -> Optional.empty())
.via(Flow.flattenOptional())
.runForeach(System.out::println,system);
//打印
// A
// B
// C
// D
复制代码
如果要实现聚合缓冲区呢?
机具缓冲器
source . from javastream(()--> int stream . range closed(1,10))
.statefulMap( () -> new ArrayList(3),(List,element)-> { list . add(element);if(list . size()= = 3){ return pair . create(new ArrayList(3),collections . unmodifieblelist(list));} else { return Pair.create(list,collections . empty list());} },listOnComplete-> optional . of nullable(listOnComplete))。filterNot(List::isEmpty)
.runForeach(System.out::println,system);// printsList(1,2,3)List(4,5,6)List(7,8,9)List(10)
复制代码
更复杂的例子:处理资源
在前面了解了如何实现zipWithIndex和bufferUntilChanged之后,让我们进一步了解如何优雅而安全地处理资源。在任何编程语言和框架中,对资源的处理都是一件非常基本但棘手的事情。Java 7中首次引入了Try-with-resources语法,在一定程度上简化了资源处理。在反应流中我们应该如何操作?这里我们可以分为两种情况:
为流中的每个元素创建一个新资源,使用这个资源,然后关闭这个资源。
为整个流程创建一个资源,在处理流程中的每个元素时使用该资源,并在流程的生命周期结束后关闭该资源。
因为资源通常是昂贵的,需要妥善管理,所以在开发过程中,我们更容易遇到第二种情况,即资源的创建和销毁都与流的生命周期绑定在一起。关于反应式流程中的资源管理,需要考虑更多细节:
资源的初始化和关闭需要支持并发安全;反应式流可以被多次物化,被多个下游订阅者订阅和处理,以及以任何顺序取消订阅,因此需要在各种情况下(上游完成、下游取消、异常处理等)正确地创建和销毁资源。).
在流生命周期的所有阶段安全地创建和销毁资源;例如,即使在创建或销毁资源时触发了异常,同一资源也不会被关闭多次。
支持异步,提高资源使用效率。
它知道流的生命周期,支持向下游提供可选值,以在资源关闭时标记流的结束。例如,在处理文件时,使用一个特殊的标识符来标记文件的结尾。
在上面的例子中,我们通过statefulMap并结合其他操作符实现了很多与状态和生命周期相关的操作符,代码量大大减少。基于一个经过验证的操作符编写自定义操作符可以进一步降低错误的概率和代码审查的难度。相关的操作符都是通过底层的statefulMap实现的。映射到我们的工作中就是尽可能的抽象和细化,打磨系统的核心模型和核心功能,让每个应用都有一个精致的内核,与其他应用形成一个丰富的生态。而不是复制粘贴,反复做轮子;避免陷入复制粘贴的泥潭。虽然有时候我们可能没有足够的时间去进一步抽象,但是业务第一。但我还是建议在后续的实践中,进行不断的回顾和细化,在系统稳定可靠的情况下,逐步重构系统,使之更容易理解、维护和稳定。
我相信柴刀在设计、方案审核、测试、不断重构、提炼的过程中所花费的时间,在未来一定会有很多倍的回报。
实战Spark3 实时处理,掌握两套企业级处理方案
download链接:https://pan.baidu.com/s/1qbhp0Z7QMXpOBARRj0AEkA?pwd=xhu4
提取码:xhu4
--来自百度网盘超级会员V5的分享