要件:次の手順に進む前に、すべてのメッセージが公開されていることを確認する必要があります。
そのため、メッセージackチャネルの後にアグリゲーターを追加しました
問題:IDの数が少ない場合(おそらく3000レコード未満)、ソリューションは期待どおりに機能しています。ただし、IDの数が多い場合、アグリゲーターは待機状態になります
公開されたメッセージの数は常に正しいです。そのため、データベースの更新の下のコードに、確認チャネルの後にカウンターを追加しました。カウントは、IDのリストが大きい場合はIDの数よりも少なくなります。
<!-- service activator query database table and return list of IDs of type: Message<List<Map<String, Object>>> -->
<int:splitter id= "accountsSplitter" input-channel="listOfAccountsChannel" output-channel="accountChannel" />
<int:channel id="accountChannel">
<int:dispatcher task-executor="splitterTaskExecutor"/>
</int:channel>
<int:chain id="publishMessageChain" input-channel="accountChannel">
<int:transformer ref="accountIdTransformer"/>
<int-amqp:outbound-channel-adapter
amqp-template="amqpTemplateCore"
confirm-ack-channel="messageAckChannel"
confirm-nack-channel="messageAckChannel"
return-channel="messageAckChannel"
confirm-correlation-expression="#root"
exchange-name="ABC"
routing-key="#{abcRoutingKey}">
</int-amqp:outbound-channel-adapter>
</int:chain>
<int:chain id="confirmMessageChain" input-channel="messageAckChannel" output-channel="successMessageChannel">
<int:header-enricher id="replyChannelHeaderEnricher">
<int:reply-channel expression="payload.headers['replyChannel']" />
</int:header-enricher>
<int:transformer id="payloadTransformer" expression="payload" />
</int:chain>
<int:aggregator id="messagesConfirmedAggregator" input-channel="successMessageChannel" output-channel="aggregateChannel"/>
<task:executor id="splitterTaskExecutor" pool-size="10-40" queue-capacity="1000" rejection-policy="CALLER_RUNS" />
公開されたメッセージの数は常に正しいです。
どうしてそう思うのですか?
confirm-nack-channel
「ネガティブパブリッシャーの確認」を見るために追加しましょう!
そしておそらくreturn-channel
、同様にその場合は、「送信されます返すメッセージ」方法を確認します。
更新
いくつかのローカルテストの後、私は原因の根本を見つけました。
40
(最大)として送信するために並行スレッドプールを使用します。デフォルトでCachingConnectionFactory
は25
。を使用します。キャッシュサイズを超えると、新しい揮発性物質Channel
が作成され、確認を5秒だけ待つ場合があります。
CachingConnectionFactory.setChannelCacheSize()
同時および確認の要件を満たすには、かなり大きな値に増やす必要があります。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加