我有一个基于Webflux的微服务,具有一个简单的反应式存储库:
public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
}
现在,我想扩展此微服务以使用来自Kafka的事件消息。该消息/事件将被保存到数据库中。
对于Kafka侦听器,我使用了Spring Cloud Stream。我创建了一些简单的Consumer,并且效果很好-我能够使用消息并将其保存到数据库中。
@Bean
public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
return input ->
input.foreach((key, value) -> {
LOG.info("Received event, Key: {}, value: {}", key, value);
repository.save(initNotification(value)).subscribe();
});
}
但这是连接Spring Cloud Stream使用者和反应式存储库的正确方法吗?到subscribe()
最后我必须打来电话时,情况看起来并非如此。
我阅读了Spring Cloud Stream文档(针对3.0.0版本),他们说
Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.
并且在此演示视频中,他们提到他们使用项目反应器提供了反应式编程支持。所以我想有一种我不知道的方法。你能告诉我怎么做对吗?
如果一切听起来都太愚蠢,我深表歉意,但是我对Spring Cloud Stream和反应式编程非常陌生,还没有发现很多文章对此进行介绍。
只需将Flux用作消耗类型,如下所示:
@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) {
return input ->
input
.map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
.concatMap((value) -> repository.save(initNotification(value)))
.subscribe();
}
如果使用Function
空返回类型(Function<Flux<Message<Event>>, Mono<Void>>
)而不是Consumer,则框架可以自动订阅。随着Consumer
您必须手动订阅,因为该框架没有参考流。但是,Consumer
如果您不是订阅存储库,而是订阅整个流,则可以。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句