我用请求和回复编写了一个简单的消息流。我必须使用两个独立的队列,所以我声明AmqpOutboundAdapter发送消息,并声明AmqpInboundAdapter接收回复。
@Bean
@FindADUsers
public AmqpOutboundEndpoint newFindADUsersOutboundAdapter() {
return Amqp.outboundAdapter(amqpTemplate())
.routingKeyExpression("headers[" + ADUsersFindConfig.ROUTING_KEY_HEADER + "]")
.exchangeName(getExchange())
.headerMapper(amqpHeaderMapper())
.get();
}
@Bean
public AmqpInboundChannelAdapter newFindADUsersResponseInboundChannelAdapter(
ADUsersFindResponseConfig config) {
return Amqp.inboundAdapter(rabbitConnectionFactory(), findADUsersResponseQueue)
.headerMapper(amqpHeaderMapper())
.outputChannel(config.newADUsersFindResponseOutputChannel())
.get();
}
它应该与@MessagingGateway一起使用:
@MessagingGateway
public interface ADUsersFindService {
String FIND_AD_USERS_CHANNEL = "adUsersFindChannel";
String FIND_AD_USERS_REPLY_OUTPUT_CHANNEL = "adUsersFindReplyOutputChannel";
String FIND_AD_USERS_REPLY_CHANNEL = "adUsersFindReplyChannel";
String CORRELATION_ID_REQUEST_HEADER = "correlation_id";
String ROUTING_KEY_HEADER = "replyRoutingKey";
String OBJECT_TYPE_HEADER = "object.type";
@Gateway(requestChannel = FIND_AD_USERS_CHANNEL, replyChannel = FIND_AD_USERS_REPLY_CHANNEL)
ADResponse find(ADRequest adRequest, @Header(ROUTING_KEY_HEADER) String routingKey, @Header(OBJECT_TYPE_HEADER) String objectType);
}
ADUsersFindResponseConfig类如下所示:
@Configuration
@Import(JsonConfig.class)
public class ADUsersFindResponseConfig {
@Autowired
public NullChannel nullChannel;
@Autowired
private JsonObjectMapper<?, ?> mapper;
/**
* @return The output channel for the flow
*/
@Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_OUTPUT_CHANNEL)
public MessageChannel newADUsersFindResponseOutputChannel() {
return MessageChannels.direct().get();
}
/**
* @return The output channel for gateway
*/
@Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_CHANNEL)
public MessageChannel newADUsersFindResponseChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow findADUsersResponseFlow() {
return IntegrationFlows
.from(newADUsersFindResponseOutputChannel())
.transform(new JsonToObjectTransformer(ADResponse.class, mapper))
.channel(newADUsersFindResponseChannel())
.get();
}
}
发送消息工作正常,但是我在接收消息时遇到问题。我希望接收到的消息将传递到名为FIND_AD_USERS_REPLY_OUTPUT_CHANNEL的通道,然后使用findADUsersResponseFlow将消息反序列化为ADResponse对象,下一个ADResponse对象将传递给网关ReplyChannel-FIND_AD_USERS_REPLY_CHANNEL。最后,'find'方法返回该对象。不幸的是,当org.springframework.integration.handler.BridgeHandler收到一条消息时,出现异常:
org.springframework.messaging.MessagingException: ; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
消息日志如下所示:
11:51:35.697 [SimpleAsyncTaskExecutor-1] INFO New message - GenericMessage [payload={...somepayload...}, headers={correlation_id=7cbd958e-4b09-4e4c-ba8e-5ba574f3309a, replyRoutingKey=findADUsersResponse.ad, amqp_consumerQueue=findADUsersResponseQueue, history=newFindADUsersResponseInboundChannelAdapter,adUsersFindReplyOutputChannel,adUsersFindReplyChannel,infoLog,infoLoggerChain.channel#0,infoLoggerChain.channel#1, id=37a4735d-6983-d1ad-e0a1-b37dc17e48ef, amqp_consumerTag=amq.ctag-8Qs5YEun1jXYRf85Hu1URA, object.type=USER, timestamp=1469094695697}]
因此,我很确定该消息已传递给adUsersFindReplyChannel。同样(如果重要的话),请求消息和回复消息都将'replyTo'标头设置为null。我究竟做错了什么?
该replyChannel
头部是一个活生生的对象,不能在AMQP序列化。
您可以使用出站网关而不是一对适配器,并且框架将处理标头。
如果由于某种原因必须使用适配器,则需要做两件事:
使用标头通道注册表将通道对象转换为在注册表中注册的字符串。
确保将标头映射器配置为发送/接收replyChannel
标头,并且您的接收系统在答复中返回标头。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句