
处理数据

旁路输出

只有在特定的函数中才能使用旁路输出,具体如下
1)ProcessFunction。
2)KeyedProcessFunction。
3)CoProcessFunction。
4)ProcessWindowFunction。
5)ProcessAllWindowFunction。
6)ProcessJoinFunction。
7)KeyedCoProcessFunction。


.....
public class RuntimeEnvironment implements Environment {
private final JobID jobId;
private final JobVertexID jobVertexId;
private final ExecutionAttemptID executionId;
private final TaskInfo taskInfo;
private final Configuration jobConfiguration;
private final Configuration taskConfiguration;
private final ExecutionConfig executionConfig;
private final UserCodeClassLoader userCodeClassLoader;
private final MemoryManager memManager;
private final IOManager ioManager;
private final BroadcastVariableManager bcVarManager;
private final TaskStateManager taskStateManager;
private final GlobalAggregateManager aggregateManager;
private final InputSplitProvider splitProvider;
private final ExternalResourceInfoProvider externalResourceInfoProvider;
private final Map<String, Future<Path>> distCacheEntries;
private final ResultPartitionWriter[] writers;
private final IndexedInputGate[] inputGates;
private final TaskEventDispatcher taskEventDispatcher;
private final CheckpointResponder checkpointResponder;
private final TaskOperatorEventGateway operatorEventGateway;
private final AccumulatorRegistry accumulatorRegistry;
private final TaskKvStateRegistry kvStateRegistry;
private final TaskManagerRuntimeInfo taskManagerInfo;
private final TaskMetricGroup metrics;
private final Task containingTask;
RuntimeContext是Function运行时的上下文,封装了Function运行时可能需要的所有信息,让Function在运行时能够获取到作业级别的信息,如并行度相关信息、Task名称、执行配置信息(ExecutionConfig)、State等

物理Transformation

物理Transformation一共有4种,具体如下 SourceTransformation,SinkTransformation,OneInputTransformation,TwoInputTransformation
所有的算子都包含了生命周期管理、状态与容错管理、数据处理3个方面的关键行为。
生命周期管理
1)setup:初始化环境、时间服务、注册监控等。
2)open:该行为由各个具体的算子负责实现,包含了算子的初始化逻辑,如状态初始化等。算子执行该方法之后,才会执行Function进行数据的处理。
3)close:所有的数据处理完毕之后关闭算子,此时需要确保将所有的缓存数据向下游发送。
4)dispose:该方法在算子生命周期的最后阶段执行,此时算子已经关闭,停止处理数据,进行资源的释放。
状态与容错管理
算子负责状态管理,提供状态存储,触发检查点的时候,保存状态快照,并且将快照异步保存到外部的分布式存储。当作业失败的时候算子负责从保存的快照中恢复状态
数据处理
算子对数据的处理,不仅会进行数据记录的处理,同时也会提供对Watermark和LatencyMarker的处理。算子按照单流输入和双流输入,定义了不同的行为接口
函数层次
UDF在DataStream API层使用,Flink提供的函数体系从接口的层级来看,从高阶Function到低阶Function

RichFunction相比无状态Function,有两方面的增强:
1)增加了open和close方法来管理Function的生命周期,在作业启动时,Function在open方法中执行初始化,在Function停止时,在close方法中执行清理,释放占用的资源等。无状态Function不具备此能力。
2)增加了getRuntimeContext和setRuntimeContext。通过RuntimeContext,RichFunction能够获取到执行时作业级别的参数信息,而无状态Function不具备此能力。
无状态Function天然是容错的,作业失败之后,重新执行即可,但是有状态的Function(RichFunction)需要处理中间结果的保存和恢复,待有了状态的访问能力,也就意味着Function是可以容错的,执行过程中,状态会进行快照然后备份,在作业失败,Function能够从快照中恢复回来。
处理函数
ProcessFunction:单流输入函数。CoProcessFunction:双流输入函数。KeyedProcessFunction:单流输入函数。KeyedCoProcessFunction:双流输入函数。

广播函数

异步函数
数据源函数

输出函数
检查点函数

连接器在Flink中叫作Connector。Flink本身是计算引擎,并不提供数据存储能力,所以需要访问外部数据,外部数据源类型繁多,连接器因此应运而生,它提供了从数据源读取数据和写入数据的能力。基于SourceFunction和SinkFunction构建出了种类繁多的连接器
在分布式计算中,Flink对所有需要进行唯一标识的组件、对象提供了抽象类AbstractID,因为需要跨网络进行传递,所以该类实现了Serializable接口,需要比较唯一标识是否相同,所以也实现了Comparable接口

