NIFI是可以部署成集群的,在多台机器上分布式部署提高数据吞吐能力。本文第五章,通过源码,来梳理NIFI的分布式如何实现。在此之前,我们先来学习分布式系统中很重要的一种协议
Two-phase Commit(2PC)两段式提交协议是一种分布式一致性(consensus)协议,常被用于分布式系统中,用来保证分布式事务的原子性(atomic),即分布式事务的所有参与者,要么都提交,要么都回滚,不会存在一部分参与者提交了,而另外的参与者回滚的情况。
数据的修改请求被分为两个阶段
分为两种情况:
Commit情况:
Rollback情况:
在上一节 NIFI的 WAL 日志部分有提到,NIFI的数据,分为两部分,一个是在界面上可操作的配置信息部分,比如添加的处理器,新增的处理器组,两个处理器之间新增的连接,处理器的启停,在分布式情况下,肯定是需要在一个节点上的修改,能及时同步到其他节点。
另外一个就是flowFile部分,比如Web接口接到XML之后,拆分成了很多条。这部分要实现的是在一个节点的处理器生成的流文件,采用分布式负载均衡后,其他节点能分担某个节点的任务压力。
所以第五章也分为两部分来说。
还是以处理器的启动为例,在本系列的第一篇文章中,就已经讲到了启动处理器的入口:

在NIFI的源文件中按照上述路径,就能找到,除去前部处理异常的代码,剩下的如下:

第546行有一个判断,点进去可以看到:

它是用来检查当前的请求是否应该被复制到集群中,当前的NIFI实例不属于集群中的节点或者没有与集群建立连接的时候,返回false. 前一个是通过配置判断,后一个则是查看有没有集群协调器或者有没有与集群协调器进行通信。然后是检查请求头中的的 X-Request-Replicated 是否设置.
在最开始启动某个处理器的时候,上述方法返回true,才能实现将节点的启动信息,通过集群告知其他节点一起启动。也就是说,对NIFI界面上配置的修改,应该是在操作的时候,同步到集群其他节点的。我们继续跟进到547行的方法,会来到这里:

红框这里,是判断是把请求只给到集群的协调器,还是直接给到每一个节点。


如果当前节点,已经被选做是集群的协调器, ReplicationTarget 返回 CLUSTER_NODES,代码上边,否则走下边。简单理解就是说,如果当前节点已经是协调器了,就可以直接把请求,给下边的参与节点了,否则需要把请求发给协调器,协调器再给到各个节点。
我们先跟到1102行进去,会来到这里,注意看类名:

这里173的判断除了抛出异常,没有做实质性的工作,跳过去,来到最后:

获取到所有处于连接状态的节点,

加锁,防止不同的请求在同一时间,修改流,同时注意280行,在请求中加了个属性REPLICATION_INDICATOR_HEADER。还记得前边的判断吧。通过这个字段,NIFI节点就知道,当前的请求,是来自于集群协调器的,不必再进行转发。
最后来到:

将给节点发送请求打成任务丢入线程池,完成对所有节点的请求。
回到刚刚的分支,当需要给协调器发消息,需要找到协调器的位置,然后发送消息。不再赘述
所以通过上边的代码整理可以看到:在集群模式下,当在一个NIFI节点的界面上,对流程进行修改的时候,该节点在后端,会先判断当前节点是否被选做了集群的协调器。如果是,则通过协调器就能将修改请求,发送到各个节点。如果不是,则当前节点知道协调器的位置,将修改请求转给协调器节点,然后重复一。协调器发出来的请求,在请求头中会带有标识 REPLICATION_INDICATOR_HEADER。各其他节点如果从请求中拿到的该字段有值,则不再对请求进行转发,转而执行该请求内容。
熟悉分布式的同学,会觉得博主的文档写的比较啰嗦,其实分布式这块儿,博主也是刚接触,很多概念也并不清楚,看代码看到了才会知道原来是这么回事。所以这个博客,也算是博主本人学习过程的一个记录。
我们回到下面这个给所有节点发送请求的方法,来看具体的实现:

方法比较长,我们一步步看,先是验证节点存在,并且是处于连接中的状态。

这里定义了两个回调,然后在构造响应 response 的时候传了进去。

这里根据注释,当请求是可修改请求的时候,并且performVerification(需要进行检验的时候)需要进行两段式的提交,让各个节点去检验请求能否被执行,所有节点都返回可以执行,再通知各个节点真正去执行。所有的操作都在performVerification方法中处理。点进去:

这里重温下 Function 的用法,其实是定义了一个方法,这个方法的参数是 NodeIdentifier类型,返回值是 NodeHttpRequest。注意这里只是定义,只有调用apply方法的时候,才会真正地放回这个对象。我们进入623行

这里就调用了apply方法,返回了 NodeHttpRequest 对象,这个对象实现了 Runnable 方法,因此可以被加入到线程池当中。我们进到run方法看这个线程会干嘛:


实际地发送请求。在run方法执行完之后,调用了回调函数的 onCompletion 方法。上边的代码中我把回调折叠了,我们现在来回头看回调函数是怎么定义的。

把所有的响应,都放在集合中,并判断当前集合的大小和应该发送的请求数是否相等。相等的话说明所有节点都已经给了响应。当已经收到所有节点都响应202.这时候两段式提交的第一阶段已经完成,并且所有节点返回了OK。

可以看到又回到了本节一开始部分的代码。 并且注意到 performVerification 传的是false.

再次回到这里,就要走448的代码。

然后是第二阶段的请求提交,跟第一阶段的代码是一样的,只不过是请求头不一样了。
至此2阶段提交请求的发送流程已经都走完了。
响应我们还回到处理器启动接口,进入 withWriteLock 方法:

判断如果是两段式的请求,
如果是第一阶段的验证阶段,先把请求记录下来,然后响应202
接到第二阶段的请求,如果是正式的执行请求,就开始正式执行。如果是撤销请求,则撤销事务然后返回响应。
这里结合官网图,就能清楚zookeeper,进一步说明一下。

图上有行小字,客户端与接口的交互可以可能在任何一个节点上,就能理解上边提到的为什么要判断当前节点是不是协调器。能理解对其中一个节点的操作能通过协调器转发到集群其他节点,并通过分布式一致性协议(2pc)保持各个节点的配置都是相同的
我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称
最近在学习CAN,记录一下,也供大家参考交流。推荐几个我觉得很好的CAN学习,本文也是在看了他们的好文之后做的笔记首先是瑞萨的CAN入门,真的通透;秀!靠这篇我竟然2天理解了CAN协议!实战STM32F4CAN!原文链接:https://blog.csdn.net/XiaoXiaoPengBo/article/details/116206252CAN详解(小白教程)原文链接:https://blog.csdn.net/xwwwj/article/details/105372234一篇易懂的CAN通讯协议指南1一篇易懂的CAN通讯协议指南1-知乎(zhihu.com)视频推荐CAN总线个人知识总
MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO
深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg