如何在Webflux应用程序中使Spring Cloud Stream使用者成为消费者?

米罗斯拉夫

我有一个基于Webflux的微服务,具有一个简单的反应式存储库:

    public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
    }

现在,我想扩展此微服务以使用来自Kafka的事件消息。该消息/事件将被保存到数据库中。

对于Kafka侦听器,我使用了Spring Cloud Stream。我创建了一些简单的Consumer,并且效果很好-我能够使用消息并将其保存到数据库中。

    @Bean
    public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
        return input ->
                input.foreach((key, value) -> {
                    LOG.info("Received event, Key: {}, value: {}", key, value);
                    repository.save(initNotification(value)).subscribe();
                });
    }

但这是连接Spring Cloud Stream使用者和反应式存储库的正确方法吗?subscribe()最后我必须打来电话时,情况看起来并非如此

我阅读了Spring Cloud Stream文档(针对3.0.0版本),他们说

Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.

并且在此演示视频中,他们提到他们使用项目反应器提供了反应式编程支持。所以我想有一种我不知道的方法。你能告诉我怎么做对吗?

如果一切听起来都太愚蠢,我深表歉意,但是我对Spring Cloud Stream和反应式编程非常陌生,还没有发现很多文章对此进行介绍。

兹拉瓦尔

只需将Flux用作消耗类型,如下所示:

@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) {
    return input ->
            input
             .map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
             .concatMap((value) -> repository.save(initNotification(value)))
             .subscribe();
}

如果使用Function空返回类型(Function<Flux<Message<Event>>, Mono<Void>>)而不是Consumer,则框架可以自动订阅。随着Consumer您必须手动订阅,因为该框架没有参考流。但是,Consumer如果您不是订阅存储库,而是订阅整个流,则可以。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spring Cloud Stream-Solace PubSub +-消费者并发

来自分类Dev

Spring Cloud Stream Kafka 消费者测试

来自分类Dev

当消费者在 spring-cloud-stream 应用程序中启动时接收消息

来自分类Dev

如何暂停消费者在Spring Cloud Kinesis流中使用消息

来自分类Dev

spring-cloud-stream-kafka离线消费者消息丢失

来自分类Dev

Spring cloud stream - 自动装配给定 PollableMessageSource 的底层消费者

来自分类Dev

在生产者/消费者模式中,如何杀死使用者线程?

来自分类Dev

Spring Cloud Stream Kinesis 消费者组可以由非 Spring 生产者发送吗?

来自分类Dev

如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

来自分类Dev

带有Kafka Streams Binders的Spring Cloud Stream:如何为Stream Processor设置`trusted.packages`(不同于消费者和生产者)

来自分类Dev

如何实现Kafka使用者使用spring-cloud-stream来按需处理事件?

来自分类Dev

带有消费者/生产者API的Kafka的Spring Cloud Stream恰好在带有transaction-id-prefix的语义无法按预期工作时

来自分类Dev

Spring Cloud aws 流,消息被消费者组中的多个实例消费

来自分类Dev

Spring JMS消费者应用程序

来自分类Dev

如何在 IBM Cloud 应用程序中使用 Let's Encrypt 通配符证书?

来自分类Dev

如何在单个带有Spring Cloud Security Oauth2的Spring引导应用程序中使用多个Oauth2 SSO服务器?

来自分类Dev

如何在activeMQ中使用多个生产者和一个消费者?

来自分类Dev

在儿童中使用消费者的上下文

来自分类Dev

当应用程序没有生产者时,如何让 Spring Cloud Stream 创建 RabbitMQ 队列和绑定?

来自分类Dev

如何正常关闭Spring-Kafka消费者应用程序

来自分类Dev

在生产者/消费者场景中,如何从消费者那里得到回应?

来自分类Dev

如何配置Spring Cloud Stream(Kafka)应用程序以在Confluent Cloud中自动创建主题?

来自分类Dev

Spring Integration Kafka消费者

来自分类Dev

Spring Boot Kafka Listener与消费者

来自分类Dev

Spring Cloud Stream Kafka Stream 与本机 Kafka Stream 应用程序和生产者之间的 Avro 消息不兼容

来自分类Dev

如何扩展Kafka的消费者?

来自分类Dev

RabbitMQ如何节流消费者

来自分类Dev

如何管理消费者的凭证

来自分类Dev

如何找到kafka消费者的费率?

Related 相关文章

  1. 1

    Spring Cloud Stream-Solace PubSub +-消费者并发

  2. 2

    Spring Cloud Stream Kafka 消费者测试

  3. 3

    当消费者在 spring-cloud-stream 应用程序中启动时接收消息

  4. 4

    如何暂停消费者在Spring Cloud Kinesis流中使用消息

  5. 5

    spring-cloud-stream-kafka离线消费者消息丢失

  6. 6

    Spring cloud stream - 自动装配给定 PollableMessageSource 的底层消费者

  7. 7

    在生产者/消费者模式中,如何杀死使用者线程?

  8. 8

    Spring Cloud Stream Kinesis 消费者组可以由非 Spring 生产者发送吗?

  9. 9

    如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

  10. 10

    带有Kafka Streams Binders的Spring Cloud Stream:如何为Stream Processor设置`trusted.packages`(不同于消费者和生产者)

  11. 11

    如何实现Kafka使用者使用spring-cloud-stream来按需处理事件?

  12. 12

    带有消费者/生产者API的Kafka的Spring Cloud Stream恰好在带有transaction-id-prefix的语义无法按预期工作时

  13. 13

    Spring Cloud aws 流,消息被消费者组中的多个实例消费

  14. 14

    Spring JMS消费者应用程序

  15. 15

    如何在 IBM Cloud 应用程序中使用 Let's Encrypt 通配符证书?

  16. 16

    如何在单个带有Spring Cloud Security Oauth2的Spring引导应用程序中使用多个Oauth2 SSO服务器?

  17. 17

    如何在activeMQ中使用多个生产者和一个消费者?

  18. 18

    在儿童中使用消费者的上下文

  19. 19

    当应用程序没有生产者时,如何让 Spring Cloud Stream 创建 RabbitMQ 队列和绑定?

  20. 20

    如何正常关闭Spring-Kafka消费者应用程序

  21. 21

    在生产者/消费者场景中,如何从消费者那里得到回应?

  22. 22

    如何配置Spring Cloud Stream(Kafka)应用程序以在Confluent Cloud中自动创建主题?

  23. 23

    Spring Integration Kafka消费者

  24. 24

    Spring Boot Kafka Listener与消费者

  25. 25

    Spring Cloud Stream Kafka Stream 与本机 Kafka Stream 应用程序和生产者之间的 Avro 消息不兼容

  26. 26

    如何扩展Kafka的消费者?

  27. 27

    RabbitMQ如何节流消费者

  28. 28

    如何管理消费者的凭证

  29. 29

    如何找到kafka消费者的费率?

热门标签

归档