큐에 일부 메시지를 수신 한 브로커를로 persistence-enabled
설정 true
한 경우 메시지가 도착한 후 연결되는 새 STOMP 클라이언트에 메시지를 보내도록 구성 할 수있는 방법이 있습니까?
여기서 아이디어는 대기열 작업자가 중지되었을 수 있으며 실행되지 않은 기간 동안 누적 된 작업을 다시 시작하기를 원한다는 것입니다.
현재 내 STOMP 소비자가 버퍼링 된 큐 메시지에 연결할 때 처리되지 않습니다. "버퍼링"이란 소비자가 연결되지 않은 상태에서 생산자가 큐에 메시지를 기록했음을 의미합니다. 이 시나리오를 계속하면 소비자가 연결되면 메시지를 볼 수 있지만 새 메시지 만 볼 수 있습니다. 이전 메시지는 결코 소비자에게 전송되지 않습니다.
브로커 구성
<?xml version='1.0'?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xi="http://www.w3.org/2001/XInclude" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
This value was determined through a calculation.
Your system could perform 50 writes per millisecond
on the current journal configuration.
That translates as a sync write every 20000 nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
-->
<journal-buffer-timeout>20000</journal-buffer-timeout>
<!--
When using ASYNCIO, this will determine the writing queue depth for libaio.
-->
<journal-max-io>4096</journal-max-io>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>1020000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<acceptor name="stomp">tcp://0.0.0.0:61613?stompEnableMessageId=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
</acceptors>
<connectors>
<connector name="global">tcp://172.17.0.1:61616</connector>
<connector name="s">tcp://172.17.0.1:61617</connector>
</connectors>
<cluster-user>cluster</cluster-user>
<cluster-password>REDACTED</cluster-password>
<cluster-connections>
<cluster-connection name="multi-region">
<connector-ref>global</connector-ref>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<static-connectors>
<connector-ref>s</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<addresses>
<address name="/queue/global.regional">
<multicast>
<queue name="/queue/global.regional">
<durable>true</durable>
</queue>
</multicast>
</address>
</addresses>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<!-- <dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay> -->
<!-- with -1 only the global-max-size is in use for limiting -->
<!-- <max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics> -->
</address-setting>
</address-settings>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
</core>
</configuration>
설명에 따르면 클라이언트가 일반적으로 "pub / sub"의미 체계를 사용하는 것처럼 들립니다. ActiveMQ Artemis 문서 및 구성에서는이를 "멀티 캐스트"의미 체계라고합니다. 발행 / 구독 시맨틱 소비자 (예 : 구독자) 를 사용하는 경우 대상에 연결 한 후 (예 : 구독을 만든 후 ) 메시지가 전송 됩니다.
JMS와 같은 것에서 이러한 의미는 클라이언트가 대기열을 사용하는지 토픽을 사용하는지에 따라 제어됩니다. 그러나 STOMP 사양은 특정 의미가없는 일반 "대상"만 정의합니다. "프로토콜 개요"섹션에서 다음과 같이 설명합니다.
STOMP 서버는 메시지를 보낼 수있는 대상 집합으로 모델링됩니다. STOMP 프로토콜은 대상을 불투명 한 문자열로 취급하며 해당 구문은 서버 구현에 따라 다릅니다. 또한 STOMP는 목적지의 전달 의미를 정의하지 않습니다. 대상의 전달 또는 "메시지 교환"의미는 서버마다, 심지어 대상마다 다를 수 있습니다. 이를 통해 서버는 STOMP로 지원할 수있는 의미 체계를 창의적으로 사용할 수 있습니다.
실제로 원하는 것은 멀티 캐스트 대신 애니 캐스트 의미론 인 것처럼 들립니다 . ActiveMQ Artemis 문서 의 STOMP 장에서는 동적으로 생성 된 주소 및 대기열에 대해 이러한 의미를 제어하는 여러 방법을 다룹니다. 그러나 정적으로 생성 된 주소와 대기열을 사용하고 있으므로 다음을 사용하는 것이 좋습니다.
<addresses>
<address name="/queue/global.regional">
<anycast>
<queue name="/queue/global.regional"/>
</anycast>
</address>
</addresses>
현재 구성은 매우 드문 정적으로 생성 된 멀티 캐스트 대기열을 정의합니다. STOMP 클라이언트가 사용하는 주소의 멀티 캐스트 큐는 해당 클라이언트의 구독을 나타냅니다. 일반적으로 STOMP 클라이언트의 지속 가능한 구독을 수동으로 제어해야하는 경우에만 정적으로 멀티 캐스트 대기열을 만들고 싶을 것입니다. 이에 대해서는 ActiveMQ Artemis 문서의 STOMP 장에있는 "Durable Subscriptions"섹션에서 자세히 설명 합니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다