jjzjj

数栈技术分享:一文带你了解Flink jm、tm启动过程和资源分配

袋鼠云数栈 2023-03-28 原文

一、JM启动过程

1、从日志角度分析启动流程

1)client生成jobGraph

详情请参考:
https://www.bilibili.com/video/BV13K4y1P7ri
2)Yarn RM接收到请求(和yarn交互不重点分析)

3)在被分配的节点上的工作目录下启动launch_container.sh

4)在perJob模式下,最终调用的是YarnJobClusterEntrypoint

5)初始化相关运行环境,打印软件版本、运行环境、命令行参数、classpath 等信息

6)加载flink配置文件、初始化文件系统、启动各种内部服务(RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore 等)

7)启动Flink资源管理核心组件ResourceManager(包含 YarnResourceManager 和 SlotManager 两个子组件)

8)启动Dispatcher加载JobGraph 文件、并启动JobManager

9)JobManager开始执行ExecutionGraph,向 ResourceManager申请资源

10)Flink ResourceManager 接收到新分配的 Container 资源后,准备好 TaskManager 启动上下文

11)TaskManager 进程加载并运行 YarnTaskExecutorRunner(Flink TaskManager入口类),初始化流程完成后启动 TaskExecutor(负责执行Task相关操作)

12)TaskExecutor向ResourceManager注册,向SlotManager汇报自己的 Slot 资源与状态

13)JobManager向TaskExecutor提交task,TaskExecutor启动新的线程运行Task

2、整体流程分析

1)输出各软件版本及运行环境信息、命令行参数项、classpath等信息
2)注册处理各种SIGNAL的handler:记录到日志
3)注册JVM关闭保障的shutdown hook:避免JVM退出时被其他shutdown hook阻塞
4)打印YARN运行环境信息:用户名
5)从运行目录中加载flink conf

3、AM启动过程

1)创建并启动各类内部服务(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)

2)将RPC address和port更新到flink conf配置

3)创建并启动resourceManager对象(Flink资源管理核心组件,包含YarnResourceManager和SlotManager两个子组件,YarnResourceManager负责外部资源管理,与YARN RM建立通信并保持心跳,申请或释放TaskManager资源,注销应用等;SlotManager则负责内部资源管理,维护全部Slot信息和状态)

4)创建并启动dispatcher(负责接收用户提供的作业,并且负责为这个新提交的作业拉起一个新的 JobManager)及相关服务(包括REST endpoint等)并加载JobGraph。

二、JM资源分配

JobManager开始执行ExecutionGraph,向ResourceManager申请资源。

ResourceManager将资源请求加入等待请求队列,并通过心跳向YARN RM申请新的Container资源来启动TaskManager进程。

后续流程如果有空闲Slot资源,SlotManager将其分配给等待请求队列中匹配的请求,不用再通过YarnResourceManager申请新的TaskManager。


Flink ResourceManager接收到新分配的Container资源后,准备好TaskManager启动上下文(ContainerLauncherContext,生成TaskManager配置并上传至分布式存储,配置其他依赖和环境变量等)。

然后向YARN NM申请启动TaskManager进程,YARN NM启动Container的流程与AM Container启动流程基本类似。

三、TM启动过程

输出各软件版本及运行环境信息、命令行参数项、classpath等信息

注册处理各种SIGNAL的handler:记录到日志

注册JVM关闭保障的shutdown hook:避免JVM退出时被其他shutdown hook阻塞

加载flink配置文件、初始化文件系统、启动各种内部服务(RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry等)

启动tm后就可以通过RPC接收远程调用,submitTask就是接收任务的服务。

回到在JM端启动scheduler后,就开始调度Execution,在Execution的deploy()方法中通过rpc调用TM的submitTask接口。

交互流程图如下:

当submitTask收到请求后加载jobInformation和taskInformation文件,初始化jobInformation和taskInformation,然后构造Task,启动Task线程,最终调用AbstractInvokable.invoke方法。

  • invokable.invoke( )将根据nameOfInvokableClass的不同调度不同的任务,包括批任务、Source任务、Sink任务、流任务
  • DataSourceTask:Kafka Source
  • StreamTask:中间算子
  • DataSinkTask:Kafka Sink

这里以StreamTask例分析

  • 初始化、run、close
  • 初始化:创建状态后端、operator配置、特殊task初始化、恢复算子的状态、richfunction open
  • run:执行task,处理record并发往下游
  • close:关闭和清理操作

这里以flinkX中的代码为例:

会被invoke()中的initialize-operator-states()执行并调用到DtInputFormatSourceFunction的initializeState方法恢复状态。

这里以flinkX中的代码为例:

会被invoke()中的open-operators()执行并调用到DtInputFormatSourceFunction的open方法恢复状态做一些初始化工作。

这里以flinkX中的代码为例:

会被invoke()中的run()执行并调用到DtInputFormatSourceFunction的run读取数据并往下游发送。

经过上面分析,任务已经启动,并等待数据流动。

相关参考:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

