下面的时序图清晰地说明了一个 Spark 应用程序从提交到运行的完整流程:
提交一个 Spark 应用程序,首先通过 Client 向 ResourceManager 请求启动一个Application,同时检查是否有足够的资源满足 Application 的需求,如果资源条件满足,则准备 ApplicationMaster 的启动上下文,交给 ResourceManager,并循环监控Application 状态。
当提交的资源队列中有资源时,ResourceManager 会在某个 NodeManager 上启动 ApplicationMaster 进程,ApplicationMaster 会单独启动 Driver 后台线程,当 Driver启动后,ApplicationMaster 会通过本地的 RPC 连接 Driver,并开始向 ResourceManager申请 Container 资源运行 Executor 进程(一个 Executor 对应与一个 Container),当ResourceManager 返回Container 资源,ApplicationMaster 则在对应的 Container 上启动 Executor。
Driver 线程主要是初始化 SparkContext 对象,准备运行所需的上下文,然后一方面保持与 ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲 Executor 上。
当 ResourceManager 向 ApplicationMaster 返 回 Container 资源时,ApplicationMaster 就尝试在对应的 Container 上启动 Executor 进程,Executor 进程起来后,会向 Driver 反向注册,注册成功后保持与 Driver 的心跳,同时等待 Driver分发任务,当分发的任务执行完毕后,将任务状态上报给 Driver。
从上述时序图可知,Client 只负责提交 Application 并监控 Application 的状态。对于 Spark 的任务调度主要是集中在两个方面: 资源申请和任务分发,其主要是通过 ApplicationMaster、Driver 以及 Executor 之间来完成。
Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘关系图,即 DAG,最后通过 Action 的调用,触发 Job 并调度执行。DAGScheduler 负责 Stage 级的调度,主要是将 DAG 切分成若干 Stages,并将每个 Stage 打包成 TaskSet 交给 TaskScheduler调度。TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源,其中 SchedulerBackend 有多种实现,分别对接不同的资源管理系统。有了上述感性的认识后,下面这张图描述了 Spark-On-Yarn 模式下在任务调度期间,ApplicationMaster、Driver 以及 Executor 内部模块的交互过程:
Driver 初始化 SparkContext 过 程 中 , 会 分 别 初 始 化 DAGScheduler 、TaskScheduler、SchedulerBackend 以及 HeartbeatReceiver,并启动 SchedulerBackend以及HeartbeatReceiver。SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的 Task 分发到 Executor 执行。HeartbeatReceiver 负责接收 Executor 的心跳信息,监控 Executor 的存活状况,并通知到 TaskScheduler。
Job 由 最 终 的 RDD 和 Action 方 法 封 装 而 成 , SparkContext 将 Job 交 给DAGScheduler 提交,它会根据 RDD 的血缘关系构成的 DAG 进行切分,将一个 Job划分为若干 Stages,具体划分策略是,由最终的 RDD 不断通过依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 之间被划分到同一个Stage 中,可以进行 pipeline 式的计算,如上图紫色流程部分。划分的 Stages 分两类,一类叫做 ResultStage,为 DAG 最下游的 Stage,由 Action 方法决定,另一类叫做ShuffleMapStage,为下游 Stage 准备数据,下面看一个简单的例子 WordCount。
Job 由 saveAsTextFile 触发,该 Job 由 RDD-3 和 saveAsTextFile 方法组成,根据RDD 之间的依赖关系从 RDD-3 开始回溯搜索,直到没有依赖的 RDD-0,在回溯搜索过程中,RDD-3 依赖 RDD-2,并且是宽依赖,所以在 RDD-2 和 RDD-3 之间划分Stage,RDD-3 被划到最后一个 Stage,即 ResultStage 中,RDD-2 依赖 RDD-1,RDD-1依赖 RDD-0,这些依赖都是窄依赖,所以将 RDD-0、RDD-1 和 RDD-2 划分到同一个 Stage,即 ShuffleMapStage 中,实际执行的时候,数据记录会一气呵成地执行RDD-0 到 RDD-2 的转化。不难看出,其本质上是一个深度优先搜索算法。
一个 Stage 是否被提交,需要判断它的父 Stage 是否执行,只有在父 Stage 执行完毕才能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。Stage 提交时会将 Task 信息(分区信息以及方法等)序列化并被打包成 TaskSet 交给TaskScheduler,一个 Partition 对应一个 Task,另一方面 TaskScheduler 会监控 Stage的运行状态,只有 Executor 丢失或者 Task 由于 Fetch 失败才需要重新提交失败的Stage 以调度运行失败的任务,其他类型的 Task 失败会在 TaskScheduler 的调度过程中重试。
相对来说 DAGScheduler 做的事情较为简单,仅仅是在 Stage 层面上划分 DAG,提交 Stage 并监控相关状态信息。TaskScheduler 则相对较为复杂,下面详细阐述其细节。
TaskSetManager 负责监控管理同一个 Stage 中的 Tasks,TaskScheduler 就是以TaskSetManager 为单元来调度任务。前面也提到,TaskScheduler 初始化后会启动 SchedulerBackend,它负责跟外界打交道,接收 Executor 的注册信息,并维护 Executor 的状态,所以说 SchedulerBackend是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler 有没有任务要运行,也就是说, 它会定期地 “ 问 ”TaskScheduler“ 我有这么余量,你 要不要啊 ” ,TaskScheduler 在 SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择 TaskSetManager 去调度运行,大致方法调用流程如下图所示:
图 3-7 中,将 TaskSetManager 加入 rootPool 调度池中之后,调用 SchedulerBackend的 riviveOffers 方法给 driverEndpoint 发送 ReviveOffer 消息;driverEndpoint 收到ReviveOffer 消息后调用 makeOffers 方法,过滤出活跃状态的 Executor(这些 Executor都是任务启动时反向注册到 Driver 的 Executor),然后将 Executor 封装成 WorkerOffer对 象 ; 准 备 好 计 算 资 源 (WorkerOffer ) 后 , taskScheduler 基 于 这 些 资 源 调 用resourceOffer 在 Executor 上分配 task。
TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。在 TaskScheduler 初始化过程中会实例化 rootPool,表示树的根节点,是Pool 类型。
1. FIFO 调度策略
如果是采用 FIFO 调度策略,则直接简单地将 TaskSetManager 按照先来先到的方式入队,出队时直接拿出最先进队的 TaskSetManager,其树结构如下图所示,TaskSetManager 保存在一个 FIFO 队列中。
2. FAIR 调度策略
FAIR 调度策略的树结构如下图所示:
FAIR 模式中有一个 rootPool 和多个子 Pool,各个子 Pool 中存储着所有待分配的 TaskSetMagager。
在 FAIR 模 式 中 , 需 要 先 对 子 Pool 进 行 排 序 , 再 对 子 Pool 里 面 的TaskSetMagager 进行排序,因为 Pool 和 TaskSetMagager 都继承了 Schedulable 特质,因此使用相同的排序算法。
排序过程的比较是基于 Fair-share 来比较的,每个要排序的对象包含三个属性:runningTasks 值(正在运行的 Task 数)、minShare 值、weight 值,比较时会综合考量 runningTasks 值,minShare 值以及 weight 值。
注意,minShare、weight 的值均在公平调度配置文件 fairscheduler.xml 中被指定,调度池在构建阶段会读取此文件的相关配置。
1) 如果 A 对象的 runningTasks 大于它的 minShare,B 对象的 runningTasks 小于它的 minShare,那么 B 排在 A 前面;(runningTasks 比 minShare 小的先执行)
2) 如果 A、B 对象的 runningTasks 都小于它们的 minShare,那么就比较runningTasks 与 minShare 的比值(minShare 使用率),谁小谁排前面;(minShare使用率低的先执行)
3) 如果 A、B 对象的 runningTasks 都大于它们的 minShare,那么就比较runningTasks 与 weight 的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行)
4) 如果上述比较均相等,则比较名字。整体上来说就是通过 minShare 和 weight 这两个参数控制比较过程,可以做到让 minShare 使用率和权重使用率少(实际运行 task 比例较少)的先运行。
FAIR 模式排序完成后,所有的 TaskSetManager 被放入一个 ArrayBuffer 里,之后依次被取出并发送给 Executor 执行。
从调度队列中拿到 TaskSetManager 后,由于 TaskSetManager 封装了一个 Stage的所有 Task,并负责管理调度这些 Task,那么接下来的工作就是 TaskSetManager按照一定的规则一个个取出 Task 给 TaskScheduler , TaskScheduler 再交给SchedulerBackend 去发到 Executor 上执行。
在调度执行时,Spark 调度总是会尽量让每个 task 以最高的本地性级别来启动,当一个 task 以 X 本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以X 本地性级别来启动该 task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。
可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的 Executor 可能就会有相应的资源去执行此 task,这就在在一定程度上提到了运行性能。
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里
我以前没有使用过cron,所以我不能确定我这样做是对的。我想要自动化的任务似乎没有运行。我在终端中执行了这些步骤:sudogeminstall每当切换到应用程序目录无论何时。(这创建了文件schedule.rb)我将此代码添加到schedule.rb:every10.minutesdorunner"User.vote",environment=>"development"endevery:hourdorunner"Digest.rss",:environment=>"development"end我将此代码添加到deploy.rb:after"deploy:symlink","depl
如何在Rake任务中运行Capybara功能?例如:访问('http://google.com')谢谢! 最佳答案 在任务中尝试这样的事情:require'capybara'require'capybara/dsl'Capybara.current_driver=:seleniumBrowser=Class.new{includeCapybara::DSL}page=Browser.new.pagepage.visit("http://www.google.com")puts(page.html)
我正在根据Rakefile中的现有测试文件动态生成测试任务。假设您有各种以模式命名的单元测试文件test_.rb.所以我正在做的是创建一个以“测试”命名空间内的文件名命名的任务。使用下面的代码,我可以用raketest:调用所有测试require'rake/testtask'task:default=>'test:all'namespace:testdodesc"Runalltests"Rake::TestTask.new(:all)do|t|t.test_files=FileList['test_*.rb']endFileList['test_*.rb'].eachdo|task|n
根据thispostbyStephenHagemann,我正在尝试为我的一个rake任务编写Rspec测试.lib/tasks/retry.rake:namespace:retrydotask:message,[:message_id]=>[:environment]do|t,args|TextMessage.new.resend!(args[:message_id])endendspec/tasks/retry_spec.rb:require'rails_helper'require'rake'describe'retrynamespaceraketask'dodescribe're
一、什么是MQTT协议MessageQueuingTelemetryTransport:消息队列遥测传输协议。是一种基于客户端-服务端的发布/订阅模式。与HTTP一样,基于TCP/IP协议之上的通讯协议,提供有序、无损、双向连接,由IBM(蓝色巨人)发布。原理:(1)MQTT协议身份和消息格式有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消息分为:主题(Topic)和负载(payload)两部分Topic,可以理解为消息的类型,订阅者订阅(Su
TCL脚本语言简介•TCL(ToolCommandLanguage)是一种解释执行的脚本语言(ScriptingLanguage),它提供了通用的编程能力:支持变量、过程和控制结构;同时TCL还拥有一个功能强大的固有的核心命令集。TCL经常被用于快速原型开发,脚本编程,GUI和测试等方面。•实际上包含了两个部分:一个语言和一个库。首先,Tcl是一种简单的脚本语言,主要使用于发布命令给一些互交程序如文本编辑器、调试器和shell。由于TCL的解释器是用C\C++语言的过程库实现的,因此在某种意义上我们又可以把TCL看作C库,这个库中有丰富的用于扩展TCL命令的C\C++过程和函数,所以,Tcl是
我正在使用jeweler为Rails3创建一个gem。该gem包含一个rake任务,它所做的其中一件事是删除数据库,所以我正在使用“database_cleaner”。我在gem的Gemfile中指定gem依赖项gem'database_cleaner'在Rakefile中Jeweler::Tasks.newdo|gem|...gem.add_dependency'database_cleaner'end然后在lib中我创建了文件my_gem.rb和tasks.rake。如下,my_gem.rb:moduleMyGemclassRailtie和tasks.rake:task:my_ta