jjzjj

VOP消息仓库演进之路 | 如何设计一个亿级企业消息平台

京东云开发者 2023-03-28 原文
作者:京东零售 李孟冬

VOP作为京东企业业务对外的API对接采购供应链解决方案平台,一直致力于从企业采购数字化领域出发,发挥京东数智化供应链能力,通过产业链上下游耦合与链接,有效助力企业客户的成本优化与资产效能提升。本文将介绍VOP如何通过亿级消息仓库系统来保障上千家企业KA客户与京东的数据交互。

引言

消息(仓库)作为电商业务场景必不可少的核心功能,自VOP上线以来,就开始了建设和演进迭代之路。截止目前,VOP消息仓库已接入200+内部消息端,对外提供80+消息,服务3000+企业客户,覆盖商品、地址、发票、订单、售后、物流等VOP所有业务场景。

消息系统中,一般有两种消费模式:服务端推送和客户端拉取。本文除了对于消息仓库的技术架构演进做对应叙述,重点介绍当前客户端拉取的消息仓库建设实践经验。

客户调用场景

以商品消息为例,京东企业业务目前大约有5600W+商品,这些商品涉及基本信息、价格、库存等的变更,客户侧会通过消息API主动获取商品变更消息,并通过查询实时商品信息接口来获取对应信息,同步本地商品库,业务处理完毕后,删除这一批商品类消息,定时循环。其他类消息同理,不多加描述。

消息仓库V1.0

和我们所了解的系统一样,随着业务发展和企业客户规模的增多,消息仓库整体架构和底层存储系统都逐渐出现瓶颈。特别在于数据库方面,毕竟在高并发读写的场景下很大一部分工作是围绕数据库展开的,所以前期两次的升级迭代主要需要解决的问题也是如何提升数据库容量。

虽然最初我们也通过读写分离等手段来有效降低数据库的负载,提升系统容量和稳定性,但是其缺点也是极其明显:主从延迟、从库数据量有限、TPS高 等问题无法妥善解决。

并且,随着618、1111等各种活动的开展,且VOP侧客户的不断增加,消息激增成为我们不得不尽快面对的问题,限流、缓存等手段随能保证系统的高可用及并发能力。但是消息大量积压、消费水平有限、消息同步不及时等问题越发严重,随之带来的就是对业务有损,所以我们在评估后,对系统进行升级,通过分析最掣肘我们的核心原因还是在于数据库。(此时消息表行数亿行,容量超过10G)

消息仓库V2.0

因此在读写分离无法不能满足我们的业务需要时(已经历过数据归档),分库分表的模式也就需要登上舞台了。具体如何分库分表,注意事项等我就不多加赘述了,感兴趣推荐翻阅菜鸟积分系统的分库分表实践​​​https://mp.weixin.qq.com/s/uFgSe59XP7RoXLTmm3u0KQ​

分库新旧流程对比

分库新旧流程比对切换依据(供参考)

  1. 根据 ducc和 clientId决定是否写入到新库,ducc(bizMsgTransDbJson)中配置了切换开关、白名单、黑名单和分流范围
  2. 使用新库写入时,根据 clientId.hashCode % dbSource.size,得出使用哪个 dbSource
  3. 客户读取时,先从旧库查出,若无数据,则再读取一遍新库,dbSource选取方式同上
  4. 客户删除时,判断删除 ID是否大于一万亿(1000000000000),大于取新库,小于取旧库
由于是多master的架构,分库分表除了包含读写分离模式的所有优点外,还可以解决读写分离架构中无法解决的 TPS 过高的问题,同时分库分表理论上是可以无限横向扩展的,也解决了读写分离架构下从库数量有限的问题。

当然在实际的工程实践中一般需要提前预估好容量,因为数据库是有状态的,如果发现容量不足再扩容是非常麻烦的,应该尽量避免。

在分库分表的模式下可以通过不启用查询从库的方式来避免主从延迟的问题,也就是说读写都在主库,因为在分库后,每个 master 上的流量只占总流量的 1/N,大部分情况下能扛住业务的流量,从库只作为 master 的备份,在主库宕机时执行主从切换顶替 master 提供服务使用。

