combine
akkaストリームの演算子には、次の署名があります。
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
私は複数のソースを持っていますが、すべて同じMat
です。ホイスト保存でそれらを組み合わせる必要がありMat
ます。
したがって、次の署名を持つ関数が必要です。
def combine[T, U](first: Source[T, Mat], second: Source[T, Mat], rest: Source[T, Mat]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, Seq[Mat]]
既存のcombineMat
ものは2つの入力のみを受け入れます。無制限が必要です。
Akkaのcombineの実装は次のとおりです。
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val c = b.add(strategy(rest.size + 2))
first ~> c.in(0)
second ~> c.in(1)
@tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
if (i.hasNext) {
i.next() ~> c.in(idx)
combineRest(idx + 1, i)
} else SourceShape(c.out)
combineRest(2, rest.iterator)
})
sをSourceShape
サポートしていないものを使用しているMat
ので、ここではうまくいかないと思います。
一方、複数のストリームでは機能しないcombineMat
使用の実装viaMat
。
これは可能ですか?
次の作品:
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.stream.{Graph, SourceShape, UniformFanInShape}
import scala.collection.immutable
object Combine {
def combine[T, U, Mat](sources: immutable.Seq[Source[T, Mat]])(strategy: Int => Graph[UniformFanInShape[T, U], Mat]): Source[U, immutable.Seq[Mat]] = {
Source.fromGraph(GraphDSL.create(sources) {
implicit builder => {
sourceShapes => {
val target = builder.add(strategy(sources.size))
for ((source, index) <- sourceShapes.zipWithIndex) {
source ~> target.in(index)
}
SourceShape(target.out)
}
}
})
}
}
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加