qos 2로 발행하는 게시자는 브로커 또는 구독자로부터 승인을받습니다.

조심하세요

나는 qos에 대해 약간 혼란 스럽습니다. qos에 대해 읽었습니다. qos가 2로 설정된 경우 브로커 / 클라이언트가 4 단계 핸드 셰이크를 사용하여 정확히 한 번 메시지를 전달합니다.

따라서 qos 2는 메시지가 가입자 (클라이언트)가 수신하지 않고 브로커에 게시되었음을 확인합니다. 또는 구독자가 메시지를 받거나

승인을 위해 우리는 게시자가 "DATA"와 같은 주제로 메시지를 게시하고 "ACK"와 같은 주제를 구독하고 구독자는 주제에 대해 메시지가 수신되었다는 주제 "ACK"에 대한 승인을 게시해야하는 것과 같은 응용 프로그램을 설정해야합니다. "데이터"

데이터 게시를위한 Java 클래스와 게시자를 구독하기위한 다른 클래스를 만들었습니다.

다음 코드에서 qos 2에서 게시하려고 시도했으며 deliveryComplete 함수에서 qos 0으로 시도했을 때 getMessage ()를 시도 할 때 예외가 발생했습니다. getMessage ()가 예외를주지 않았습니다.

public class PublishMe implements MqttCallback{
    MqttClient myClient;
    MqttClient myClientPublish;
    MqttConnectOptions connOpt;
    MqttConnectOptions connOptPublish;
    static final String BROKER_URL = "tcp://Ehydromet-PC:1883";

    static Boolean msgACK=false;    
    public static void main(String[] args) {
        PublishMe smc = new PublishMe();
        smc.runClient();
    }
    @Override
    public void connectionLost(Throwable t) {
        System.out.println("Connection lost!");
    }

