/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
* loops when the old TaskTracker has gone bad (its state is
* stale somehow) and we need to reinitialize everything.
*/
public void run() {
try {
getUserLogManager().start();
startCleanupThreads();
boolean denied = false;
while (running && !shuttingDown && !denied) {
boolean staleState = false;
try {
// This while-loop attempts reconnects if we get network errors
while (running && !staleState && !shuttingDown && !denied) {
try {
State osState = offerService();
if (osState == State.STALE) {
staleState = true;
} else if (osState == State.DENIED) {
denied = true;
}
....}其中,用于处理心跳相关信息的服务函数offerService代码大体框架:/**
* Main service loop. Will stay in this loop forever.
*/
State offerService() throws Exception {
long lastHeartbeat = System.currentTimeMillis();//上一次发心跳距现在时间
////此循环主要根据控制完成task个数控制心跳间隔。
while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();//获得当前时间
// accelerate to account for multiple finished tasks up-front
//通过完成的任务数动态控制心跳间隔时间
long remaining =
(lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
while (remaining > 0) {
// sleeps for the wait time or
// until there are *enough* empty slots to schedule tasks
synchronized (finishedCount) {
finishedCount.wait(remaining);
// Recompute
now = System.currentTimeMillis();
remaining =
(lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
if (remaining <= 0) {
// Reset count
finishedCount.set(0);
break;
}
}
}
...
//发送心跳
// Send the heartbeat and process the jobtracker's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);//真正想JobTracker发送心跳
.....
//开始处理JobTracker返回的命令
TaskTrackerAction[] actions = heartbeatResponse.getActions();
...
//杀死一定时间没没有汇报进度的task
markUnresponsiveTasks();
//当剩余磁盘空间小于mapred.local.dir.minspacekill(默认为0)时,寻找合适的任务将其杀掉以释放空间
killOverflowingTasks();从整个源码看,TaskTracker向JobTracker发送一次心跳的流程如下:
下面描述一下上面流程图中的几个重要过程:(1)过程一,判断是否达到心跳间隔。 TaskTracker的心跳间隔是由task完成情况以及整个集群规模规模动态觉得的。State offerService() throws Exception {
//上一次发心跳距现在时间
long lastHeartbeat = System.currentTimeMillis();
//此循环主要根据控制完成task个数控制心跳间隔。
while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();//获得当前时间
// accelerate to account for multiple finished tasks up-front
//通过完成的任务数动态控制心跳间隔时间
long remaining =
(lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
while (remaining > 0) {
// sleeps for the wait time or
// until there are *enough* empty slots to schedule tasks
synchronized (finishedCount) {
finishedCount.wait(remaining);
// Recompute
now = System.currentTimeMillis();
remaining =
(lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
if (remaining <= 0) {
// Reset count
finishedCount.set(0);//将已经完成的Task个数计数器归零
....上面代码的第九行,就是用来实现根据Task的运行完成或者失败数目来动态的缩短心跳间隔。其中finishedCount.get()用于获得获得已经运行完毕的Task的计数。再来看看这个计数是怎么incream的,定位到TaskTracker类的notifyTTAboutTaskCompletion方法:/**
* Notify the tasktracker to send an out-of-band heartbeat.
*/
private void notifyTTAboutTaskCompletion() {
if (oobHeartbeatOnTaskCompletion) {//判断是否启动“外带心跳”配置(默认为false)
synchronized (finishedCount) {
finishedCount.incrementAndGet();//运行完毕的Task计数器自增
finishedCount.notify();
}
}
}其中oobHeartbeatOnTaskCompletion可以由mapreduce.tasktracker.outofband.heartbeat配置(默认为false),也就是说要当启动“外带心跳”时,才会启动根据Task完成或者失败数来动态调整心跳间隔机制。下面看看动态调整心跳的具体算法,进入getHeartbeatInterval(finishedCount.get())方法:private long getHeartbeatInterval(int numFinishedTasks) {
return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
}其中,numFinishedTasks代码已经运行完成或者失败的Task数目,oobHeartbeatDamper简称“心跳收缩因子”由mapreduce.tasktracker.outofband.heartbeat.damper配置(默认为1000000)。当 启动外带心跳机制时,如果某个时刻有numFinishedTasks个任务运行完成,则心跳间隔就会调整为(heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1))。当不启动“外带心跳”机制时numFinishedTasks默认就为0了,那么整个心跳间隔还是heartbeatInterval。......
if(justInited) {//第一次启动justInited默认为true
String jobTrackerBV = jobClient.getBuildVersion();//获得JobTracker的版本号
if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {//获得TaskTracker的版本号,并且判断JobTracker和TaskTracker的版本是否一致
String msg = "Shutting down. Incompatible buildVersion." +
"\nJobTracker's: " + jobTrackerBV +
"\nTaskTracker's: "+ VersionInfo.getBuildVersion();
....
justInited = false;//TaskTracker启动后将其设置为false
......justInited默认为true,当TaskTracker初次启动后会被改为false。当TaskTracker初次启动,进入检测TaskTracker和JobTracker版本一致性环节。跟进代码中看看是如何判断版本一致性的,/**
* Returns the buildVersion which includes version,
* revision, user and date.
*/
public static String getBuildVersion(){
return VersionInfo.getVersion() +
" from " + VersionInfo.getRevision() +
" by " + VersionInfo.getUser() +
" source checksum " + VersionInfo.getSrcChecksum();
}上面代码是获得JobTracker和TaskTracker版本号的返回格式字符串,getVersion()返回Hadoop版本号,getRevision()返回Hadoop的修订版本号,getUser()返回代码编译用户,getSrcChecksum()返回校验和。验证版本的一致性就是验证这些。(3)过程三,检测磁盘是否读写是否正常。 MapReduce框架中,在map任务计算过程中会将输出结果保存在mapred.local.dir指定的本地目录中(可以由多块磁盘组成,配置的时候用逗号隔开),这些本地目录是没有备份的(不像HDFS上有副本)一旦丢失或者损害整个Map任务需要重新进行计算。TaskTracker初始化时会对这些目录进行一次检测,并将正常的目录保存起来。之后,TaskTracker会周期性(由mapred.disk.healthChecker.interval配置,默认60s)地对这些正常目录进行检测,如果发现故障目录,TaskTracker就会重新对自己进行初始化。看看源代码,定位到TaskTracker的offerService方法:......
now = System.currentTimeMillis();
if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {//判断是否达到检测磁盘的时间间隔
localStorage.checkDirs();//检测硬盘读写是否正常
lastCheckDirsTime = now;
int numFailures = localStorage.numFailures();//出现读写错误的目录数
// Re-init the task tracker if there were any new failures
if (numFailures > lastNumFailures) {//检测本次检测中是否存在损害目录
lastNumFailures = numFailures;
return State.STALE;//硬盘读写检测错误,返回需要从新初始化状态
}
}
......其中,diskHealthCheckInterval代表检测磁盘的时间间隔,由mapred.disk.healthChecker.interval配置,默认60s。// Send the heartbeat and process the jobtracker's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);//真正向JobTracker发送心跳TaskTracker基本情况、资源使用情况、任务运行状态等信息会被封装到一个可序列化的类TaskTrackerStatus中,并会伴随心跳发送给JobTracker。每次发送心跳时,TaskTracker根据最新的信息重新构造TaskTrackerStatus。但是从源代码看并不是每次心跳都会发送节点资源信息申请新任务,看代码:// Check if we should ask for a new Task
//
boolean askForNewTask;
long localMinSpaceStart;
//存在空闲的map或者reduce slot,并且map输出目录大于mapred.local.dir.minspackekill才去向JobTracker发送节点资源使用情况申请新任务。
synchronized (this) {
askForNewTask =
((status.countOccupiedMapSlots() < maxMapSlots ||
status.countOccupiedReduceSlots() < maxReduceSlots) &&
acceptNewTasks);
localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
askForNewTask = enoughFreeSpace(localMinSpaceStart);//判断map中间结果输出路径空间
long freeDiskSpace = getFreeSpace();//获得map中间结果输出大小
long totVmem = getTotalVirtualMemoryOnTT();//获得虚拟内存总量
long totPmem = getTotalPhysicalMemoryOnTT();//获得物理内存总量
long availableVmem = getAvailableVirtualMemoryOnTT();//获得可用虚拟内存量
long availablePmem = getAvailablePhysicalMemoryOnTT();//获得可用物理内存量
long cumuCpuTime = getCumulativeCpuTimeOnTT();//获得TaskTracker自从集群启动到现在的累计使用时间
long cpuFreq = getCpuFrequencyOnTT();//获得cpu频率
int numCpu = getNumProcessorsOnTT();//获得cpu核心数
float cpuUsage = getCpuUsageOnTT();//获得cpu使用率
//将这些资源信息封装到TaskTracker中的resStatus对象(ResourceStatus类实例)
status.getResourceStatus().setAvailableSpace(freeDiskSpace);
status.getResourceStatus().setTotalVirtualMemory(totVmem);
status.getResourceStatus().setTotalPhysicalMemory(totPmem);
status.getResourceStatus().setMapSlotMemorySizeOnTT(
mapSlotMemorySizeOnTT);
status.getResourceStatus().setReduceSlotMemorySizeOnTT(
reduceSlotSizeMemoryOnTT);
status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
status.getResourceStatus().setCpuFrequency(cpuFreq);
status.getResourceStatus().setNumProcessors(numCpu);
status.getResourceStatus().setCpuUsage(cpuUsage);
}只有当存在空闲的map或者reduce slot,并且map输出目录大于mapred.local.dir.minspackekill才会将上面的节点资源信息放到TaskTrackerStatus中,向JobTracker发送节点资源使用情况申请新任务。再来看看通过心跳传给JobTracker的TaskTrackerStatus封装的具信息:String trackerName;//taskTracker名称
String host;//主机名
int httpPort;//TaskTracker对外的http端口
int failures;//TaskTracker上运行失败的任务总数
List<TaskStatus> taskReports;//记录了当前TaskTracker上各个任务的运行状态
volatile long lastSeen;//上次汇报心跳的时间
private int maxMapTasks;//map slot总数
private int maxReduceTasks;//reduce slot总数
private TaskTrackerHealthStatus healthStatus;//记录TaskTracker的健康情况
...
private ResourceStatus resStatus;////TaskTracker资源信息,包括cpu、虚拟内存、物理内存等信息static class TaskTrackerHealthStatus implements Writable {
private boolean isNodeHealthy;//节点是否健康
private String healthReport;//如果节点不健康,则记录导致不健康的原因
private long lastReported;//上一次汇报健康状态的时间
...} healthStatus是由NodeHealthCheckerService线程计算得到,该线程允许管理员配置一个“监控监测脚本”以监测节点健康状况,且管理员可以在该脚本中添加任务监测语句作为节点是否健康运行的依据。如果脚本检测到该节点处于不健康状态,它需要再标准输出中打印一条以字符串“ERROR”开头的输出语句。NodeHealthCheckerService线程周期性调用健康检测脚本并检测其输出,一旦发现脚本输出是以“ERROR”开头的字符串,则认为该节点不健康,进而将其标注为“unhealthy”并通过心跳告诉JobTracker,而JobTracker得知节点状态为“unhealthy”后,会将其加入黑名单,此后不再为它分配新任务。需要注意的是,只要TaskTracker服务是活的,该线程就会一直运行该脚本,一旦发现节点又变为“healthy”,JobTracker会立刻将其从黑名单中移除,从而又会为之分配新任务。通过引入该机制,可以带来很多好处:#!/bin/bash
MEMORY_RATIO=0.1
freeMem=`grep MemFree /proc/meminfo |awk '{print $2}'`
totalMem=`grep MemTotal /proc/meminfo | awk '{print $2}'`
limitMem=`echo | awk '{print int("'$totalMem'"*"'$MEMORY_RATIO'")}'`
if [$freeMem -lt $limitMem];then
echo "ERROR,totalMem=$totalMem, freeMem=$freeMem, limitMem=$limitMem"
else
echo "Ok,totalMem=$totalMem, freeMem=$freeMem, limitMem=$limitMem"
fi一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
Transformers开始在视频识别领域的“猪突猛进”,各种改进和魔改层出不穷。由此作者将开启VideoTransformer系列的讲解,本篇主要介绍了FBAI团队的TimeSformer,这也是第一篇使用纯Transformer结构在视频识别上的文章。如果觉得有用,就请点赞、收藏、关注!paper:https://arxiv.org/abs/2102.05095code(offical):https://github.com/facebookresearch/TimeSformeraccept:ICML2021author:FacebookAI一、前言Transformers(VIT)在图
1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模
我想开始使用“Sinatra”框架进行编码,但我找不到该框架的“MVC”模式。是“MVC-Sinatra”模式或框架吗? 最佳答案 您可能想查看Padrino这是一个围绕Sinatra构建的框架,可为您的项目提供更“类似Rails”的感觉,但没有那么多隐藏的魔法。这是使用Sinatra可以做什么的一个很好的例子。虽然如果您需要开始使用这很好,但我个人建议您将它用作学习工具,以对您来说最有意义的方式使用Sinatra构建您自己的应用程序。写一些测试/期望,写一些代码,通过测试-重复:)至于ORM,你还应该结帐Sequel其中(imho
目录0专栏介绍1平面2R机器人概述2运动学建模2.1正运动学模型2.2逆运动学模型2.3机器人运动学仿真3动力学建模3.1计算动能3.2势能计算与动力学方程3.3动力学仿真0专栏介绍?附C++/Python/Matlab全套代码?课程设计、毕业设计、创新竞赛必备!详细介绍全局规划(图搜索、采样法、智能算法等);局部规划(DWA、APF等);曲线优化(贝塞尔曲线、B样条曲线等)。?详情:图解自动驾驶中的运动规划(MotionPlanning),附几十种规划算法1平面2R机器人概述如图1所示为本文的研究本体——平面2R机器人。对参数进行如下定义:机器人广义坐标
网站的日志分析,是seo优化不可忽视的一门功课,但网站越大,每天产生的日志就越大,大站一天都可以产生几个G的网站日志,如果光靠肉眼去分析,那可能看到猴年马月都看不完,因此借助网站日志分析工具去分析网站日志,那将会使网站日志分析工作变得更简单。下面推荐两款网站日志分析软件。第一款:逆火网站日志分析器逆火网站日志分析器是一款功能全面的网站服务器日志分析软件。通过分析网站的日志文件,不仅能够精准的知道网站的访问量、网站的访问来源,网站的广告点击,访客的地区统计,搜索引擎关键字查询等,还能够一次性分析多个网站的日志文件,让你轻松管理网站。逆火网站日志分析器下载地址:https://pan.baidu.
1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>
参考文章搭建文章gitte源码在线体验可以注册两个号来测试演示图:一.整体介绍 介绍SignalR一种通讯模型Hub(中心模型,或者叫集线器模型),调用这个模型写好的方法,去发送消息。 内容有: ①:Hub模型的方法介绍 ②:服务器端代码介绍 ③:前端vue3安装并调用后端方法 ④:聊天室样例整体流程:1、进入网站->调用连接SignalR的方法2、与好友发送消息->调用SignalR的自定义方法 前端通过,signalR内置方法.invoke() 去请求接口3、监听接受方法(渲染消息)通过new signalR.HubConnectionBuilder().on
一、机器人介绍 此处是基于MATLABRVC工具箱,对ABB-IRB-1200型号的微型机械臂进行正逆向运动学分析,并利Simulink工具实现对机械臂进行具有动力学参数的末端轨迹规划仿真,最后根据机械模型设计Simulink-Adams联合仿真。 图1.ABBIRB 1200尺寸参数示意图ABBIRB 1200提供的两种型号广泛适用于各作业,且两者间零部件通用,两种型号的工作范围分别为700 mm 和 900 mm,大有效负载分别为 7 kg 和5 kg。 IRB 1200 能够在狭小空间内能发挥其工作范围与性能优势,具有全新的设计、小型化的体积、高效的性能、易于集成、便捷的接
目录一.大致如下常见问题:(1)找不到程序所依赖的Qt库version`Qt_5'notfound(requiredby(2)CouldnotLoadtheQtplatformplugin"xcb"in""eventhoughitwasfound(3)打包到在不同的linux系统下,或者打包到高版本的相同系统下,运行程序时,直接提示段错误即segmentationfault,或者Illegalinstruction(coredumped)非法指令(4)ldd应用程序或者库,查看运行所依赖的库时,直接报段错误二.问题逐个分析,得出解决方法:(1)找不到程序所依赖的Qt库version`Qt_5'