Spring Cloud Stream-Solace PubSub +-消费者并发

丰兹

我正在将Spring Cloud Stream 3.0.6(Cloud:Hoxton.SR6,Boot 2.3.0.RELEASE)与Solace PubSub +结合使用。我无法让并发消费者工作。无论我配置什么,总会有一个线程依次执行每个传入消息。

这是我的StreamListener代码:

    @StreamListener(JobTriggerEventConsumerBinding.INPUT)
    protected void onJobTriggerEvent(org.springframework.messaging.Message<JobExecutionTriggerEvent> message, 
                                     JobExecutionTriggerEvent event, 
                                     MessageHeaders headers) throws InterruptedException {
        
        log.info("Processing on thread: " + Thread.currentThread().getId());
      
        Thread.sleep(5000);
        
        log.info("Received the event!");
        log.info("-- Raw message:    {}", message);
        log.info("-- Headers:        {}", headers);
        log.info("-- Event:          {}", event);
        log.info("-- Event Contents: {}", event.getMessage());
    }

如果我将3条消息发送到输入通道(使用我编写的生产者应用程序),则会看到消息在同一线程(具有相同ID)上按顺序进行处理。我想实现的是消息是由3个线程同时处理的。

我的application.yml样子如下:

spring:
  cloud:
    stream:
      default:
        group: defaultConsumers
        consumer:
          concurrency: 3
      bindings:
        jobTriggers:
          group: jobTriggerConsumers 
          consumer:
            concurrency: 3
            max-attempts: 1
      solace: 
        bindings:
          jobTriggers:
            consumer:
              requeue-rejected: true

Mypom.xml包含以下依赖项:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <!-- Dependency to Solace PubSub+ Spring Cloud Stream integration (binder) -->
    <dependency>
      <groupId>com.solace.spring.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-solace</artifactId>
      <version>2.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-cloud-connectors</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

难道这是Solace PubSub +粘合剂的问题?我在这里已经读到,的行为spring.cloud.stream.binders.<name>.consumer.concurrency可能取决于活页夹的实现。

这可能是什么问题?

参考文献:

# docker-compose -f PubSubStandard_singleNode.yml up
version: '3.3'

services:
  primary:
    container_name: pubSubStandardSingleNode
    image: solace/solace-pubsub-standard:latest
    shm_size: 1g
    ulimits:
      core: 1
      nofile:
        soft: 2448
        hard: 38048
    ports:
    #Port Mappings:  Ports are mapped straight through from host to
    #container.  This may result in port collisions on commonly used
    #ports that will cause failure of the container to start.
      #Web transport
      - '80:80'
      #Web transport over TLS
      - '443:443'
      #SEMP over TLS
      - '943:943'
      #MQTT Default VPN
      #- '1883:1883'
      #AMQP Default VPN over TLS
      - '5671:5671'
      #AMQP Default VPN
      - '5672:5672'
      #MQTT Default VPN over WebSockets
      #- '8000:8000'
      #MQTT Default VPN over WebSockets / TLS
      #- '8443:8443'
      #MQTT Default VPN over TLS
      #- '8883:8883'
      #SEMP / PubSub+ Manager
      - '8080:8080'
      #REST Default VPN
      #- '9000:9000'
      #REST Default VPN over TLS
      #- '9443:9443'
      #SMF
      - '55555:55555'
      #SMF Compressed
      #- '55003:55003'
      #SMF over TLS
      - '55443:55443'
    environment:
      - username_admin_globalaccesslevel=admin
      - username_admin_password=admin
      - system_scaling_maxconnectioncount=100
丰兹

好吧,我会自己回答这个问题。

在RabbitMQ活页夹和正在运行的Rabbit实例中尝试上述配置时,并发工作得很好。因此,我认为一定是Solace粘合剂造成了问题。

经过一番谷歌搜索后,我确实找到了确认信息:https : //github.com/SolaceProducts/solace-spring-cloud/issues/7

显然,Solace PubSub +活页夹当前不支持并发,这实在太可惜了。至少看来问题正在得到解决。

这里还有一些社区讨论:https : //solace.community/discussion/284/concurrency-property-with-solace-spring-cloud-stream-api#latest

更新

此问题似乎已在2.1.1Spring Cloud Stream Solace活页夹版本得到修复即使用这种依赖

<dependency>
 <groupId>com.solace.spring.cloud</groupId>
 <artifactId>spring-cloud-starter-stream-solace</artifactId>
 <version>2.1.1</version
</dependency>

如果您使用的是Spring Cloud Solace BOM,则1.1.1至少需要转到以下版本

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.solace.spring.cloud</groupId>
      <artifactId>solace-spring-cloud-bom</artifactId>
      <version>1.1.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
 </dependencies>
</dependencyManagement>

这适用于Spring Cloud Hoxton.SR6

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spring Cloud Stream Kafka 消费者测试

来自分类Dev