如果想了解流处理系统中如何实现强一致性,可以参考MillWheel:Fault-Tolerant Stream Processing at Internet Scale和Discretized Streams:Fault-Tolerant Streaming Computation at Scale两篇论文

WindowAssigner


WindowTrigger
Trigger触发器决定了一个窗口何时能够被计算或清除,每一个窗口都拥有一个属于自己的Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。每当有元素加入该窗口,或者之前注册的定时器超时时,Trigger都会被调用。Trigger触发的结果如下。
1)Continue:继续,不做任何操作。
2)Fire:触发计算,处理窗口数据。
3)Purge:触发清理,移除窗口和窗口中的数据。
4)Fire + Purge:触发计算+清理,处理数据并移除窗口和窗口中的数据。
WindowEvictor
1)CountEvictor: 计数过滤器。在Window中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
2)DeltaEvictor: 阈值过滤器。本质上来说就是一个自定义规则,计算窗口中每个数据记录,然后与一个事先定义好的阈值做比较,丢弃超过阈值的数据记录。
3)TimeEvictor: 时间过滤器。保留Window中最近一段时间内的元素,并丢弃其余元素。
Window函数
1)增量计算函数
2)全量计算函数
1) AscendingTimestamps:递增Watermark,作用在Flink SQL中的Rowtime属性上,Watermark=当前收到的数据元素的最大时间戳-1,此处减1的目的是确保有最大时间戳的事件不会被当做迟到数据丢弃。
2)BoundedOutOfOrderTimestamps:固定延迟Watermark,作用在Flink SQL的Rowtime属性上,Watermark=当前收到的数据元素的最大时间戳-固定延迟。
(2)每事件Watermark策略每事件Watermark策略在Flink中叫作PuntuatedWatamarkAssigner,数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark,在一定程度上会对下游算子造成压力
(3)无为策略无为策略在Flink中叫作PreserveWatermark。在Flink中可以使用DataStream API和Table & SQL混合编程,所以Flink SQL中不设定Watermark策略,使用底层DataStream中的Watermark策略

Flink内部自主进行内存管理,将数据以二进制结构保存在内存中,目前的实现中大量使用了堆外内存。如果让开发人员直接操作二进制结构,代码会变得复杂臃肿,所以大数据平台在设计API的时候,允许用户直接像编写普通Java应用程序一样使用其API开发Function,直接使用JDK提供的类型和自定义类型。



内存布局
TaskManager是Flink中执行计算的核心组件,是用来运行用户代码的Java进程。其中大量使用了堆外内存。

(1)ValueState<T>即类型为T的单值状态。这个状态与对应的Key绑定,是最简单的状态。可以通过update方法更新状态值,通过value()方法获取状态值。
(2)ListState<T>即Key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值。
(3)ReducingState<T>这种状态通过用户传入的reduceFunction,每次调用add方法添加值时,会调用reduceFunction,最后合并到一个单一的状态值。
(4)AggregatingState<IN,OUT>聚合State,和(3)不同的是,这里聚合的类型可以是不同的元素类型,使用add(IN)来加入元素,并使用AggregateFunction函数计算聚合结果。
(5)MapState<UK,UV>使用Map存储Key-Value对,通过put(UK,UV)或者putAll(Map<UK,UV>)来添加,使用get(UK)来获取。
按照由Flink管理还是用户自行管理,状态可以分为原始状态(Raw State)和托管状态(Managed State)。原始状态,即用户自定义的State,Flink在做快照的时候,把整个State当作一个整体,需要开发者自己管理,使用byte数组来读写状态内容。托管状态是由Flink框架管理的State,如ValueState、ListState、MapState等,其序列化与反序列化由Flink框架提供支持,无须用户感知、干预。KeyedState和OperatorState可以是原始状态,也可以是托管状态。通常在DataStream上的状态推荐使用托管状态,一般情况下,在实现自定义算子时,才会使用到原始状态。


在图可以看到,业务数据流是一个普通数据流,规则数据流是广播数据流,这样就可以满足实时性、规则更新的要求。规则算子将规则缓存在本地内存中,在业务数据流记录到来时,能够使用规则处理数据。