优化后的前期情况是美好的,无论从客户角度还是从内部消费水平都得到了大幅提升,其跳点和峰值消息下高TPS影响CPU等问题都得到了解决,整个消息仓库性能和稳定性趋于稳定。

为什么说前期情况好,相信大家都有所预料了,虽然分库大幅提升了系统整体的吞吐能力和稳定性,但是由于前期的容量评估问题(业务增长加剧)及本身现有架构的局限性(单体应用),在仓库稳定运行一年左右,又出现了一些显而易见的痛点问题:

痛点问题

  1. 海量数据:19年客户量及商品品类(商品量级)的大幅增加,及最初分库时提升了消息数据的存储时长由2-3天提升至7天(原因:考量政府、银行等客户重保期间不消费消息的空档期,但是后期验证空档期长达月维度),消息仓库的流量出现了频繁翻倍的增长,数据不均衡的情况也逐渐显现出来;
  2. 字段扩展:随着业务不断的演进,消息内容也逐渐复杂(如售后消息 会附带各环节信息,整个JSON消息体较大),入库或存在字段长度限制,调整字段较难;
  3. 高可用&扩展性:原有单体架构的情况,会有热点数据的冲击及热点商品类消息数据对订单类、对账类消息数据的写入和同步带来严重的时延问题及服务性能跳点问题。
  4. 运维成本高:由于面向广大开发者,因此系统必须兼顾各种各样的网络环境问题,开发者能力问题等。企业对接客户常常来咨询消息量及消息消费情况,内部无对应的审计数据可供参考。

目标

不破不立,为避免消息问题长期以来的频繁影响及其他系统雷同的消息需求,我们急需打造一套可复用可扩展的企业消息中心,在满足业务的同时,还需综合考虑可用性、低成本、高吞吐和强扩展性,并且在迁移过程中保证消息不丢失和客户无感知。

方案分析

经过多方调研和排查之后,初步选取了2种存储方案:Mysql+es和MongoDB。

我们在存储成本、开发运维成本、性能对比三个方面进行评估Mysql+es和MongoDB的方案。(仅供参考,具体仍需根据自身业务评估)

  • 存储成本:MongoDB存储优势明显——数据压缩和无冗余存储,相比Mysql+es会减少50%以上的总数据容量。
  • 开发运维成本:MongoDB不需要数据同步,减少开发和运维难度;字段调整方面Mysql+es的架构下对于业务附带抖动风险,DDL相关问题风险高,易出错;MongoDB开发维护成本,存储架构简单,无数据一致性压力;扩容方面,MongoDB支持随时动态无脑扩容,基本不存在上限问题,但是Mysql的扩容需要保证hash一致,迁移数据灰度等情况,周期长且高概率存在对业务影响。
  • 性能对比:经过压测,同样的4C8G的机器配置下,MySQL和MongoDB在大数据量下写性能基本一致。MySQL的读性单分片约6000QPS左右,ES的性能只有800QPS左右。而 MongoDB 单分片地读性能在3万QPS左右,远高于MySQL和 ES 的性能。

消息仓库V3.0

没有完美的架构,只有刚好的架构,没有满足一切的架构,只有满足目标的架构

综上分析,MongoDB不仅完全满足业务需求,同时在其他方面也优于其他方案,因此最终选用MongoDB分片集群作为了最底层的数据存储方式,并对系统架构重新梳理,分为四个阶段:消息接收阶段,消息中转阶段,消息写入阶段 ,消息可视化阶段,主要职责如下:

  • 消息接收阶段(vop-worker):该系统仅关注不同消息源的接入,当前已接入中台近百个消息源,且依赖BTE任务平台、订单&商品池&主数据&消息中心等服务,通过过滤,清洗,封装等手段封装需入库的业务消息数据中转发出。
  • 消息中转阶段(JMQ集群):将消息中转出来,分级管控,当前分为四级,以此解决核心消息消费不及时,部分时段CPU内存飙升的问题。分级别设置消费线程数,低级别消息不影响高级别消息消费。低级别消息具备降级能力。
  • 消息写入阶段(vop-msg-store):消息写入阶段,批量双写,MongoDB+ES(支持多维度的运维审计查询及数据导出)。MongoDB解决tps10000+、数据量日均5亿+、多查询条件和数据分布不均匀的问题,解决数据库无法支撑租户数据均匀和消息内容可扩展的问题;创建mongo表,设置租户id和事件id索引、设置租户id的分片规则、设置唯一索引和超时时间45天。ES解决消息运维过程中,审计、核查等问题。
  • 消息可视化阶段(vop-support-platform):解决对客户生产/消费能力无认知、全局消息不可控和消息可视化的问题。并且数据可视化的不断完善又会反哺架构的可用性提升,为后续我们设立的优化专题打下坚实的数据基础。

