您是否有任何指示如何确定何时发生订阅问题以便我可以重新连接?
我的服务使用 RabbitMQ.Client.MessagePatterns.Subscription 进行订阅。一段时间后,我的客户默默地停止接收消息。我怀疑是网络问题,因为我们的 VPN 连接不是最可靠的。
我已经通读了一段时间的文档,寻找一个 key 来查明此订阅何时可能因网络问题而中断,但运气不佳。我试过检查连接和 channel 是否仍然打开,但它似乎总是报告它仍然打开。
它处理的消息工作得很好,并被确认回队列,所以我认为这不是“确认”的问题。
我确定我一定只是遗漏了一些简单的东西,但我还没有找到它。
public void Run(string brokerUri, Action<byte[]> handler)
{
log.Debug("Connecting to broker: {0}".Fill(brokerUri));
ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, true, false, false, null);
using (Subscription subscription = new Subscription(channel, queueName, false))
{
while (!Cancelled)
{
BasicDeliverEventArgs args;
if (!channel.IsOpen)
{
log.Error("The channel is no longer open, but we are still trying to process messages.");
throw new InvalidOperationException("Channel is closed.");
}
else if (!connection.IsOpen)
{
log.Error("The connection is no longer open, but we are still trying to process message.");
throw new InvalidOperationException("Connection is closed.");
}
bool gotMessage = subscription.Next(250, out args);
if (gotMessage)
{
log.Debug("Received message");
try
{
handler(args.Body);
}
catch (Exception e)
{
log.Debug("Exception caught while processing message. Will be bubbled up.", e);
throw;
}
log.Debug("Acknowledging message completion");
subscription.Ack(args);
}
}
}
}
}
}
更新:
我通过在虚拟机中运行服务器来模拟网络故障,当我断开连接足够长,所以也许这不是网络问题。现在我不知道它会是什么,但它在运行几个小时后就失败了。
最佳答案
编辑:由于我仍然对此表示赞同,我应该指出 .NET RabbitMQ 客户端现在内置了此功能:https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery
理想情况下,您应该能够使用它并避免手动实现重新连接逻辑。
我最近不得不实现几乎相同的事情。据我所知,关于 RabbitMQ 的大部分可用信息都假定您的网络非常可靠,或者您在与任何客户端发送或接收消息的同一台机器上运行 RabbitMQ 代理,从而允许 Rabbit 处理任何连接问题。
设置 Rabbit 客户端以防止连接丢失确实不难,但是您需要处理一些特殊情况。
您需要做的第一件事是打开心跳:
ConnectionFactory factory = new ConnectionFactory()
{
Uri = brokerUri,
RequestedHeartbeat = 30,
};
将“RequestedHeartbeat”设置为 30 将使客户端每 30 秒检查一次连接是否仍然有效。如果不启用此功能,消息订阅者将愉快地坐在那里等待另一条消息进入,而不知道其连接已变坏。
打开心跳也会让服务器检查连接是否仍然有效,这可能非常重要。如果在订阅者接收消息后但在确认消息之前连接变坏,则服务器会假设客户端花费了很长时间,并且消息会“卡在”死连接上,直到它关闭。启用心跳后,服务器将识别连接何时变坏并关闭它,将消息放回队列中,以便另一个订阅者可以处理它。没有心跳,我不得不手动进入并关闭 Rabbit 管理 UI 中的连接,以便将卡住的消息传递给订阅者。
其次,您需要处理 OperationInterruptedException。正如您所注意到的,这通常是 Rabbit 客户端在注意到连接已中断时抛出的异常。如果在连接中断时调用 IModel.QueueDeclare(),您将得到此异常。通过处理您的订阅、 channel 和连接并创建新的来处理此异常。
最后,您必须处理消费者在尝试使用来自已关闭连接的消息时所做的操作。不幸的是,在 Rabbit 客户端中从队列中消费消息的每种不同方式似乎都有不同的 react 。如果您在关闭的连接上调用 QueueingBasicConsumer.Queue.Dequeue,QueueingBasicConsumer 将抛出 EndOfStreamException。 EventingBasicConsumer 什么都不做,因为它只是在等待消息。根据我的尝试,您正在使用的 Subscription 类似乎从对 Subscription.Next 的调用中返回 true,但是 args 的值 为空。再一次,通过处理您的连接、 channel 和订阅并重新创建它们来处理此问题。
connection.IsOpen 的值将在连接失败且心跳打开时更新为 False,因此您可以根据需要进行检查。但是,由于心跳在单独的线程上运行,您仍然需要处理连接在检查时打开但在调用 subscription.Next() 之前关闭的情况。
要注意的最后一件事是 IConnection.Dispose()。如果您在连接关闭后调用 dispose,此调用将抛出 EndOfStreamException。这对我来说似乎是一个错误,我不喜欢不在 IDisposable 对象上调用 dispose,所以我调用它并吞下异常。
将所有内容放在一个快速而肮脏的示例中:
public bool Cancelled { get; set; }
IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;
public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
ConnectionFactory factory = new ConnectionFactory()
{
Uri = brokerUri,
RequestedHeartbeat = 30,
};
while (!Cancelled)
{
try
{
if(_subscription == null)
{
try
{
_connection = factory.CreateConnection();
}
catch(BrokerUnreachableException)
{
//You probably want to log the error and cancel after N tries,
//otherwise start the loop over to try to connect again after a second or so.
continue;
}
_channel = _connection.CreateModel();
_channel.QueueDeclare(queueName, true, false, false, null);
_subscription = new Subscription(_channel, queueName, false);
}
BasicDeliverEventArgs args;
bool gotMessage = _subscription.Next(250, out args);
if (gotMessage)
{
if(args == null)
{
//This means the connection is closed.
DisposeAllConnectionObjects();
continue;
}
handler(args.Body);
_subscription.Ack(args);
}
}
catch(OperationInterruptedException ex)
{
DisposeAllConnectionObjects();
}
}
DisposeAllConnectionObjects();
}
private void DisposeAllConnectionObjects()
{
if(_subscription != null)
{
//IDisposable is implemented explicitly for some reason.
((IDisposable)_subscription).Dispose();
_subscription = null;
}
if(_channel != null)
{
_channel.Dispose();
_channel = null;
}
if(_connection != null)
{
try
{
_connection.Dispose();
}
catch(EndOfStreamException)
{
}
_connection = null;
}
}
关于c# - RabbitMQ C# 驱动程序停止接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12499174/
我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
我想用ruby编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序
我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此
我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r
刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr
我是一个Rails初学者,但我想从我的RailsView(html.haml文件)中查看Ruby变量的内容。我试图在ruby中打印出变量(认为它会在终端中出现),但没有得到任何结果。有什么建议吗?我知道Rails调试器,但更喜欢使用inspect来打印我的变量。 最佳答案 您可以在View中使用puts方法将信息输出到服务器控制台。您应该能够在View中的任何位置使用Haml执行以下操作:-puts@my_variable.inspect 关于ruby-on-rails-如何在我的R
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我正在处理旧代码的一部分。beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)endRubocop错误如下:Avoidstubbingusing'allow_any_instance_of'我读到了RuboCop::RSpec:AnyInstance我试着像下面那样改变它。由此beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)end对此:let(:sport_
如何在ruby中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL