一直以来,字节跳动都非常重视并贯彻“数据驱动”这一理念,作为数据驱动的一环,数据中台能力的建设至关重要,而这其中,数据集成作为数据中台建设的基础,主要解决了异构数据源的数据传输、加工和处理的问题。BitSail 源自字节跳动数据平台团队自研的数据集成引擎 DTS(全称 Data Transmission Service,即数据传输服务),最初基于 Apache Flink 实现,至今已经服务于字节内部业务接近五年,现已具备批式集成、流式集成和增量集成三类同步模式,并支持分布式水平扩展和流批一体架构,在各种数据量和各种场景下,一个框架即可解决数据集成需求。此外,BitSail 采用插件式架构,支持运行时解耦,从而具备极强的灵活性,企业可以很方便地接入新的数据源。
字节跳动数据集成引擎 BitSail 演进的历程可以分为三个阶段:① 初始期: 2018 年以前公司没有统一的数据集成框架,对每个通道都是各自实现,因此依赖的大数据引擎也比较零散,如 MapReduce、Spark,数据源之间的连接也是网状连接,整体的开发和运维成本都比较高。② 成长期:可以分为三个小阶段。
以下介绍一个批次场景上比较有意思的功能,也是实际业务中面临的一些痛点。
上图左上部分是原始的 Flink 运行日志,从这个日志里看不到任务进度数据和预测数据,如当前任务运行的百分比、运行完成所需时间。左下部分则是 Flink UI 界面提供的任务运行的元信息,可以看到读写条数都是 0,从 Flink 引擎角度,由于所有算子作为一个整体是没有输入和输出的,这是合理的,但从用户角度就无法看到任务整体进度信息和当前处理记录条数,从而导致用户怀疑这个任务是否已经卡住。图中右边是改造之后的效果,日志中明确输出当前处理了多少条数、实时进度展示、消耗时间等等,该功能在字节内部上线后,得到了很多业务的好评。
下面介绍一下具体的实现。首先回顾 Flink Task 的执行过程,与传统的 MapReduce、Spark 的驱动模型不一样,Flink 是以任务驱动,JM 创建好 Split 之后,Task 是常驻运行,不断向 JM 请求新的 Split,只有所有的 Split 处理完之后,Task 才会退出。此时,如果用总的完成的 Task 个数除以总的 Task 个数,进度将出现一定程度的失真。最开始,所有的 Task 都在运行,不断地去拉取 Split,我们看到的进度会是 0,等到 JM 的 Split 处理完之后,所有的 Task 会集中退出,可以看到进度会突然跳动到 100%,中间是缺少进度信息的。为了解决这个问题,我们还是要回到数据驱动本身,以 Split 的维度来衡量整个 Job 的运行过程。图中右边所展示的是,通过 Flink UI 提供的 API,可以拿到整个任务的拓扑信息,将其分为两层算子并进行改造,分别是 Source 层和 Operator 层。
左图(Shuffle)是目前社区的实现方式,很多数据湖的写入,比如 Hudi、Iceberg 基本上也是这个结构。这套结构分为两层算子,第一层是我们的数据处理层,负责数据的读取和写入;第二层算子是一个单节点的提交层,它是一个单并发,主要负责元信息的提交,比如去生成 Hive 的分区或者做一些其他的元信息动作。这个架构的优势是其整体拓扑(数据处理流程)比较清晰,算子功能定位也比较清楚,但是它有一个明显的缺陷,加入一个单并发节点后,导致整个任务变成 Shuffle 连接。而 Shuffle 连接天然的弱势是,当遇到 Task Failover 的时候,它会直接进行全局重启。右图(Pipelined)是改造之后的数据处理流程,数据写入部分没有变化,变化的是后面的提交部分,这样的设计考虑是是保持原有 Pipeline 架构,以实现 Task 容错时不会进行全局重启。废弃了原有的单并发提交节点,把所有元信息的提交拿到 JM 端处理,同时 Task 和 JM 的通讯是通过 Aggregate Manager 来实现。改为这套架构之后,在大数据量场景下,其稳定性得到了显著的提升。
右图是原有架构,处理流程包括三个模块:
右图是升级后的架构,主要的升级点包括:
数据湖是支持多种表格式的,比如 CopyOnWrite(简称 COW)表、MergeOnRead(简称 MOR)表。COW 表的优势在于读性能比较好,但是会导致写放大,MOR 表正好相反,写的性能比较好的,会导致读放大。具体选择哪种表格式,更多要根据大家的业务场景来决定。我们的业务场景是为了解决 CDC 数据的近实时同步,CDC 数据有个明显的特点,是存在大量的随机更新。这个场景下选择 COW,会导致写放大的问题比较严重,所以我们选择了 MOR 表。上图就是一个 MOR 表查询和写入的流程。第一个是列存储的基础镜像文件,我们称之为 Base 文件,第二个是行存储的增量日志,我们称之为 Log 文件。每次查询时,需要将 Log 文件和 Base 文件合并,为了解决 MOR 表读放大的问题,通常我们会建一个 Compaction 的服务,通过周期性的调度,将 Log 文件和 Base 文件合并,生成一个新的 Base 文件。
如图所示,这是原生的 Hudi 实时写入的流程图。首先,我们接入 Hudi 数据,会进入 Flink State,它的作用是索引。Hudi 提供了很多索引机制,比如 BloomIndex。但是 BloomIndex 有个缺陷,它会出现假阳性,降级去遍历整个文件,在效率上有一定的影响。Flink State 的优势是支持增量更新,同时它读取的性能会比较高。经过 Flink State 之后,我们就可以确认这条记录是 Upsert,还是 Insert 记录,同时会分配一个 File Id。紧接着,我们通过这个 File Id 会做一层 KeyBy,将相同 File 的数据分配到同一个 Task。Task 会为每一个 File Id 在本地做一次缓存,当缓存达到上限后,会将这批数据 Flush 出去到 hoodie client 端。Hoodie client 主要是负责以块的方式来写增量的 Log 数据,以 Mini Batch 的方式将数据刷新到 HDFS。再之后,我们会接一个单并发的提交节点,最新的版本是基于 Coordinator 来做的,当所有的算子 Checkpoint 完成之后,会提交元信息做一次 Commit,认为这次写入成功。同时 Checkpoint 时,我们会刷新 Task 的缓存和 hoodie client 的缓存,同时写到 HDFS。通常,我们还会接一个 Compaction 的算子,主要用来解决 MOR 表读放大的问题。这个架构在实际的生产环境会遇到如下问题:(1)当数据量比较大的时候,Flink State 的膨胀会比较厉害,相应地会影响 Task 的速度以及 Checkpoint 的成功率。(2)关于 Compaction 算子,Flink 的流式任务资源是常驻的,Compaction 本身是一个周期性的调度,如果并发度设置比较高,往往就意味着资源的浪费比较多。(3)Flink 提供了很多资源优化的策略,比如 Slot Sharing,来提高整体的资源利用率,这就会导致资源抢占的问题,Compaction 会和真正的数据读写算子来进行资源的抢占。Compaction 本身也是一个重 I/O、CPU 密集型操作,需要不断地读取增量日志、全量日志,同时再输出一个全量数据。针对上述问题,我们优化了 Hudi 的写入流程。
首先我们会采集 CDC 的 Change Log,并发送到消息队列,然后消费消息队列中的 Change Log,然后我们进行如下三个优化:(1)废弃了原先的 Flink State,替换为 Hash Index。Hash Index 的优势是不依赖外部存储。来了一个 HoodieRecord 之后,只需要一个简单的哈希处理,就知道它对应的 Bucket。(2)将 Compaction 服务独立成一个离线的任务,并且是周期性的调度,用来解决资源浪费和资源抢占的问题。(3)将 Task 缓存和 Hudi 缓存做了合并,因为每次 Checkpoint 都需要刷新 Task 缓存,Hudi 缓存需要写入 HDFS,如果缓存的数据量比较多,会导致整个 Checkpoint 时间比较长。优化之后,稳定性方面,可以支持百万级的 QPS;端到端的 Checkpoint 延时控制在 1 分钟以内,Checkpoint 成功率可以做到 99%。

