在Spring中使用哪种“ EventBus”?内置反应堆,Akka?

本杰明·M

我们将在几周内启动一个新的Spring 4应用程序。而且我们想使用一些事件驱动的架构。今年,我在各处阅读有关“反应堆”的信息,并在网上寻找它时,偶然发现了“ Akka”。

因此,目前我们有3种选择:

我找不到这些的真正比较。


现在,我们只需要以下内容:

  • X 注册听 Event E
  • Y 注册听 Event E
  • Z 发送一个 Event E

然后XY它将接收并处理该事件。

我们很可能会以异步方式使用它,但是可以肯定的是,还会有一些同步场景。而且我们很可能总是发送类作为事件。(Reactor示例大部分使用Strings和String模式,但它也支持Objects)。


据我了解,ApplicationEvent默认情况下同步工作,并且Reactor以异步方式工作。并且Reactor还允许使用该await()方法使它有点同步。Akka提供的功能与大致相同Reactor,但也支持Remoting。

关于Reactor的await()方法:它可以等待多个线程完成吗?或者甚至是这些线程的一部分?如果我们以上面的例子为例:

  • X 注册听 Event E
  • Y 注册听 Event E
  • Z 发送一个 Event E

可以这样说,使其同步:等待X Y完成。是否有可能让它仅等待X而不是等待Y


也许还有其他选择?例如JMS呢?

很多问题,但希望您能提供一些答案!

谢谢!


编辑:示例用例

  1. 当特定事件触发时,我想创建10000封电子邮件。每封电子邮件都必须包含用户特定的内容。因此,我会创建很多线程(max =系统cpu内核)来创建邮件,并且不会阻塞调用者线程,因为这可能需要几分钟。

  2. 当特定事件被触发时,我想从未知数量的服务中收集信息。每次提取大约需要100毫秒。在这里我可以想象使用Reactor await,因为我需要这些信息来继续在主线程中工作。

  3. 当特定事件被触发时,我想根据应用程序配置执行一些操作。因此,应用程序必须能够动态(取消)注册消费者/事件处理程序。他们会在活动中做自己的事情,我不在乎。因此,我将为每个处理程序创建一个线程,然后继续在主线程中继续执行我的工作。

  4. 简单的解耦:我基本上了解所有接收器,但是我只是不想调用代码中的每个接收器。这通常应该同步完成。

听起来我需要一个ThreadPool或RingBuffer。这些框架是否具有动态的RingBuffer,如果需要,它们会增大?

乔恩·布里斯宾

我不确定在这个狭小的空间中我能否充分回答您的问题。但我会试一试!:)

就功能而言,Spring的ApplicationEvent系统和Reactor确实非常不同。ApplicationEvent路由基于所处理的类型ApplicationListener除此之外,您还必须自己实现逻辑(但这不一定是一件坏事)。但是,Reactor提供了一个全面的路由层,该路由层也非常轻巧且完全可扩展。两端之间在订阅和发布事件的能力上在功能上有任何相似之处,这实际上是任何事件驱动系统的功能。另外,别忘了spring-messagingSpring 4附带的新模块。它是Spring Integration中可用工具的子集,还提供了围绕事件驱动架构进行构建的抽象方法。

Reactor将帮助您解决一些其他关键问题,否则您将不得不自行管理:

选择器匹配:Reactor进行Selector匹配,包括从简单.equals(Object other)调用到更复杂的URI模板匹配(允许占位符提取)的一系列匹配您还可以使用自己的自定义逻辑扩展内置选择器,以便可以将丰富对象用作通知键(例如,域对象)。

Stream和Promise API:您Promise已经提到了该.await()方法API ,这实际上是针对期望阻塞行为的现有代码。当使用Reactor编写新代码时,使用组合和回调通过不阻塞线程来有效利用系统资源的压力不会太大。在依赖少量线程来执行大量任务的体系结构中,阻塞调用者几乎从来不是一个好主意。期货根本无法实现云扩展,这就是现代应用程序利用替代解决方案的原因。

您的应用程序可以使用Streams或Promises来构建,尽管说实话,我认为您会发现它Stream更灵活。API的主要优点是可组合性,它使您可以将动作链接到依赖链中而不会阻塞。作为基于电子邮件用例的现成示例,您描述:

@Autowired
Environment env;
@Autowired
SmtpClient client;

// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);

input.compose()
  .map(new Function<DomainObject, EmailTemplate>() {
    public EmailTemplate apply(DomainObject in) {
      // generate the email
      return new EmailTemplate(in);
    }
  })
  .consume(new Consumer<EmailTemplate>() {
    public void accept(EmailTemplate email) {
      // send the email
      client.send(email);
    }
  });

// Publish input into Deferred
DomainObject obj = reader.readNext();
if(null != obj) {
  input.accept(obj);
}

Reactor还提供了Boundary,它基本上是一个CountDownLatch用于阻止任意使用者Boundary(因此,Promise如果您想要做的只是为Consumer完成而被阻止,则不必构造a )。Reactor在这种情况下,您可以使用原始格式,然后使用on()notify()方法来触发服务状态检查。

但是,对于某些事情,您似乎想要的是从Future返回的ExecutorService,不是吗?为什么不简单地简化事情呢?只有在吞吐量性能和开销效率很重要的情况下,Reactor才会真正受益。如果您阻塞了调用线程,那么您可能会抹去Reactor会给您带来的效率提升,因此在这种情况下,使用更传统的工具集可能会更好。

关于Reactor开放性的好处是,没有什么可以阻止两者进行交互。您可以自由组合FuturesConsumers不带静电。在这种情况下,请记住,您的速度只会与最慢的组件一样快。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何使用flatMap()在反应堆语境?

来自分类Dev

使用反应堆工件在Tycho中运行Eclipse

来自分类Dev

我如何在反应堆内部使用收集类?

来自分类Dev

使用反应堆包装器时,不会调用扭曲的Python工厂方法

来自分类Dev

弹簧整合反应堆

来自分类Dev

反应堆测试失败

来自分类Dev

扭曲的ExtendSelected反应堆错误

来自分类Dev

流星JS $近反应堆

来自分类Dev

扭曲的Python暂停/推迟反应堆

来自分类Dev

从线程运行扭曲反应堆

来自分类Dev

反应堆lmax线程转储

来自分类Dev

反应堆取消订阅的方式

来自分类Dev

与反应堆抛出异常的正确方法

来自分类Dev

WebFlux(反应堆)中的文件处理

来自分类Dev

项目反应堆背压问题

来自分类Dev

流星JS $近反应堆

来自分类Dev

在iPython中运行扭曲反应堆

来自分类Dev

反应堆v1.xx与反应堆v2.xx

来自分类Dev

既然不赞成使用事务处理程序,那么我应该在Akka中使用哪种模式?

来自分类Dev

工程反应堆:设计一个反应API

来自分类Dev

如何知道反应堆是否在python中运行?

来自分类Dev

我如何从反应堆运行state.sls

来自分类Dev

替代扭曲的while循环,不会阻塞反应堆线程

来自分类Dev

R带条件面板和反应堆的Shiny模块

来自分类Dev

实现像数据反应堆这样的系统

来自分类Dev

误导了flatMapSequentialDelayError的项目反应堆文档图像,还是...?

来自分类Dev

从单在工程反应堆处理可选值

来自分类Dev

工程反应堆:doOnNext(或其他doOnXXX)异步

来自分类Dev

通量春5反应堆没有订阅