jjzjj

c# - RabbitMQ C# 驱动程序停止接收消息

coder 2023-07-10 原文

您是否有任何指示如何确定何时发生订阅问题以便我可以重新连接?

我的服务使用 RabbitMQ.Client.MessagePatterns.Subscription 进行订阅。一段时间后,我的客户默默地停止接收消息。我怀疑是网络问题,因为我们的 VPN 连接不是最可靠的。

我已经通读了一段时间的文档,寻找一个 key 来查明此订阅何时可能因网络问题而中断,但运气不佳。我试过检查连接和 channel 是否仍然打开,但它似乎总是报告它仍然打开。

它处理的消息工作得很好,并被确认回队列,所以我认为这不是“确认”的问题。

我确定我一定只是遗漏了一些简单的东西,但我还没有找到它。

public void Run(string brokerUri, Action<byte[]> handler)
{
    log.Debug("Connecting to broker: {0}".Fill(brokerUri));
    ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };

    using (IConnection connection = factory.CreateConnection())
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, true, false, false, null);

            using (Subscription subscription = new Subscription(channel, queueName, false))
            {
                while (!Cancelled)
                {
                    BasicDeliverEventArgs args;

                    if (!channel.IsOpen)
                    {
                        log.Error("The channel is no longer open, but we are still trying to process messages.");
                        throw new InvalidOperationException("Channel is closed.");
                    }
                    else if (!connection.IsOpen)
                    {
                        log.Error("The connection is no longer open, but we are still trying to process message.");
                        throw new InvalidOperationException("Connection is closed.");
                    }

                    bool gotMessage = subscription.Next(250, out args);

                    if (gotMessage)
                    {
                        log.Debug("Received message");
                        try
                        {
                            handler(args.Body);
                        }
                        catch (Exception e)
                        {
                            log.Debug("Exception caught while processing message. Will be bubbled up.", e);
                            throw;
                        }

                        log.Debug("Acknowledging message completion");
                        subscription.Ack(args);
                    }
                }
            }
        }
    }
}

更新:

我通过在虚拟机中运行服务器来模拟网络故障,当我断开连接足够长,所以也许这不是网络问题。现在我不知道它会是什么,但它在运行几个小时后就失败了。

最佳答案

编辑:由于我仍然对此表示赞同,我应该指出 .NET RabbitMQ 客户端现在内置了此功能:https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

理想情况下,您应该能够使用它并避免手动实现重新连接逻辑。


我最近不得不实现几乎相同的事情。据我所知,关于 RabbitMQ 的大部分可用信息都假定您的网络非常可靠,或者您在与任何客户端发送或接收消息的同一台机器上运行 RabbitMQ 代理,从而允许 Rabbit 处理任何连接问题。

设置 Rabbit 客户端以防止连接丢失确实不难,但是您需要处理一些特殊情况。

您需要做的第一件事是打开心跳:

ConnectionFactory factory = new ConnectionFactory() 
{
  Uri = brokerUri,
  RequestedHeartbeat = 30,
}; 

将“RequestedHeartbeat”设置为 30 将使客户端每 30 秒检查一次连接是否仍然有效。如果不启用此功能,消息订阅者将愉快地坐在那里等待另一条消息进入,而不知道其连接已变坏。

打开心跳也会让服务器检查连接是否仍然有效,这可能非常重要。如果在订阅者接收消息后但在确认消息之前连接变坏,则服务器会假设客户端花费了很长时间,并且消息会“卡在”死连接上,直到它关闭。启用心跳后,服务器将识别连接何时变坏并关闭它,将消息放回队列中,以便另一个订阅者可以处理它。没有心跳,我不得不手动进入并关闭 Rabbit 管理 UI 中的连接,以便将卡住的消息传递给订阅者。

其次,您需要处理 OperationInterruptedException。正如您所注意到的,这通常是 Rabbit 客户端在注意到连接已中断时抛出的异常。如果在连接中断时调用 IModel.QueueDeclare(),您将得到此异常。通过处理您的订阅、 channel 和连接并创建新的来处理此异常。