补充:MongoDB分片集群无单点故障的原因——当 MongoDB 被部署为一个分片集群时,应用程序通过驱动,访问路由节点, 也就是 Mongos 节点 Mongos 节点会根据读写操作中的片键值,把读写操作分发的特定的分片执行,然后把分片的执行结果合并,返回给应用程序。那集群中的数据是如何分布的呢?这些元数据记录在 Config Server 中,这也是一个高可用的复制集。每个分片管理集群中整体数据的一部分,也是一个高可用复制集。此外,路由节点,也就是 Mongos 节点在生产环境通常部署多个。这样,整个分片集群没有任何单点故障。

消息仓库V3.0给我们带来的成果也是十分显著,高标准达到了预期的目标:

  • 支撑日均消息写入量5亿,现支持6wTPS和1wQPS
  • TP99从100ms提升至40ms,在高吞吐量情况下性能表现平稳
  • 新架构边界清晰,新需求不涉及核心系统的改造
  • 数据有效期7天提升至45天
  • It成本0增长
  • 消息可视化方面大幅提升运维效率,已全面开放技术客服使用

消息仓库V3.0+(回首往事)

之前我们一直铆足劲的往前追赶,现在系统稳定,为实现未来客户和商品的增量对消息仓库无影响&稳定运行3年+的目标,我们决定在限制资源有限性的情况下,转换角度思考问题和优化目标。随即我们针对消息数据开展了几个专题的治理,核心围绕流量治理、系统稳定性建设、降低成本三个方面出发。

锁定目标定后,剩下的只是迈步朝它慢慢走下去。

流量治理(峰值情况下裁剪亿级消息量)

1)优化业务场景,从源头减少调用量,梳理系统流程,优化无效数据源的接入,历史空跑逻辑等。
2)a、无效客户管控(LoadingCache),由于其他端外界客户接入VOP,存在部分不消费消息的无效客户,需进行主动屏蔽,以此解决无效客户消息中转存储的问题。b、缓存,减少耗时操作等等。
3)消息过滤器(jimdb),通过防重控制+时间窗口对客户未消费且重复sku进行去重,以此解决客户消息消费延迟,客户消息量大,重复消息多,客户系统重启后消息量巨大的问题,并大幅减少我侧MongoDB存储数据量。

这里补充一个小插曲,在流量治理过程中,我们也在数据中发现了一些问题,并作为指导我们产品优化的数据支撑,通过技术手段进行优化和处理。**如:通过数据分析,我们在整个消费过程中,部分客户(如:联通)消费较慢或者无效消费导致信息同步不及时的问题,因此从技术角度出发与客户技术侧沟通,通过建立自动补推功能,来提升客户与京东的同步率,即通过自助补推功能,来辅助客户同步异常情况下二次同步,以价格变更为例,通过客户下单价格不一致,来自助补推价格变更消息,以此挽回由于客户同步异常导致异常的订单,提升客户成单率, 进一步提升整体GMV产出。

这里也给我带来思考,无论引入还是自研,无论架构还是工具,落到实处,真实解决业务中的问题,在降本增效中带来价值,不论大小,均为创新。

系统稳定性(解决cpu毛刺及分片热点问题)