https://files.alicdn.com/tpsservice/7bb8f513c765b97ab65401a1b78c8cb8.pdfhttps://files.alicdn.com/tpsservice/7bb8f513c765b97ab65401a1b78c8cb8.pdf

https://zhuanlan.zhihu.com/p/87132673https://zhuanlan.zhihu.com/p/87132673


数栈是云原生—站式数据中台PaaS,我们在github和gitee上有一个有趣的开源项目:FlinkXFlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,也可以采集实时变化的数据,是全域、异构、批流一体的数据同步引擎。大家喜欢的话请给我们点个star!star!star!

github开源项目:https://github.com/DTStack/flinkx

gitee开源项目:https://gitee.com/dtstack_dev_0/flinkx

有关数栈技术分享:一文带你了解Flink jm、tm启动过程和资源分配的更多相关文章

  1. Ruby Koans about_array_assignment - 非平行与平行分配歧视 - 2

    通过ruby​​koans.com,我在about_array_assignment.rb中遇到了这两段代码你怎么知道第一个是非并行赋值,第二个是一个变量的并行赋值?在我看来,除了命名差异之外,代码几乎完全相同。4deftest_non_parallel_assignment5names=["John","Smith"]6assert_equal["John","Smith"],names7end45deftest_parallel_assignment_with_one_variable46first_name,=["John","Smith"]47assert_equal'John

  2. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  3. ruby - 在 Ruby 中重新分配常量时抛出异常? - 2

    我早就知道Ruby中的“常量”(即大写的变量名)不是真正常量。与其他编程语言一样,对对象的引用是唯一存储在变量/常量中的东西。(侧边栏:Ruby确实具有“卡住”引用对象不被修改的功能,据我所知,许多其他语言都没有提供这种功能。)所以这是我的问题:当您将一个值重新分配给常量时,您会收到如下警告:>>FOO='bar'=>"bar">>FOO='baz'(irb):2:warning:alreadyinitializedconstantFOO=>"baz"有没有办法强制Ruby抛出异常而不是打印警告?很难弄清楚为什么有时会发生重新分配。 最佳答案

  4. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  5. Unity 热更新技术 | (三) Lua语言基本介绍及下载安装 - 2

    ?博客主页:https://xiaoy.blog.csdn.net?本文由呆呆敲代码的小Y原创,首发于CSDN??学习专栏推荐:Unity系统学习专栏?游戏制作专栏推荐:游戏制作?Unity实战100例专栏推荐:Unity实战100例教程?欢迎点赞?收藏⭐留言?如有错误敬请指正!?未来很长,值得我们全力奔赴更美好的生活✨------------------❤️分割线❤️-------------------------

  6. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  7. ruby - 使对象的行为类似于 ruby​​ 中并行分配的数组 - 2

    假设您在Ruby中执行此操作:ar=[1,2]x,y=ar然后,x==1和y==2。是否有一种方法可以在我自己的类中定义,从而产生相同的效果?例如rb=AllYourCode.newx,y=rb到目前为止,对于这样的赋值,我所能做的就是使x==rb和y=nil。Python有这样一个特性:>>>classFoo:...def__iter__(self):...returniter([1,2])...>>>x,y=Foo()>>>x1>>>y2 最佳答案 是的。定义#to_ary。这将使您的对象被视为要分配的数组。irb>o=Obje

  8. ruby-on-rails - 使用 Dragonfly 从 URL 分配图像 - 2

    我正在使用Dragonfly在Rails3.1应用程序上处理图像。我正在努力通过url将图像分配给模型。我有一个很好的表格:{:multipart=>true}do|f|%>RemovePicture?Dragonfly的文档指出:Dragonfly提供了一个直接从url分配的访问器:@album.cover_image_url='http://some.url/file.jpg'但是当我在控制台中尝试时:=>#ruby-1.9.2-p290>picture.image_url="http://i.imgur.com/QQiMz.jpg"=>"http://i.imgur.com/QQ

  9. ruby - 了解在 Ruby 中与 lambda 一起使用的 inject 行为 - 2

    我经常将预配置的lambda插入可枚举的方法中,例如“map”、“select”等。但是“注入(inject)”的行为似乎有所不同。例如与mult4=lambda{|item|item*4}然后(5..10).map&mult4给我[20,24,28,32,36,40]但是,如果我制作一个2参数lambda用于像这样的注入(inject),multL=lambda{|product,n|product*n}我想说(5..10).inject(2)&multL因为“inject”有一个可选的单个初始值参数,但这给了我......irb(main):027:0>(5..10).inject

  10. ruby-on-rails - 如何测试自己对 Ruby/ROR 的了解? - 2

    是否有self验证的问题列表。看着那个,我可以确定我知道。我应该复习一下。在学习的过程中,我列了一个这样的list,但它只包含我在某处听说过的项目。我需要一段时间才能找到新的东西。 最佳答案 以下是针对ruby​​和Rails的一些测试列表。证书名称:RubyonRails谁提供:oDeskIncorporation认证费用:免费网站:https://www.odesk.com/tests/985?pos=0证书名称:RubyonRails提供者:Techgig.com(TimesBusinessSolutionsLimited(T

随机推荐