jjzjj

java - Kafka Consumer如何从多个assigned Partition中消费

coder 2024-03-20 原文

tl;dr; 我试图了解分配了多个分区的单个消费者如何处理到达分区的消费记录。

例如:

  • 在移动到下一个之前完全处理单个分区。
  • 每次从每个分区处理一大块可用记录。
  • 从第一个可用分区处理一批 N 条记录
  • 以循环方式处理来自分区的一批 N 条记录

我找到了 RangedRoundRobin 分配器的 partition.assignment.strategy 配置,但这只决定了消费者如何分配分区,而不是它如何分配从分配给它的分区中消耗。

我开始深入研究 KafkaConsumer 源代码并 #poll()带我去 #pollForFetches() #pollForFetches()然后带我到fetcher#fetchedRecords()fetcher#sendFetches()

这只是让我尝试跟进整个 Fetcher class一切都在一起,也许只是晚了,或者也许我只是没有深入挖掘,但我无法准确地弄清楚消费者将如何处理多个分配的分区。

背景

在 Kafka Streams 支持的数据管道上工作。

在这个管道的几个阶段,当记录被不同的 Kafka Streams 应用程序处理时,流被连接到由外部数据源提供的压缩主题,这些数据源提供所需的数据,这些数据将在继续处理的下一阶段之前在记录中扩充.

在此过程中,有几个死信主题,其中记录无法与本应扩充记录的外部数据源匹配。这可能是因为数据尚不可用( Activity 或营销 Activity 尚未上线),或者是错误数据,永远不会匹配。

我们的目标是在发布新的增强数据时重新发布死信主题中的记录,以便我们可以匹配死信主题中以前不匹配的记录,以便更新它们并将它们发送到下游以进行额外处理。

记录在多次尝试中可能无法匹配,并且在死信主题中可能有多个副本,因此我们只想重新处理现有记录(在应用程序启动时的最新偏移量之前)以及发送到自上次应用程序运行以来的死信主题(在先前保存的消费者组偏移量之后)。

它运作良好,因为我的消费者过滤掉应用程序启动后到达的任何记录,并且我的生产者通过将偏移量作为发布事务的一部分提交来管理我的消费者组偏移量。

但我想确保我最终会从所有分区中消费,因为我遇到了一个奇怪的边缘情况,在这种情况下,未匹配的记录被重新处理并落入与之前在死信主题中相同的分区中,只是被过滤掉消费者。虽然它没有获得新的记录批处理来处理,但仍有一些分区尚未重新处理。

如果能帮助理解单个消费者如何处理多个分配的分区,我们将不胜感激。

最佳答案

您查看 Fetcher 的方向是正确的,因为大部分逻辑都在那里。

首先作为 Consumer Javadoc提及:

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption.

您可以想象,在实践中,有几件事需要考虑。

  • 每次消费者尝试获取新记录时,它都会排除已经有记录等待(来自上一次获取)的分区。已经有正在进行的提取请求的分区也被排除在外。

  • 获取记录时,消费者在获取请求中指定fetch.max.bytesmax.partition.fetch.bytes。代理使用这些来分别确定总共和每个分区返回多少数据。这同样适用于所有分区。

使用这两种方法,默认情况下,消费者会尝试公平地消费所有分区。如果不是这种情况,更改 fetch.max.bytesmax.partition.fetch.bytes 通常会有所帮助。

以防万一,您希望某些分区优先于其他分区,您需要使用 pause()resume()手动控制消费流量。

关于java - Kafka Consumer如何从多个assigned Partition中消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54567011/

有关java - Kafka Consumer如何从多个assigned Partition中消费的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  4. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  5. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  6. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  7. ruby-on-rails - 在 Ruby 中循环遍历多个数组 - 2

    我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代

  8. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  9. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  10. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

随机推荐