jjzjj

java - 如何选择一个Kafka transaction.id

coder 2024-03-17 原文

我想知道我能否在理解Kafka中的交易方面获得帮助,尤其是如何使用transaction.id。这里是上下文:

  • 我的Kafka应用程序遵循以下模式:使用来自输入主题的消息,进行处理,然后发布到输出主题。
  • 我不使用Kafka Streams API。
  • 我在一个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。
  • 有一个带有工作线程的线程池,该线程用于执行消息处理并将其发布到输出主题。目前,每个线程都有自己的生产者实例。
  • 我正在使用已发布的事务API,以确保消耗偏移量的更新和对输出主题的发布原子地进行

  • 到目前为止,我的假设包括:
  • 如果我的进程在中间事务中崩溃,那么该事务中的任何内容都不会发布,也不会消耗消耗。因此,在重新启动时,我只需从原始的消耗偏移量再次启动事务即可。
  • 对于生产者transaction.id,重要的是它是唯一的。因此,我可以在启动
  • 时生成基于时间戳的id

    然后,我阅读了以下博客:https://www.confluent.io/blog/transactions-apache-kafka/。特别是在“如何选择交易ID”部分中,这似乎意味着我需要保证每个输入分区都有一个生产者实例。它说:“正确抵御僵尸的关键是确保对于给定的transactional.id,读-写-写循环中的输入主题和分区始终相同。”它还进一步列举了以下问题示例:“例如,在分布式流处理应用程序中,假定主题分区tp0最初是由transactional.id T0处理的。如果在以后的某个时候,它可以通过事务处理映射到另一个生产者.id T1,在T0和T1之间不会有隔离。因此,有可能重新处理来自tp0的消息,这完全违反了一次处理保证。”

    我不太明白为什么会这样。在我看来,只要事务是原子的,我就不在乎哪个生产者处理来自任何分区的消息。我已经为此努力了一天,我想知道是否有人可以告诉我我在这里错过的事情。因此,为什么不能将工作分配给具有任何transaction.id设置的任何生产者实例,只要它是唯一的。以及为什么他们说如果您这样做,则消息可能会通过事务提供的隔离机制泄漏。

    最佳答案

    您提到的博客文章包含了您正在寻找的所有信息,尽管内容非常密集。

    从为什么交易? aforementioned article中的部分。

    Using vanilla Kafka producers and consumers configured for at-least-once delivery semantics, a stream processing application could lose exactly once processing semantics in the following ways:

    1. The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.

    2. We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.

    3. Finally, in distributed environments, applications will crash or—worse!—temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics. We call this the problem of “zombie instances.” [emphasis added]



    same article的“事务语义”部分。

    Zombie fencing

    We solve the problem of zombie instances by requiring that each transactional producer be assigned a unique identifier called the transactional.id. This is used to identify the same producer instance across process restarts. [emphasis added]

    The API requires that the first operation of a transactional producer should be to explicitly register its transactional.id with the Kafka cluster. When it does so, the Kafka broker checks for open transactions with the given transactional.id and completes them. It also increments an epoch associated with the transactional.id. The epoch is an internal piece of metadata stored for every transactional.id.

    Once the epoch is bumped, any producers with same transactional.id and an older epoch are considered zombies and are fenced off, ie. future transactional writes from those producers are rejected. [emphasis added]



    并从same article中的“数据流”部分开始。

    A: the producer and transaction coordinator interaction

    When executing transactions, the producer makes requests to the transaction coordinator at the following points:

    1. The initTransactions API registers a transactional.id with the coordinator. At this point, the coordinator closes any pending transactions with that transactional.id and bumps the epoch to fence out zombies. This happens only once per producer session. [emphasis added]

    2. When the producer is about to send data to a partition for the first time in a transaction, the partition is registered with the coordinator first.

    3. When the application calls commitTransaction or abortTransaction, a request is sent to the coordinator to begin the two phase commit protocol.



    希望这可以帮助!

    关于java - 如何选择一个Kafka transaction.id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50335227/

    有关java - 如何选择一个Kafka transaction.id的更多相关文章

    1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

      我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

    2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

      总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

    3. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

      关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

    4. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

      给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

    5. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

      我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

    6. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

      我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

    7. ruby - 如何指定 Rack 处理程序 - 2

      Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

    8. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

      使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

    9. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

      我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

    10. ruby-on-rails - 渲染另一个 Controller 的 View - 2

      我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>

    随机推荐