1)提高资源利用率:优化部分代码结构,如:通过list.contains()转化为set.contains()将其时间复杂度由O(n)降至O(1)、比较耗时或者不必放在主流程中执行的任务异步处理、单个写转化为批量写、减少传统重量级锁使用操作系统互斥量带来的性能损耗等等,以此解决大流量下,机器 cpu飙升影响整体性能的情况。

2)a、主动降级队列:前面有提到MongoDB设置租户id的分片规则,所以在单客户频繁进行大量商品池操作时,会发出该客户的大量商品出入池消息,由于当前整个系统吞吐性能极佳,所以在写入MongoDB时,会造成单分片的热点写问题,所以设定主动降级队列。具体实现为 在消息仓库多租户场景下,不影响整体客户的情况下,配置化(某客户+配置详消息类型)的进行异常客户的过载流量隔离,来保证底层存储介质的服务质量,即异常流量超过阈值则进入降级队列。 b、JMQ消费线程调优等

降低成本(非活动期间,白天消息量级相对晚上较少)

serverless自动扩缩:采用秒级消息接收量阈值和机器CPU阈值来触发自动扩缩策略,通过调优后非大促期间消息仓库整体资源成本下降52%。

小结

目前的消息仓库从正式服役到通过不断的迭代和更新已踏入V3.0+版本,成功经历了四次大促,系统各项性能指标稳定。以最近的大促为例,22年双十一开门红,消息相关接口性能稳定,MongoDB整体写入QPS 2w ,查询QPS 4.3w。 并且通过评估能完全应对接下来独立场切换带来的消息增长情况。

在消息仓库整体架构演进升级的过程中,虽然基础中间件给我们提供了各种高可用的能力,但可用性最终还是要回归我们业务架构本身。业务系统需要根据各平台业务特性尽可能选择最优的可用性方案,并在系统架构中遵循一些原则,如最大限度减少关键依赖;消除扩容瓶颈;预防和缓解流量峰值;过载时做好优雅降级等等。而且更重要的一点是,我们需要时刻思考架构如何支撑业务的长期增长。

后续有时间也可以给大家同步一下我们另一个数据推送平台。(一键三连催更)

展望

  1. 保持工匠精神,精益求精:在保证系统稳定性和扩展性的同时,以业务为重点,持续践行数据驱动的实践方法,进一步提升客户和VOP双方系统的各类消息同步率,通过技术手段不断优化产品,提升客户搜索体验及下单成功率。
  2. 消息数据治理:无论消息推送还是消息拉取方面都有一个极其明显的特征,在客户系统消费水平足够好的情况下,大部分数据是会在几秒内进行写删各一次,两次操作完成这条数据就失去了意义。(以前天为例,有3000W+消息数据生产消费几乎同速率)在这种场景,使用任何存储介质本身就不合理,就像是在存储介质中插入一条几乎不会去读的数据。这样生命周期极短的数据放在存储介质中,不仅资源浪费,也造成存储介质成为系统未来的瓶颈。 考虑服务器本身的成本问题,可以针对升级过滤器或者参考计算机三级存储体系结构的思路,未来将大量的此类消息事务在Memory内完成,其他消息按照原有方式进行操作,该方式下千万级消息事务在Memory内完成,节省大量服务器资源。
  3. 推送方式标准化:轮询状态下,数据的实时性终究依赖于客户应用的轮询间隔时间,该方式下,API调用效率低且浪费机器资源,如何结合业务侧推动数据推送标准化,给客户提供实时可靠的双向数据交换通道,大大提升API调用效率也是我们后续着重考虑的方向。
本次就写到这,零零散散,很多细节点(如:如何线程调优提升吞如,大流量消息下的数据埋点及分析等等)无法完全描绘,如有问题,欢迎交流。希望文章中的消息仓库的演进经验,给大家带来一些收获,或者说,大家不妨思考一下你们会采用何种技术方案和手段来解决演进中遇到的问题。

有关VOP消息仓库演进之路 | 如何设计一个亿级企业消息平台的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  4. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  5. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  6. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  7. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  8. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  9. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  10. ruby-on-rails - 渲染另一个 Controller 的 View - 2

    我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>

随机推荐