How to implement MQTT server using Spring Integration?

Alexander Farber

When I run the Outbound Channel Adapter example for MQTT it throws an error:

Executing command line: /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -classpath /Users/afarber/src/spring-newbie/MqttOutbound/target/classes:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot/1.4.1.RELEASE/spring-boot-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.1.RELEASE/spring-boot-starter-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.1.RELEASE/spring-boot-autoconfigure-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.1.RELEASE/spring-boot-starter-logging-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-core/1.1.7/logback-core-1.1.7.jar:/Users/afarber/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/Users/afarber/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar:/Users/afarber/.m2/repository/org/springframework/spring-context/4.3.2.RELEASE/spring-context-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-aop/4.3.2.RELEASE/spring-aop-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-beans/4.3.2.RELEASE/spring-beans-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-expression/4.3.2.RELEASE/spring-expression-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-core/4.3.2.RELEASE/spring-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-messaging/4.3.3.RELEASE/spring-messaging-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-tx/4.3.3.RELEASE/spring-tx-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/retry/spring-retry/1.1.3.RELEASE/spring-retry-1.1.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-mqtt/4.3.2.RELEASE/spring-integration-mqtt-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar de.afarber.mqttoutbound.MqttJavaApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.4.1.RELEASE)

2016-10-11 21:53:36.811  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : Starting MqttJavaApplication on mba.local with PID 2102 (/Users/afarber/src/spring-newbie/MqttOutbound/target/classes started by afarber in /Users/afarber/src/spring-newbie/MqttOutbound)
2016-10-11 21:53:36.816  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : No active profile set, falling back to default profiles: default
2016-10-11 21:53:36.960  INFO 2102 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy
2016-10-11 21:53:37.724  INFO 2102 --- [           main] o.s.b.f.config.PropertiesFactoryBean     : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-10-11 21:53:37.729  INFO 2102 --- [           main] o.s.i.config.IntegrationRegistrar        : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2016-10-11 21:53:37.933  INFO 2102 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2016-10-11 21:53:37.947  INFO 2102 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2016-10-11 21:53:38.143  INFO 2102 --- [           main] o.s.b.f.config.PropertiesFactoryBean     : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-10-11 21:53:38.148  INFO 2102 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-10-11 21:53:38.177  INFO 2102 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-10-11 21:53:38.592  INFO 2102 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService  'taskScheduler'
2016-10-11 21:53:39.064  INFO 2102 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2016-10-11 21:53:39.077  INFO 2102 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147483648
2016-10-11 21:53:39.078  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel
2016-10-11 21:53:39.078  INFO 2102 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.mqttOutboundChannel' has 1 subscriber(s).
2016-10-11 21:53:39.079  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started mqttJavaApplication.mqttOutbound.serviceActivator
2016-10-11 21:53:39.079  INFO 2102 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2016-10-11 21:53:39.079  INFO 2102 --- [           main] ProxyFactoryBean$MethodInvocationGateway : started mqttJavaApplication$MyGateway
2016-10-11 21:53:39.079  INFO 2102 --- [           main] GatewayCompletableFutureProxyFactoryBean : started mqttJavaApplication$MyGateway
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2016-10-11 21:53:39.093  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : Started MqttJavaApplication in 2.962 seconds (JVM running for 3.669)
Exception in thread "main" org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
    at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy40.sendToMqtt(Unknown Source)
    at de.afarber.mqttoutbound.MqttJavaApplication.main(MqttJavaApplication.java:27)
Caused by: org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.checkConnection(MqttPahoMessageHandler.java:180)
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.publish(MqttPahoMessageHandler.java:189)
    at org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler.handleMessageInternal(AbstractMqttMessageHandler.java:150)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 19 more
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
    ... 2 more
2016-10-11 21:53:39.203  INFO 2102 --- [       Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy
2016-10-11 21:53:39.207  INFO 2102 --- [       Thread-1] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0
2016-10-11 21:53:39.209  INFO 2102 --- [       Thread-1] ProxyFactoryBean$MethodInvocationGateway : stopped mqttJavaApplication$MyGateway
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] GatewayCompletableFutureProxyFactoryBean : stopped mqttJavaApplication$MyGateway
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped _org.springframework.integration.errorLogger
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase -2147483648
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.mqttOutboundChannel' has 0 subscriber(s).
2016-10-11 21:53:39.211  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped mqttJavaApplication.mqttOutbound.serviceActivator
2016-10-11 21:53:39.211  INFO 2102 --- [       Thread-1] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2016-10-11 21:53:39.212  INFO 2102 --- [       Thread-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
------------------------------------------------------------------------
BUILD FAILURE

The failing code in MqttJavaApplication.java file is copied below:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo"); // THROWS THE ABOVE EXCEPTION
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String data);
    }
}