资源管理器在Flink中叫作ResourceManager。Flink同时支持不同的资源集群类型,ResourceManager位于Flink和资源管理集群(Yarn、K8s等)之间,是Flink集群级资源管理的抽象,其主要作用如下
1)申请容器启动新的TM,或者为作业申请Slot。
2)处理JobManager和TaskManager的异常退出。
3)缓存TaskManager(即容器),等待一段时间之后再释放掉不用的容器,避免资源反复地申请释放。
4)JobManager和TaskManager的心跳感知,对JobManager和TaskManger的退出进行对应的处理
Slot管理器在Flink中叫作SlotManager,是ResourceManager的组件,从全局角度维护当前有多少TaskManager、每个TaskManager有多少空闲的Slot和Slot等资源的使用情况。当Flink作业调度执行时,根据Slot分配策略为Task分配执行的位置
1)对TaskManager提供注册、取消注册、空闲退出等管理动作,注册则集群可用的Slot变多,取消注册、空闲推出则释放资源,还给资源管理集群。
2)对Flink作业,接收Slot的请求和释放、资源汇报等。当资源不足的时候,SlotManger将资源请求暂存在等待队列中,SlotManager通知ResourceManager去申请更多的资源,启动新的TaskManager,TaskManager注册到SlotManager之后,SlotManager就有可用的新资源了,从等待队列中依次分配资源。
SlotProvider接口定义了Slot的请求行为,支持两种请求模式。
1)立即响应模式:Slot请求会立即执行。
2)排队模式:排队等待可用Slot,当资源可用时分配资源
Slot资源池在Flink中叫作SlotPool,是JobMaster中记录当前作业从TaskManager获取的Slot的集合。
预提交阶段

这种方式只能在数据源Kafka到Flink内部保证严格一次,一旦涉及从Sink写入到外部Kafka就会出现问题了。假设Checkpoint 3完成之后,Source从Topic偏移量位置65536读取了1000条数据,Topic偏移量为66536,Sink写入了1000条数据到外部Kafka,此时Flink应用的1个Sink并行实例因为未处理的异常崩溃,进入Failover阶段,应用自动从Checkpoint 3恢复,重新从Topic的偏移量65536开始读取数据,这就会导致65536~66536之间的1000条数据被重复处理,写入到了Kafka中。这种情况下需要避免重复写入这1000条数据到Kafka中。幂等性是一种解决方案,如对HBase按照主键插入可能有效,第2次插入是对第1次的更新。

提交阶段

在预提交阶段,数据实际上已经写入外部存储,但是因为事务的原因是不可读的,所以Sink在事务提交阶段的工作稍微简单一点,当所有的Sink实例提交成功之后,一旦预提交完成,必须确保提交外部事务也要成功,此时算子和外部系统协同来保证。倘若提交外部事务失败(如网络故障等),Flink应用就会崩溃,然后根据用户重启策略进行回滚,回滚到预提交时的状态,之后再次重试提交。