最后,您必须处理消费者在尝试使用来自已关闭连接的消息时所做的操作。不幸的是,在 Rabbit 客户端中从队列中消费消息的每种不同方式似乎都有不同的 react 。如果您在关闭的连接上调用 QueueingBasicConsumer.Queue.DequeueQueueingBasicConsumer 将抛出 EndOfStreamExceptionEventingBasicConsumer 什么都不做,因为它只是在等待消息。根据我的尝试,您正在使用的 Subscription 类似乎从对 Subscription.Next 的调用中返回 true,但是 args 的值 为空。再一次,通过处理您的连接、 channel 和订阅并重新创建它们来处理此问题。

connection.IsOpen 的值将在连接失败且心跳打开时更新为 False,因此您可以根据需要进行检查。但是,由于心跳在单独的线程上运行,您仍然需要处理连接在检查时打开但在调用 subscription.Next() 之前关闭的情况。

要注意的最后一件事是 IConnection.Dispose()。如果您在连接关闭后调用 dispose,此调用将抛出 EndOfStreamException。这对我来说似乎是一个错误,我不喜欢不在 IDisposable 对象上调用 dispose,所以我调用它并吞下异常。

将所有内容放在一个快速而肮脏的示例中:

public bool Cancelled { get; set; }

IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;

public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
    ConnectionFactory factory = new ConnectionFactory() 
    {
        Uri = brokerUri,
        RequestedHeartbeat = 30,
    };

    while (!Cancelled)
    {               
        try
        {
            if(_subscription == null)
            {
                try
                {
                    _connection = factory.CreateConnection();
                }
                catch(BrokerUnreachableException)
                {
                    //You probably want to log the error and cancel after N tries, 
                    //otherwise start the loop over to try to connect again after a second or so.
                    continue;
                }

                _channel = _connection.CreateModel();
                _channel.QueueDeclare(queueName, true, false, false, null);
                _subscription = new Subscription(_channel, queueName, false);
            }

            BasicDeliverEventArgs args;
            bool gotMessage = _subscription.Next(250, out args);
            if (gotMessage)
            {
                if(args == null)
                {
                    //This means the connection is closed.
                    DisposeAllConnectionObjects();
                    continue;
                }

                handler(args.Body);
                _subscription.Ack(args);
            }
        }
        catch(OperationInterruptedException ex)
        {
            DisposeAllConnectionObjects();
        }
    }
    DisposeAllConnectionObjects();
}

private void DisposeAllConnectionObjects()
{
    if(_subscription != null)
    {
        //IDisposable is implemented explicitly for some reason.
        ((IDisposable)_subscription).Dispose();
        _subscription = null;
    }

    if(_channel != null)
    {
        _channel.Dispose();
        _channel = null;
    }

    if(_connection != null)
    {
        try
        {
            _connection.Dispose();
        }
        catch(EndOfStreamException) 
        {
        }
        _connection = null;
    }
}

关于c# - RabbitMQ C# 驱动程序停止接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12499174/

有关c# - RabbitMQ C# 驱动程序停止接收消息的更多相关文章

  1. ruby - 在 Ruby 程序执行时阻止 Windows 7 PC 进入休眠状态 - 2

    我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0

  2. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  3. ruby - 在 Ruby 中编写命令行实用程序 - 2

    我想用ruby​​编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序

  4. ruby-on-rails - Rails 应用程序之间的通信 - 2

    我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此

  5. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  6. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  7. ruby-on-rails - 如何在我的 Rails 应用程序 View 中打印 ruby​​ 变量的内容? - 2

    我是一个Rails初学者,但我想从我的RailsView(html.haml文件)中查看Ruby变量的内容。我试图在ruby​​中打印出变量(认为它会在终端中出现),但没有得到任何结果。有什么建议吗?我知道Rails调试器,但更喜欢使用inspect来打印我的变量。 最佳答案 您可以在View中使用puts方法将信息输出到服务器控制台。您应该能够在View中的任何位置使用Haml执行以下操作:-puts@my_variable.inspect 关于ruby-on-rails-如何在我的R

  8. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  9. ruby-on-rails - RSpec:避免使用允许接收的任何实例 - 2

    我正在处理旧代码的一部分。beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)endRubocop错误如下:Avoidstubbingusing'allow_any_instance_of'我读到了RuboCop::RSpec:AnyInstance我试着像下面那样改变它。由此beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)end对此:let(:sport_

  10. c# - 如何在 ruby​​ 中调用 C# dll? - 2

    如何在ruby​​中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL

随机推荐