框架对 Flink API 是深度绑定,用户需要深入到 Flink 引擎内部,这会导致整体 Connector 接入成本比较高。为了解决这个问题,我们抽象了新的读写接口,该接口与引擎无关,用户只要开发新的接口即可。同时在内部会做一层新的抽象接口与引擎接口的转换,这个转换对用户是屏蔽的,用户不需要了解底层引擎细节。
当前架构和 Flink 引擎深度绑定,在使用场景方面受到一定的限制,比如有些客户用了 Spark 引擎或者其他引擎。Flink 引擎依赖比较重的情况下,对于简单场景和小数据量场景,整体的资源浪费比较严重。为解决此问题,我们在引擎层预留了多引擎入口,在已经预留的 Flink 引擎基础之上,接下来会扩展到 Spark 引擎或者 Local Engine。具体实现方面,我们对执行的环境进行了一层抽象,不同的引擎会去实现我们的抽象类。同时,我们探索 Local 执行方式,对小数据量在本地通过线程的方式来解决,不用去启动 Flink Job 或类似的处理,提高整体资源的使用效率。
目前系统存在一些外部环境中没有的内部依赖,大数据底座也是绑定的公司内部版本,我们进行了三个方面的优化:我有一个字符串input="maybe(thisis|thatwas)some((nice|ugly)(day|night)|(strange(weather|time)))"Ruby中解析该字符串的最佳方法是什么?我的意思是脚本应该能够像这样构建句子:maybethisissomeuglynightmaybethatwassomenicenightmaybethiswassomestrangetime等等,你明白了......我应该一个字符一个字符地读取字符串并构建一个带有堆栈的状态机来存储括号值以供以后计算,还是有更好的方法?也许为此目的准备了一个开箱即用的库?
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我正在使用ruby1.9解析以下带有MacRoman字符的csv文件#encoding:ISO-8859-1#csv_parse.csvName,main-dialogue"Marceu","Giveittohimóhe,hiswife."我做了以下解析。require'csv'input_string=File.read("../csv_parse.rb").force_encoding("ISO-8859-1").encode("UTF-8")#=>"Name,main-dialogue\r\n\"Marceu\",\"Giveittohim\x97he,hiswife.\"\
我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
简而言之错误:NOTE:Gem::SourceIndex#add_specisdeprecated,useSpecification.add_spec.Itwillberemovedonorafter2011-11-01.Gem::SourceIndex#add_speccalledfrom/opt/local/lib/ruby/site_ruby/1.8/rubygems/source_index.rb:91./opt/local/lib/ruby/gems/1.8/gems/rails-2.3.8/lib/rails/gem_dependency.rb:275:in`==':und
我想为我的Rails网络应用程序提供推荐功能。特别是,我想向新注册的用户推荐他可能想要关注的其他用户。Rails中是否有用于此目的的引擎/gem?如果没有,我应该从哪里开始构建它?谢谢。 最佳答案 有Coletivogemhttps://github.com/diogenes/coletivo我试了一下。在MySQL上运行。Neo4jhttp://neo4j.org真的很容易实现一个“跟随谁”。事实上,大多数展示其能力的样本都涉及“跟随谁”。快速提示-只有在JRuby上运行时,Neo4j.rb才会很酷。如果不是-使用Neograph
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
前言作为一名程序员,自己的本质工作就是做程序开发,那么程序开发的时候最直接的体现就是代码,检验一个程序员技术水平的一个核心环节就是开发时候的代码能力。众所周知,程序开发的水平提升是一个循序渐进的过程,每一位程序员都是从“菜鸟”变成“大神”的,所以程序员在程序开发过程中的代码能力也是根据平时开发中的业务实践来积累和提升的。提高代码能力核心要素程序员要想提高自身代码能力,尤其是新晋程序员的代码能力有很大的提升空间的时候,需要针对性的去提高自己的代码能力。提高代码能力其实有几个比较关键的点,只要把握住这些方面,就能很好的、快速的提高自己的一部分代码能力。1、多去阅读开源项目,如有机会可以亲自参与开源