我们有一个运行 RabbitMQ 3.8.5 和 RabbitMQ.Client 5.2.0 的系统。系统使用 Rabbit 的方式之一是创建一个消费者来等待来自不同执行器的特定命令...
我们有一个运行 RabbitMQ 3.8.5 和 RabbitMQ.Client 5.2.0 的系统。系统使用 Rabbit 的方式之一是,当客户端发出长轮询 Web 请求时,创建一个消费者来等待来自不同可执行文件的特定命令。Web 请求通过 .Net 6.0 Web API 传入,每个客户端都会创建消费者,因为所述命令发布在特定于该客户端的队列上。在给定的超时之后,消费者将被处置,客户端将重新启动长轮询请求。我们能够支持 1500 多个客户端/消费者同时发出长轮询请求。在我们升级到 RabbitMQ 3.13.3 和 RabbitMQ.Client 6.8.1 之前,这一直运行良好。现在,当大约 700 个客户端发起此长轮询请求时,会发生超时异常。当 500 个客户端长时间连接(> 1 小时)时,也会发生同样的情况。我们也尝试使用 EasyNetQ,因为我们在系统的其他部分使用该库,但遇到了相同的错误和行为,即超时,随后 RabbitMQ 服务器终止连接。
进一步的调查似乎表明,发生超时异常时会错过心跳,并且 RabbitMQ 服务器会终止连接,从而导致我们这边出现多米诺骨牌效应。我们尚未找出导致超时的原因,并阻止了与 RabbitMQ 相关的所有操作。
编辑 这里有一些可能有帮助的细节,版本升级之间任何设置都没有改变。
-
目标工作量为 1500+ 个客户,并保持现状
-
VM、Windows Server 2022 Standard x64、32 GB RAM、2.3 Ghz CPU
-
队列仍然是经典的,并且保持原样。
下面提供了一些详细信息:
这是错误首次发生时出现的初始异常:
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean
durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.ConsumerCount(String queue)
EasyNetQ 错误
System.TimeoutException: The operation has timed out.
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.AutorecoveringModel.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass51_0.<QueueDeclareAsync>b__0(IModel x)
at EasyNetQ.Persistent.PersistentChannel.InvokeChannelActionAsync[TResult,TChannelAction](TChannelAction channelAction, CancellationToken cancellationToken)
at EasyNetQ.RabbitAdvancedBus.QueueDeclareAsync(String name, Action`1 configure, CancellationToken cancellationToken)
at EasyNetQ.AdvancedBusExtensions.QueueDeclare(IAdvancedBus bus, String name, Boolean durable, Boolean exclusive, Boolean autoDelete, CancellationToken cancellationToken)
初始消费者守则
ncChannel.QueueDeclare(queueName, true, false, false, null);
ncChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true, false, null);
ncChannel.QueueBind(queueName, ExchangeName, routingKey);
Channel<BasicDeliverEventArgs> responseChannel = Channel.CreateBounded<BasicDeliverEventArgs>(channelOptions);
EventHandler<BasicDeliverEventArgs> nextCommandHandler = async (object sender, BasicDeliverEventArgs msg) =>
{
try
{
if (msg != null && msg.Body.ToArray() != null)
{
string msgBody = Encoding.UTF8.GetString(msg.Body.ToArray());
//log
}
else
{
//log
}
await responseChannel.Writer.WriteAsync(msg);
responseChannel.Writer.TryComplete();
}
catch (Exception ex)
{
//log
}
};
consumer.Received += nextCommandHandler;
bool autoAck = true;
consumerTag = ncChannel.BasicConsume(queueName, autoAck, consumer);
尝试将消费者更改为异步事件基本消费者
Channel<string> responseChannel = Channel.CreateBounded<string>(channelOptions);
ncChannel.QueueDeclare(queueName, true, false, false, null);
ncChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true, false, null);
ncChannel.QueueBind(queueName, ExchangeName, routingKey);
AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(ncChannel);
AsyncEventHandler<BasicDeliverEventArgs> nextCommandHandler = async (object sender, BasicDeliverEventArgs msg) =>
{
try
{
if (msg != null && msg.Body.ToArray() != null)
{
string msgBody = Encoding.UTF8.GetString(msg.Body.ToArray());
//log
}
else
{
//log
}
await responseChannel.Writer.WriteAsync(msg);
responseChannel.Writer.TryComplete();
await Task.Yield();
}
catch (Exception ex)
{
//log
}
};
consumer.Received += nextCommandHandler;
bool autoAck = true;
consumerTag = ncChannel.BasicConsume(queueName, autoAck, consumer);
EasyNetQ 消费者代码
var queue = _rabbitBus.Advanced.QueueDeclare(queueName, true, false, false);
consumer = _rabbitBus.Advanced.Consume(queue, (body, properties, info) => Task.Factory.StartNew(async () =>
{
try
{
string msgBody = null;
if (body.ToArray() != null)
{
msgBody = Encoding.UTF8.GetString(body.ToArray());
//log
}
else
{
//log
}
await responseChannel.Writer.WriteAsync(msgBody);
responseChannel.Writer.TryComplete();
await Task.Yield();
}
catch (Exception ex)
{
//log
}
}));