        @Override
        public void messageArrived(String string, MqttMessage message) throws Exception {
                System.out.println("-------------------------------------------------");
        System.out.println("| Topic:" + string);
        System.out.println("| Message: " + new String(message.getPayload()));
        System.out.println("-------------------------------------------------");

        }
/**
     * 
     * deliveryComplete
     * This callback is invoked when a message published by this client
     * is successfully received by the broker.
     * 
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
       try{
        System.out.println("Message delivered successfully to topic : \"" + token.getMessage().toString() + "\".");
       }catch(Exception ex){
        System.out.println(ex.getCause()+" -- "+ex.getLocalizedMessage()+" -- "+ex.getMessage()+" -- " );      
               }

       }

    public void runClient() {
        connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(false);
        connOpt.setKeepAliveInterval(0);

                connOptPublish= new MqttConnectOptions();
        connOptPublish.setCleanSession(false);
        connOptPublish.setKeepAliveInterval(0);

// Connect to Broker
        try {
            myClient = new MqttClient(BROKER_URL, "pahomqttpublish11");
            myClient.setCallback(this);
            myClient.connect(connOpt);

                        myClientPublish= new MqttClient(BROKER_URL, "pahomqttpublish42");
            myClientPublish.setCallback(this);
            myClientPublish.connect(connOptPublish);

        } catch (MqttException e) {
            e.printStackTrace();
            System.exit(-1);
        }

        System.out.println("Connected to " + BROKER_URL);

        String myTopic = "sample";
//                String myTopic = "receiveDATA2";
                MqttTopic topic = myClientPublish.getTopic(myTopic);

        // publish messages if publisher
        if (publisher) {

                    int i=1;
            while(true){
                                String pubMsg = "sample msg "+i;

                MqttMessage message = new MqttMessage(pubMsg.getBytes());
                                System.out.println(message);
                                message.setQos(2);
                                message.setRetained(false);

                                // Publish the message
                                MqttDeliveryToken token = null;
                                try {
                    // publish message to broker
                    token = topic.publish(message);
                    // Wait until the message has been delivered to the broker
                    token.waitForCompletion();
                                        msgACK=false;
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }           
        }
    }


}

이하는 구독자입니다

public class Mqttsample implements MqttCallback{
    MqttClient myClient;
    MqttClient myClientPublish;
    MqttConnectOptions connOpt;
MqttConnectOptions connOptPublish;
    static final String BROKER_URL = "tcp://Ehydromet-PC:1883";
    // the following two flags control whether this example is a publisher, a subscriber or both
    static final Boolean subscriber = true;
    static final Boolean publisher = true;
        public static void main(String[] args) {


        Mqttsample smc = new Mqttsample();
        smc.runClient();
    }
       @Override
    public void connectionLost(Throwable t) {
        System.out.println("Connection lost!");
        // code to reconnect to the broker would go here if desired
    }

        @Override
        public void messageArrived(String string, MqttMessage message) throws Exception {
        //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
                System.out.println("| Topic:" + string+"| Message: " + new String(message.getPayload()));

        }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        try{
            System.out.println("Pub complete" + new String(token.getMessage().getPayload()));
        }
        catch(Exception ex ){
            System.out.println("delivery Error "+ex.getMessage());
        }
       }



    public void runClient() {
        connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(false);
        connOpt.setKeepAliveInterval(0);

                connOptPublish= new MqttConnectOptions();
        connOptPublish.setCleanSession(false);
        connOptPublish.setKeepAliveInterval(0);

// Connect to Broker
        try {
            myClient = new MqttClient(BROKER_URL, "pahomqttpublish");
            myClient.setCallback(this);
            myClient.connect(connOpt);

                        myClientPublish= new MqttClient(BROKER_URL, "pahomqttsubscribe");
            myClientPublish.setCallback(this);
            myClientPublish.connect(connOptPublish);

        } catch (MqttException e) {
            e.printStackTrace();
            System.exit(-1);
        }

        System.out.println("Connected to " + BROKER_URL);


        // subscribe to topic if subscriber
        if (subscriber) {
            try {
                            //String myTopicACK = M2MIO_DOMAIN + "/" + "ACK" + "/" + M2MIO_THING;
                            String myTopicACK = "sample";
                           // MqttTopic topicACK = myClient.getTopic(myTopicACK);
                int subQoS = 2;

                myClient.subscribe(myTopicACK, subQoS);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
//                 


    }


}

구독자가 메시지를 받았는지 확인하려면 어떻게해야합니까? 게시자 코드에서 구현해야하는 것은 무엇입니까?

위 링크에서 http://www.eclipse.org/paho/files/mqttdoc/Cclient/qos.html

QoS2, 정확히 한 번 : 메시지가 항상 정확히 한 번 전달됩니다. 메시지는 보낸 사람이받는 사람이 메시지를 게시했다는 확인을받을 때까지 보낸 사람에게 로컬로 저장되어야합니다. 메시지를 다시 보내야하는 경우를 대비하여 메시지가 저장됩니다. QoS2는 가장 안전하지만 가장 느린 전송 모드입니다.

Hardillb

확인했듯이 더 높은 QOS 수준은 클라이언트 (게시자 또는 구독자)와 브로커 간의 메시지 전달만을 설명하며 종단 간 게시자 간 메시지 전달을 설명하지 않습니다.

발행 / 구독 프로토콜로서 토픽에 얼마나 많은 구독자가있을 수 있는지 알 수있는 방법이 없기 때문에 이것은 매우 신중합니다. 0과 n 사이의 숫자가있을 수 있습니다. 또한 게시자와 구독자는 서로 다른 QOS 수준에서 주제와 상호 작용할 수 있습니다 (게시자는 QOS 2에서 게시 할 수 있고 구독자는 QOS 0에서 구독 할 수 있음). 메시지는 보관 된 메시지로 게시 될 수도 있으므로 마지막으로 보관 된 메시지가 항상 새로 구독하는 클라이언트로 전달됩니다.

QOS 계약을 충족하기 위해 클라이언트의 모든 스토리지는 사용중인 MQTT 라이브러리 (이 경우 Paho)에서 처리해야합니다.

deliveryComplete콜백 게시자가 브로커에게 메시지를 송신 완료 만 표시된다. 또한 문서token.getMessage()메시지가 전달되면 null을 반환하여 언급 한 예외를 설명 한다고 말합니다 (예외를 포함하지 않았으므로 여기에서 추측해야 함).

응용 프로그램 아키텍처에서 메시지에 대한 종단 간 승인이 실제로 필요한 경우 설명했던 것과 유사한 것을 구현해야합니다. 그러나 제대로 작동하는지 확인하려면 메시지의 페이로드에 메시지 ID를 포함해야하며 확인 메시지에는이를 포함해야하며 메시지를받은 사람을 알 수 있도록 응답하는 구독자를 식별하는 방법이 있어야합니다. 내가 이와 같은 것을 사용하는 유일한 이유는 메시지를 확인하는 데 시간이 필요한 경우입니다. 시간이 관련 요소가 아닌 경우 영구 세션사용하여 메시지가 게시시 연결이 끊어진 경우 다시 연결할 때 구독 클라이언트에 전달되는지 확인합니다.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

mosquitto mqtt 브로커는 구독자에게 20 개 이상의 게시 패킷을 보내지 않습니다.

분류에서Dev

Eclipse Paho Mqtt Client는 QoS-2에서 브로커가 전달하도록 ACK 될 때까지 게시 된 모든 메시지를 유지합니까?

분류에서Dev

두 번째 구독자에게 마지막으로 발행 한 값을 내보내는 결합 게시자를 구성하는 방법

분류에서Dev

SQL 개발자로부터 프로 시저를 실행하는 동안 오류가 발생했습니다.

분류에서Dev

실행중인 프로세스가 자발적 또는 비자발적으로 CPU를 포기하게하는 이유는 무엇입니까?

분류에서Dev

승인자는 Sitecore 워크 플로에서 승인 상태 항목을 볼 수 없습니다.

분류에서Dev

게시자는 브로커를 사용할 수있을 때까지 기다려야합니다.

분류에서Dev

초보자부터 라우터 또는 확장기로 커버리지를 확장 하시겠습니까?

분류에서Dev

Pub-Sub 패턴 및 메시지 브로커, 모든 구독자가 이벤트 작업을 완료했는지 확인하는 방법

분류에서Dev

커패시터 플러그인을 게시하려고 할 때 'rm'이 내부 또는 외부 명령으로 인식되지 않습니다.

분류에서Dev

Java-사용자 또는 관리자로 로그인 하시겠습니까? 계승

분류에서Dev

게시자에서 구독자로 데이터를 복사하지 않는 논리적 복제

분류에서Dev

Kafka 생산자가 인터넷을 통해 브로커에 연결할 수 없습니다. 브로커로 로컬 네트워크에서 잘 작동합니다. 인터넷에서 작동하는 텔넷 연결

분류에서Dev

효율적인 방법으로 큰 숫자의 계승을 찾는데 여전히 TLE를 받고 있다면 어떻게해야합니까?

분류에서Dev

구축하지 않을 받는다는와 자바 : "잘못된 인수 견적을 내장했다"프로그램 "cmd"를 실행할 수 없습니다

분류에서Dev

세션 거래 소비자 또는 생산자에서 처리하는 메시지 브로커 예외

분류에서Dev

Polarion ALM의 API는 때때로 요청을 승인하지 않습니다.

분류에서Dev

정규식 - 두 문자 사이 또는 특정 문자로 시작부터 문자 발생하는 확신 단 1 인스턴스를 만들려면

분류에서Dev

브로커를 재부팅하지 않고 클러스터 구성을 추가하고 실행할 수있는 방법이 있습니까?

분류에서Dev

브로커를 재부팅하지 않고 클러스터 구성을 추가하고 실행할 수있는 방법이 있습니까?

분류에서Dev

Java로 1 또는 2 자리 시간 문자열을 구문 분석하는 방법은 무엇입니까?

분류에서Dev

새 (3.11.6) 또는 사용자 지정 커널로 부팅 할 수 없습니다.

분류에서Dev

새 (3.11.6) 또는 사용자 지정 커널로 부팅 할 수 없습니다.

분류에서Dev

커서로 쿼리를 수행하는 동안 IllegalArgumentException이 발생했습니다.

분류에서Dev

자신을 참조하는 구독자로부터 "초기화 전에 '변수'에 액세스 할 수 없음"오류가 발생하는 이유는 무엇입니까?

분류에서Dev

실패시 감독자로부터 발신자에게 반송

분류에서Dev

상승 된 사용자 지정 작업이있는 상승 된 설치 프로그램은 실행 파일을 상승시키지 않습니다.

분류에서Dev

상승 된 사용자 지정 작업이있는 상승 된 설치 프로그램은 실행 파일을 상승시키지 않습니다.

분류에서Dev

Facebook에 게시 오류 : '(# 200) 사용자가이 작업을 수행 할 수있는 응용 프로그램을 승인하지 않았습니다.', 유형 : 'OAuthException', 코드 : 200

Related 관련 기사

  1. 1

    mosquitto mqtt 브로커는 구독자에게 20 개 이상의 게시 패킷을 보내지 않습니다.

  2. 2

    Eclipse Paho Mqtt Client는 QoS-2에서 브로커가 전달하도록 ACK 될 때까지 게시 된 모든 메시지를 유지합니까?

  3. 3

    두 번째 구독자에게 마지막으로 발행 한 값을 내보내는 결합 게시자를 구성하는 방법

  4. 4

    SQL 개발자로부터 프로 시저를 실행하는 동안 오류가 발생했습니다.

  5. 5

    실행중인 프로세스가 자발적 또는 비자발적으로 CPU를 포기하게하는 이유는 무엇입니까?

  6. 6

    승인자는 Sitecore 워크 플로에서 승인 상태 항목을 볼 수 없습니다.

  7. 7

    게시자는 브로커를 사용할 수있을 때까지 기다려야합니다.

  8. 8

    초보자부터 라우터 또는 확장기로 커버리지를 확장 하시겠습니까?

  9. 9

    Pub-Sub 패턴 및 메시지 브로커, 모든 구독자가 이벤트 작업을 완료했는지 확인하는 방법

  10. 10

    커패시터 플러그인을 게시하려고 할 때 'rm'이 내부 또는 외부 명령으로 인식되지 않습니다.

  11. 11

    Java-사용자 또는 관리자로 로그인 하시겠습니까? 계승

  12. 12

    게시자에서 구독자로 데이터를 복사하지 않는 논리적 복제

  13. 13

    Kafka 생산자가 인터넷을 통해 브로커에 연결할 수 없습니다. 브로커로 로컬 네트워크에서 잘 작동합니다. 인터넷에서 작동하는 텔넷 연결

  14. 14

    효율적인 방법으로 큰 숫자의 계승을 찾는데 여전히 TLE를 받고 있다면 어떻게해야합니까?

  15. 15

    구축하지 않을 받는다는와 자바 : "잘못된 인수 견적을 내장했다"프로그램 "cmd"를 실행할 수 없습니다

  16. 16

    세션 거래 소비자 또는 생산자에서 처리하는 메시지 브로커 예외

  17. 17

    Polarion ALM의 API는 때때로 요청을 승인하지 않습니다.

  18. 18

    정규식 - 두 문자 사이 또는 특정 문자로 시작부터 문자 발생하는 확신 단 1 인스턴스를 만들려면

  19. 19

    브로커를 재부팅하지 않고 클러스터 구성을 추가하고 실행할 수있는 방법이 있습니까?

  20. 20

    브로커를 재부팅하지 않고 클러스터 구성을 추가하고 실행할 수있는 방법이 있습니까?

  21. 21

    Java로 1 또는 2 자리 시간 문자열을 구문 분석하는 방법은 무엇입니까?

  22. 22

    새 (3.11.6) 또는 사용자 지정 커널로 부팅 할 수 없습니다.

  23. 23

    새 (3.11.6) 또는 사용자 지정 커널로 부팅 할 수 없습니다.

  24. 24

    커서로 쿼리를 수행하는 동안 IllegalArgumentException이 발생했습니다.

  25. 25

    자신을 참조하는 구독자로부터 "초기화 전에 '변수'에 액세스 할 수 없음"오류가 발생하는 이유는 무엇입니까?

  26. 26

    실패시 감독자로부터 발신자에게 반송

  27. 27

    상승 된 사용자 지정 작업이있는 상승 된 설치 프로그램은 실행 파일을 상승시키지 않습니다.

  28. 28

    상승 된 사용자 지정 작업이있는 상승 된 설치 프로그램은 실행 파일을 상승시키지 않습니다.

  29. 29

    Facebook에 게시 오류 : '(# 200) 사용자가이 작업을 수행 할 수있는 응용 프로그램을 승인하지 않았습니다.', 유형 : 'OAuthException', 코드 : 200

뜨겁다태그

보관