spring-cloud-stream-kafka离线消费者消息丢失

来自分类Dev

当消费者在 spring-cloud-stream 应用程序中启动时接收消息

来自分类Dev

Spring cloud stream - 自动装配给定 PollableMessageSource 的底层消费者

来自分类Dev

Spring Cloud Stream Kinesis 消费者组可以由非 Spring 生产者发送吗?

来自分类Dev

如何在Webflux应用程序中使Spring Cloud Stream使用者成为消费者?

来自分类Dev

带有Kafka Streams Binders的Spring Cloud Stream:如何为Stream Processor设置`trusted.packages`(不同于消费者和生产者)

来自分类Dev

带有消费者/生产者API的Kafka的Spring Cloud Stream恰好在带有transaction-id-prefix的语义无法按预期工作时

来自分类Dev

Spring 集成上下文中的并发消费者

来自分类Dev

Spring Cloud aws 流,消息被消费者组中的多个实例消费

来自分类Dev

如何暂停消费者在Spring Cloud Kinesis流中使用消息

来自分类Dev

Spring Integration Kafka消费者

来自分类Dev

Spring Boot Kafka Listener与消费者

来自分类Dev

Spring Cloud Stream 3.0存在生产者问题

来自分类Dev

发布者通过Spring Cloud Stream确认

来自分类Dev

Kubernetes/Spring Cloud Dataflow 流 > spring.cloud.stream.bindings.output.destination 被生产者忽略

来自分类Dev

Spring Cloud Stream禁用重试

来自分类Dev

Spring Cloud Stream中的PollableChannel

来自分类Dev

Spring JMS生产者和消费者交互

来自分类Dev

Spring Cloud Stream Kafka Stream 与本机 Kafka Stream 应用程序和生产者之间的 Avro 消息不兼容

来自分类Dev

Kafka-spring消费者未阅读邮件

来自分类Dev

Spring Boot Kafka消费者滞后并读取错误

来自分类Dev

Spring Kafka-无法建立Kafka消费者

来自分类Dev

Spring JMS消费者应用程序

来自分类Dev

ConcurrentKafkaListenerContainerFactory 的 Spring-Kafka 消费者组协调

来自分类Dev

Spring JMS:配置多消费者客户端

来自分类Dev

Spring Boot Kafka消息消费者和丢失的消息

来自分类Dev

如何拦截Spring Cloud Stream消息?

来自分类Dev

Spring Cloud Stream Kafka记录太大

Related 相关文章

  1. 1

    Spring Cloud Stream Kafka 消费者测试

  2. 2

    spring-cloud-stream-kafka离线消费者消息丢失

  3. 3

    当消费者在 spring-cloud-stream 应用程序中启动时接收消息

  4. 4

    Spring cloud stream - 自动装配给定 PollableMessageSource 的底层消费者

  5. 5

    Spring Cloud Stream Kinesis 消费者组可以由非 Spring 生产者发送吗?

  6. 6

    如何在Webflux应用程序中使Spring Cloud Stream使用者成为消费者?

  7. 7

    带有Kafka Streams Binders的Spring Cloud Stream:如何为Stream Processor设置`trusted.packages`(不同于消费者和生产者)

  8. 8

    带有消费者/生产者API的Kafka的Spring Cloud Stream恰好在带有transaction-id-prefix的语义无法按预期工作时

  9. 9

    Spring 集成上下文中的并发消费者

  10. 10

    Spring Cloud aws 流,消息被消费者组中的多个实例消费

  11. 11

    如何暂停消费者在Spring Cloud Kinesis流中使用消息

  12. 12

    Spring Integration Kafka消费者

  13. 13

    Spring Boot Kafka Listener与消费者

  14. 14

    Spring Cloud Stream 3.0存在生产者问题

  15. 15

    发布者通过Spring Cloud Stream确认

  16. 16

    Kubernetes/Spring Cloud Dataflow 流 > spring.cloud.stream.bindings.output.destination 被生产者忽略

  17. 17

    Spring Cloud Stream禁用重试

  18. 18

    Spring Cloud Stream中的PollableChannel

  19. 19

    Spring JMS生产者和消费者交互

  20. 20

    Spring Cloud Stream Kafka Stream 与本机 Kafka Stream 应用程序和生产者之间的 Avro 消息不兼容

  21. 21

    Kafka-spring消费者未阅读邮件

  22. 22

    Spring Boot Kafka消费者滞后并读取错误

  23. 23

    Spring Kafka-无法建立Kafka消费者

  24. 24

    Spring JMS消费者应用程序

  25. 25

    ConcurrentKafkaListenerContainerFactory 的 Spring-Kafka 消费者组协调

  26. 26

    Spring JMS:配置多消费者客户端

  27. 27

    Spring Boot Kafka消息消费者和丢失的消息

  28. 28

    如何拦截Spring Cloud Stream消息?

  29. 29

    Spring Cloud Stream Kafka记录太大

热门标签

归档