jjzjj

tcp - 在 logstash 中以事务方式发送事件

coder 2023-09-19 原文

我正在尝试使用 logstash 从 TCP 套接字接收事件,并将它们输出到 Kafka 主题。我当前的配置能够完美地做到这一点,但我希望能够以事务方式向 Kafka 发送事件。我的意思是,系统不应该将事件发送到 kafka,直到收到提交消息:

START TXN 123         --No message sent to Kafka
123 - Event1 Message  --No message sent to Kafka
123 - Event2 Message  --No message sent to Kafka
123 - Event3 Message  --No message sent to Kafka
COMMIT TXN 123           --Event1, Event2, Event3 messages sent to Kafka

是否有可能仅使用 logstash 来实现此目的,或者我是否应该在源和 logstash 之间引入任何其他事务协调器?这是我当前的配置:

input {
  tcp {
    port => 9000
  }
}

output {
  kafka { 
    bootstrap_servers => "localhost:9092"
    topic_id =>  "alpayk"
  }
}

我尝试使用 logstash 的聚合过滤器来达到这个目的,但我无法以有效的方式结束。

非常感谢您

最佳答案

为此,我最终决定使用 Apache Flume。我修改了它的 netcat 源,以便未提交的消息驻留在 flume 的堆中,一旦收到事务的提交消息,所有消息都会发送到 kafka sink。

我打算将消息存储位置从 flume heap 更改为外部缓存,这样我就可以在事务异常终止或回滚时使存储的消息过期。

下面是我的一段交易逻辑代码:

String eventMessage = new String(body);
int indexOfTrxIdSeparator = eventMessage.indexOf("-");
if (indexOfTrxIdSeparator != -1) {
    String txnId = eventMessage.substring(0, indexOfTrxIdSeparator).trim();
    String message = eventMessage.substring(indexOfTrxIdSeparator + 1).trim();
    ArrayList<Event> events = cachedEvents.get(txnId);

    if (message.equals("COMMIT")) {

        System.out.println("@@@@@ COMMIT RECEIVED");

        if (events != null) {
            for (Event eventItem : events) {
                ChannelException ex = null;
                try {
                    source.getChannelProcessor().processEvent(eventItem);
                } catch (ChannelException chEx) {
                    ex = chEx;
                }

                if (ex == null) {
                    counterGroup.incrementAndGet("events.processed");
                } else {
                    counterGroup.incrementAndGet("events.failed");
                    logger.warn("Error processing event. Exception follows.", ex);
                }
            }

            cachedEvents.remove(txnId);
        }
    } else {
        System.out.println("@@@@@ MESSAGE RECEIVED: " + message);
        if (events == null) {
            events = new ArrayList<Event>();
        }
        events.add(EventBuilder.withBody(message.getBytes()));
        cachedEvents.put(txnId, events);
    }
}

我将此代码添加到 Flume 的 netcat 源的 processEvents 方法中。我不想使用 Ruby 代码,这就是我决定切换到 Flume 的原因。然而,同样的事情也可以在 logstash 中完成。

谢谢

关于tcp - 在 logstash 中以事务方式发送事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53958341/

有关tcp - 在 logstash 中以事务方式发送事件的更多相关文章

  1. ruby - 如何以所有可能的方式将字符串拆分为长度最多为 3 的连续子字符串? - 2

    我试图获取一个长度在1到10之间的字符串,并输出将字符串分解为大小为1、2或3的连续子字符串的所有可能方式。例如:输入:123456将整数分割成单个字符,然后继续查找组合。该代码将返回以下所有数组。[1,2,3,4,5,6][12,3,4,5,6][1,23,4,5,6][1,2,34,5,6][1,2,3,45,6][1,2,3,4,56][12,34,5,6][12,3,45,6][12,3,4,56][1,23,45,6][1,2,34,56][1,23,4,56][12,34,56][123,4,5,6][1,234,5,6][1,2,345,6][1,2,3,456][123

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 我可以使用 aws-sdk-ruby 在 AWS S3 上使用事务性文件删除/上传吗? - 2

    我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的

  4. ruby-on-rails - 正确的 Rails 2.1 做事方式 - 2

    question的一些答案关于redirect_to让我想到了其他一些问题。基本上,我正在使用Rails2.1编写博客应用程序。我一直在尝试自己完成大部分工作(因为我对Rails有所了解),但在需要时会引用Internet上的教程和引用资料。我设法让一个简单的博客正常运行,然后我尝试添加评论。靠我自己,我设法让它进入了可以从script/console添加评论的阶段,但我无法让表单正常工作。我遵循的其中一个教程建议在帖子Controller中创建一个“评论”操作,以添加评论。我的问题是:这是“标准”方式吗?我的另一个问题的答案之一似乎暗示应该有一个CommentsController参

  5. jquery - 我的 jquery AJAX POST 请求无需发送 Authenticity Token (Rails) - 2

    rails中是否有任何规定允许站点的所有AJAXPOST请求在没有authenticity_token的情况下通过?我有一个调用Controller方法的JqueryPOSTajax调用,但我没有在其中放置任何真实性代码,但调用成功。我的ApplicationController确实有'request_forgery_protection'并且我已经改变了config.action_controller.consider_all_requests_local在我的environments/development.rb中为false我还搜索了我的代码以确保我没有重载ajaxSend来发送

  6. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  7. ruby-on-rails - 事件管理员日期过滤器日期格式自定义 - 2

    是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s

  8. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  9. 【鸿蒙应用开发系列】- 获取系统设备信息以及版本API兼容调用方式 - 2

    在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList​()Obt

  10. ruby-on-rails - 事件记录 : Select max of limit - 2

    我正在尝试将以下SQL查询转换为ActiveRecord,它正在融化我的大脑。deletefromtablewhereid有什么想法吗?我想做的是限制表中的行数。所以,我想删除少于最近10个条目的所有内容。编辑:通过结合以下几个答案找到了解决方案。Temperature.where('id这给我留下了最新的10个条目。 最佳答案 从您的SQL来看,您似乎想要从表中删除前10条记录。我相信到目前为止的大多数答案都会如此。这里有两个额外的选择:基于MurifoX的版本:Table.where(:id=>Table.order(:id).

随机推荐