对于我来说,我一直无法通过RabbitMQ与临时回复队列一起使用RPC。下面是从该测试派生的简单示例。我在输出窗口中看到一堆异常,并且dlq填满,但是消息从未被确认。
namespace ConsoleApplication4
{
class Program
{
public static IMessageService CreateMqServer(int retryCount = 1)
{
return new RabbitMqServer { RetryCount = retryCount };
}
static void Main(string[] args)
{
using (var mqServer = CreateMqServer())
{
mqServer.RegisterHandler<HelloIntro>(m =>
new HelloIntroResponse { Result = "Hello, {0}!".Fmt(m.GetBody().Name) });
mqServer.Start();
}
Console.WriteLine("ConsoleAppplication4");
Console.ReadKey();
}
}
}
namespace ConsoleApplication5
{
class Program
{
public static IMessageService CreateMqServer(int retryCount = 1)
{
return new RabbitMqServer { RetryCount = retryCount };
}
static void Main(string[] args)
{
using (var mqServer = CreateMqServer())
{
using (var mqClient = mqServer.CreateMessageQueueClient())
{
var replyToMq = mqClient.GetTempQueueName();
mqClient.Publish(new Message<HelloIntro>(new HelloIntro { Name = "World" })
{
ReplyTo = replyToMq
});
IMessage<HelloIntroResponse> responseMsg = mqClient.Get<HelloIntroResponse>(replyToMq);
mqClient.Ack(responseMsg);
}
}
Console.WriteLine("ConsoleAppplication5");
Console.ReadKey();
}
}
}
第一个例外
RabbitMQ.Client.Exceptions.OperationInterruptedException occurred
_HResult=-2146233088
_message=The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=405, text="RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'mq:tmp:10dd20804ee546d6bf5a3512f66143ec' in vhost '/'", classId=50, methodId=20, cause=
HResult=-2146233088
IsTransient=false
Message=The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=405, text="RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'mq:tmp:10dd20804ee546d6bf5a3512f66143ec' in vhost '/'", classId=50, methodId=20, cause=
Source=RabbitMQ.Client
StackTrace:
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply()
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.v0_9_1.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey)
at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueue(IModel channel, String queueName)
at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueueByName(IModel channel, String queueName)
at ServiceStack.RabbitMq.RabbitMqProducer.PublishMessage(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)
InnerException:
其次是这个
System.Threading.ThreadInterruptedException occurred
_HResult=-2146233063
_message=Thread was interrupted from a waiting state.
HResult=-2146233063
IsTransient=true
Message=Thread was interrupted from a waiting state.
Source=mscorlib
StackTrace:
at System.Threading.Monitor.ObjWait(Boolean exitContext, Int32 millisecondsTimeout, Object obj)
at System.Threading.Monitor.Wait(Object obj, Int32 millisecondsTimeout, Boolean exitContext)
InnerException:
然后重复多次并挂起。这篇特别的文章似乎暗示他们可以使用ServerStack和RabbitMQ RPC取得某种成功,但是在我开始更改代码之前,我想知道我的代码不起作用的原因。
谢谢斯蒂芬
还有一个新的RabbitMqTest项目,展示了一个简单的工作客户端/服务器示例,该示例通过2个独立的控制台应用程序进行通信。
此更改从MyGet的v4.0.34 +版本开始可用。
该ServiceStack.RabbitMq
包RabbitMq.Client
的NuGet依赖也已升级到v3.4.0。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句