메시지를 지연시킬 수있는 구성 요소를 통해 메시지가 전달되는 상황이 있습니다.
스트레스를 받으면이 구성 요소를 건너 뛰고 동시에 X 개 이상의 메시지가 지연되지 않도록하고 싶습니다. 넘쳐나는 메시지는이 단계를 건너 뛰고 다음 단계로 넘어갑니다.
메시지는이 단계에서 미래가 완료 될 때까지 또는 최대 1 분 중 먼저 도래하는 시점까지 중단됩니다.
이 버퍼 예제 와 유사한 사용자 정의 GraphStage를 구현 하거나 일부 카운터와 함께 divertTo를 사용하여 메시지가 중단 된 구성 요소를 건너 뛰도록 할 수 있지만 akka 스트림에 더 쉬운 접근 방식이있을 수 있습니다.
나는 당신의 유스 케이스를 가지고 놀았고 Akka Actor
카운터와 비동기 맵 단계를 나타내는 솔루션을 찾았습니다 .
3
주어진 시간에 최대 요소 까지 처리 할 수 있고 최대 용량 인 카운터를 기반으로하는 아이디어 는 이러한 요소의 2
대부분 2
이 느린 구성 요소에 의해 동시에 처리 되도록 허용한다는 것 입니다.
이렇게하면 느린 구성 요소에서 분기되어 다운 스트림에 직접 도달하는 업스트림 요소에 예약 된 처리 스레드가 항상 하나 있습니다.
먼저 Counter
최대 용량을 가진 기본을 다음 과 같이 정의합시다 Akka Actor
.
class Counter(max: Int) extends Actor {
private var count: Int = 0
override def receive: Receive = {
case TryAndLock if count < max =>
count += 1
sender ! true
case TryAndLock =>
sender ! false
case Release =>
count -= 1
}
}
sealed trait CounterAction
case object TryAndLock extends CounterAction
case object Release extends CounterAction
val counter = system.actorOf(Props(new Counter(max = 2)))
요청을 count
통해 증가 할 수 있는 가변 변수를 보유 TryAndLock
하지만 개수가 아직 최대 용량에 도달하지 않았고 Release
요청을 통해 감소 될 수있는 경우에만 가능합니다 .
우리는 사용하고 Actor
다음 내에서 그래서 동시 잠금 및 해제 작업 mapAsync
단계는 제대로 경쟁 조건없이 처리됩니다.
그런 다음 mapAsyncUnordered
카운터의 최대 용량보다 1 단위 더 높은 병렬 처리가 있는 스테이지를 사용하면 됩니다.
비동기 단계를 통과하는 모든 요소 Counter
는 리소스를 시도하고 잠그기 위해 를 쿼리합니다 . 리소스가 잠기면 요소가 느린 구성 요소로 던져집니다. 그렇지 않은 경우 건너 뜁니다. 요소는 카운터의 최대 용량에 도달 할 때까지 느린 구성 요소로 전달됩니다.이 시점에서 요소가 느린 구성 요소를 종료하고 카운터에서 리소스를 해제 할 때까지 새 요소는 건너 뜁니다.
mapAsync
스테이지가 존재할 때 요소가 업스트림의 순서를 유지하므로 건너 뛴 요소가 느린 구성 요소가 처리 한 요소가 다운 스트림으로 생성되기 전에 완료 될 때까지 기다리기 때문에 단순히 a를 사용할 수 없습니다 . 따라서 mapAsyncUnordered
대신 사용할 필요가 있습니다 .
느린 구성 요소와 병렬 처리가 3 인 비동기 맵에 의해 동시에 처리되는 최대 2 개의 요소로 예제를 정의 해 보겠습니다.
Source(0 to 15)
.throttle(1, 50.milliseconds)
.mapAsyncUnordered(parallelism = 3) { i =>
(counter ? TryAndLock).map {
case locked: Boolean if locked =>
val result = slowTask(i)
counter ! Release
result
case _ =>
skipTask(i)
}
}
.runForeach(println)
예를 들어 느린 구성 요소 ( slowTask
)와 느린 구성 요소 ( )를 건너 뛸 때 수행 할 작업을 시뮬레이션하는 다음 두 함수를 사용합니다 skipTask
.
def slowTask(value: Int): String = {
val start = Instant.now()
Thread.sleep(250)
s"$value - processed - $start - ${Instant.now()}"
}
def skipTask(value: Int): String =
s"$value - skipped - ${Instant.now()}"
결과는 다음과 같습니다.
2 - skipped - 2020-06-03T19:07:19.410Z
3 - skipped - 2020-06-03T19:07:19.468Z
4 - skipped - 2020-06-03T19:07:19.518Z
5 - skipped - 2020-06-03T19:07:19.569Z
1 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
0 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
8 - skipped - 2020-06-03T19:07:19.719Z
9 - skipped - 2020-06-03T19:07:19.769Z
10 - skipped - 2020-06-03T19:07:19.819Z
6 - processed - 2020-06-03T19:07:19.618Z - 2020-06-03T19:07:19.869Z
12 - skipped - 2020-06-03T19:07:19.919Z
7 - processed - 2020-06-03T19:07:19.669Z - 2020-06-03T19:07:19.921Z
14 - skipped - 2020-06-03T19:07:20.019Z
15 - skipped - 2020-06-03T19:07:20.070Z
11 - processed - 2020-06-03T19:07:19.869Z - 2020-06-03T19:07:20.122Z
13 - processed - 2020-06-03T19:07:19.968Z - 2020-06-03T19:07:20.219Z
첫 번째 부분은 업스트림 요소의 인덱스이고, 두 번째 부분은 요소가 적용된 변환 ( processed
느린 구성 요소 또는를 입력 할 때 skipped
)이고 마지막 부분은 타임 스탬프이므로 상황이 발생할 때 시각화합니다.
스테이지에 들어가는 2 개의 첫 번째 요소 (0 및 1)는 느린 구성 요소에 의해 처리되고 다음 요소 (2, 3, 4 및 5)는이 두 개의 첫 번째 요소가 완료되고 추가 요소가 입력 될 때까지 느린 스테이지를 건너 뜁니다. 느린 단계. 등등.
길 포일
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다