会话窗口不同于事件窗口,它的切分依赖于事件的行为,而不是时间序列,所以在很多情况下会因为事件乱序使得原本相互独立的窗口因为新事件的到来导致窗口重叠,而必须要进行窗口的合并???(过程)
matlab打开matlab,用最简单的imread方法读取一个图像clcclearimg_h=imread('hua.jpg');返回一个数组(矩阵),往往是a*b*cunit8类型解释一下这个三维数组的意思,行数、数和层数,unit8:指数据类型,无符号八位整形,可理解为0~2^8的数三个层数分别代表RGB三个通道图像rgb最常用的是24-位实现方法,即RGB每个通道有256色阶(2^8)。基于这样的24-位RGB模型的色彩空间可以表现256×256×256≈1670万色当imshow传入了一个二维数组,它将以灰度方式绘制;可以把图像拆分为rgb三层,可以以灰度的方式观察它figure(1
我正在学习Ruby,遇到了inject。我正处于理解它的风口浪尖,但当我是那种需要真实世界的例子来学习一些东西的人时。我遇到的最常见的例子是人们使用inject来添加一个(1..10)范围的总和,我不太关心这个。这是一个任意的例子。在实际程序中我会用它做什么?我正在学习,所以我可以继续使用Rails,但我不必有一个以Web为中心的示例。我只需要一些我可以全神贯注的目标。谢谢大家。 最佳答案 inject有时可以通过它的“其他”名称reduce更好地理解。它是一个对Enumerable进行操作(迭代一次)并返回单个值的函数。它有许多有
3月26日,映宇宙(HK:03700,即“映客”)发布截至2022年12月31日的2022年度业绩财务报告。财报显示,映宇宙2022年的总营收为63.19亿元,较2021年同期的91.76亿元下降31.1%。2022年,映宇宙的经营亏损为4698.7万元,2021年同期则为净利润4.57亿元;期内亏损(净亏损)为1.68亿元,2021年同期的净利润为4.33亿元;非国际财务报告准则经调整净利润为3.88亿元,2021年同期为4.82亿元,同比下降19.6%。 映宇宙在财报中表示,收入减少主要是由于行业竞争加剧,该集团对旗下产品采取更为谨慎的运营策略以应对市场变化。不过,映宇宙的毛利率则有所提升
我正在使用SublimeText2,同时遵循MichaelHartl的RubyonRails教程。可以在http://ruby.railstutorial.org/book/ruby-on-rails-tutorial找到我所指的教程的具体部分。(ctrl+F“list5.26”)。我能够创建规范/支持文件。但是,在尝试创建spec/support/utilities.rb文件时,我收到消息“无法保存~/rails_projects/sample_app/spec/support/utilities.rb”。有人知道为什么会这样吗?SublimeText论坛上有人似乎遇到了完全相同的问
整理|王启隆透过「历史上的今天」,从过去看未来,从现在亦可以改变未来。今天是2023年4月26日,在2017年的今天,中国首艘国产001A型航空母舰在大连完成了下水,从开工到下水,历时3年多时间。回首过去,眺望未来,在科技历史上的每个4月26日里,还发生过哪些影响深远的关键事件呢?1938年4月26日:编程校验领域图灵奖得主ManuelBlum出生曼纽尔·布卢姆(ManuelBlum)出生于1938年4月26日,他是委内瑞拉的计算机科学家、卡内基梅隆大学的教授,因对计算复杂度理论做出的贡献,以及在密码学和编程校验上的应用而获1995年图灵奖。布卢姆出生于委内瑞拉的一个犹太家庭,他曾在麻省理工学
集成背景我们当前集群使用的是ClouderaCDP,Flink版本为ClouderaVersion1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置FlinkHome,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需求上,尽量提供完整的Streampark使用体验。集成步骤版本匹配问题解决首先解决无法识别Cloudera中的FlinkHome问题,根据报错主要明确到的事情是无法读取到Flink版本、lib下面的jar包名称无法匹配。修改对象:修改源码:(解决无法匹配clouderajar
如何用IDEA2022创建并初始化一个SpringBoot项目?目录如何用IDEA2022创建并初始化一个SpringBoot项目?0. 环境说明1. 创建SpringBoot项目 2.编写初始化代码0. 环境说明IDEA2022.3.1JDK1.8SpringBoot1. 创建SpringBoot项目 打开IDEA,选择NewProject创建项目。 填写项目名称、项目构建方式、jdk版本,按需要修改项目文件路径等信息。 选择springboot版本以及需要的包,此处只选择了springweb。 此处需特别注意,若你使用的是jdk1
我现在正在努力学习Ruby和RubyonRails。我正在学习LearningRails,第1版,但我很难理解其中的一些代码。我通常使用C、C++或Java工作,因此Ruby对我来说是一个很大的改变。我目前对数据库迁移器的以下代码块感到困惑:defself.upcreate_table:entriesdo|t|t.string:namet.timestampsendendt变量来自哪里?它实际上代表什么?它有点像for(i=0;i另外,:entries是在什么地方定义的?(entries是我的Controller的名称,但是这个函数怎么知道的?) 最佳答案
文章目录问题B:芝华士威士忌和他的小猫咪们代码&注释问题C:愿我的弹雨能熄灭你们的痛苦代码注释问题D:猜糖果游戏代码注释问题E:有趣的次方代码注释问题F:这是一个简单题代码&注释问题G:打印矩阵代码注释问题H:scz的简单考验代码注释问题I:完美区间代码&注释问题J:是狂热的小迷妹一枚吖~代码&注释2022年10月23日周赛ZZULIOJ问题B:芝华士威士忌和他的小猫咪们时间限制:1Sec内存限制:128MB题目描述芝华士威士忌很喜欢带着他的猫咪们一块跑着玩。但是小猫咪们很懒,只有在离他y米以内才愿意和他一块跑。这天他在坐标为x的位置,他想和他的猫咪们一块跑着玩。有n个小猫咪,第i个小猫咪在坐
代码请进行一定修改后使用,本代码保证100%通过率,本题目提供了java、python、c++三种代码。复盘思路在文章的最后题目描述祖国西北部有一片大片荒地,其中零星的分布着一些湖泊,保护区,矿区;整体上常年光照良好,但是也有一些地区光照不太好。某电力公司希望在这里建设多个光伏电站,生产清洁能源对每平方公里的土地进行了发电评估,其中不能建设的区域发电量为0kw,可以发电的区域根据光照,地形等给出了每平方公里年发电量x千瓦。我们希望能够找到其中集中的矩形区域建设电站,能够获得良好的收益。输入描述第一行输入为调研的地区长,宽,以及准备建设的电站【长宽相等,为正方形】的边长最低要求的发电量之后每行为