and it seems to be an MQTT client anyway...

How to implement an MQTT server (broker) as a Bean for Java Spring Integration, where to start please?

Gary Russell

Spring Integration does not provide a broker; it provides clients for sending/receiving messages.

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

From Java

how to download file from sftp server using spring integration without using spring boot

From Java

How to retry socket server creation with spring-integration?

From Java

How to implement sticky session in spring-integration tcp gateway?

From Java

Dynamic TCP Server with Spring Integration using Java DSL

From Java

How to implement message queue with Spring Integration and MongoDB?

From Java

Spring Integration(MQTT): Retrieving published message

From Java

Issue in reading a excel file from SFTP server using spring integration

From Java

Spring Integration: MQTT integration test basics

From Dev

How to implement this TCP stream reader in Spring Integration?

From Dev

How to I implement whatsapp type messenger using MQTT?

From Dev

spring-integration-mqtt With multiple Mqtt Servers for subscription

From Dev

Spring Integration-Load test the tcp server using Jmeter

From Dev

How to catch errors in spring-integration socket server?

From Dev

how to implement integration router in java

From Dev

How to implement distributed lock around poller in Spring Integration using ZooKeeper

From Dev

How to publish a table using JSON format to MQTT server in DolphinDB

From Dev

How to implement HTTP request/reply when the response comes from a rabbitMQ reply queue using Spring Integration DSL?

From Dev

Implement a consumer and producer system using Spring integration

From Dev

spring integration-mqtt type converter issue

From Dev

Spring integration MQTT publish & subscribe to multiple topics

From Dev

Which Spring Integration Channel should be used for MQTT

From Dev

ActiveMQ Spring Integration using MQTT topic

From Dev

Paho MQTT vs MQTT paho spring integration

From Dev

How to setup TLS Server to authenticate client in spring integration?

From Dev

How to download filename*.csv, if filename*.marker file exists on the FTP server using spring integration ftp support

From Dev

How to create an asynchronous singleton socket server with spring-integration?

From Dev

How to capture MQTT data locally in SSH remote server using Wireshark?

From Dev

How to implement @ForeignKey correctly using Spring Boot

From Dev

How to implement Spring Integration Flow to act as a TcpClient

Related Related

  1. 1

    how to download file from sftp server using spring integration without using spring boot

  2. 2

    How to retry socket server creation with spring-integration?

  3. 3

    How to implement sticky session in spring-integration tcp gateway?

  4. 4

    Dynamic TCP Server with Spring Integration using Java DSL

  5. 5

    How to implement message queue with Spring Integration and MongoDB?

  6. 6

    Spring Integration(MQTT): Retrieving published message

  7. 7

    Issue in reading a excel file from SFTP server using spring integration

  8. 8

    Spring Integration: MQTT integration test basics

  9. 9

    How to implement this TCP stream reader in Spring Integration?

  10. 10

    How to I implement whatsapp type messenger using MQTT?

  11. 11

    spring-integration-mqtt With multiple Mqtt Servers for subscription

  12. 12

    Spring Integration-Load test the tcp server using Jmeter

  13. 13

    How to catch errors in spring-integration socket server?

  14. 14

    how to implement integration router in java

  15. 15

    How to implement distributed lock around poller in Spring Integration using ZooKeeper

  16. 16

    How to publish a table using JSON format to MQTT server in DolphinDB

  17. 17

    How to implement HTTP request/reply when the response comes from a rabbitMQ reply queue using Spring Integration DSL?

  18. 18

    Implement a consumer and producer system using Spring integration

  19. 19

    spring integration-mqtt type converter issue

  20. 20

    Spring integration MQTT publish & subscribe to multiple topics

  21. 21

    Which Spring Integration Channel should be used for MQTT

  22. 22

    ActiveMQ Spring Integration using MQTT topic

  23. 23

    Paho MQTT vs MQTT paho spring integration

  24. 24

    How to setup TLS Server to authenticate client in spring integration?

  25. 25

    How to download filename*.csv, if filename*.marker file exists on the FTP server using spring integration ftp support

  26. 26

    How to create an asynchronous singleton socket server with spring-integration?

  27. 27

    How to capture MQTT data locally in SSH remote server using Wireshark?

  28. 28

    How to implement @ForeignKey correctly using Spring Boot

  29. 29

    How to implement Spring Integration Flow to act as a TcpClient

HotTag

Archive