我正在使用akka流,但由于流量无法处理某些值,因此需要有条件地跳过图的一部分。具体来说,我有一个接受字符串并发出http请求的流,但是当字符串为空时,服务器无法处理这种情况。但是我只需要返回一个空字符串即可。有没有一种方法可以执行此操作而不必经历http请求,而知道它会失败?我基本上有这个:
val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)
我唯一能想到的就是在httpResponse流中捕获400错误并返回默认值。但是我希望能够避免因我知道事先将失败的请求而打入服务器的开销。
维克多·巴生的解决方案是简洁,优雅。我只是想演示使用图的替代方法。
您可以将字符串的源分成两个流,并为有效字符串过滤一个流,对于无效字符串过滤另一个流。然后合并结果(“越过流”)。
根据文档:
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val source = Source(List("1", "2", "", "3", "4"))
val sink : Sink[String,_] = ???
val bcast = builder.add(Broadcast[String](2))
val merge = builder.add(Merge[String](2))
val validReq = Flow[String].filter(_.size > 0)
val invalidReq = Flow[String].filter(_.size == 0)
val httpRequest: Flow[String, HttpRequest, _] = ???
val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
val httpResponse: Flow[HttpResponse, String, _] = ???
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
bcast ~> invalidReq ~> merge
ClosedShape
})
注意:此解决方案拆分了流,因此接收器可能以与基于输入所期望的顺序不同的顺序处理字符串值结果。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句