중단 된 경우 스트림의 일부 건너 뛰기

리처드 헨드릭스

메시지를 지연시킬 수있는 구성 요소를 통해 메시지가 전달되는 상황이 있습니다.

스트레스를 받으면이 구성 요소를 건너 뛰고 동시에 X 개 이상의 메시지가 지연되지 않도록하고 싶습니다. 넘쳐나는 메시지는이 단계를 건너 뛰고 다음 단계로 넘어갑니다.

메시지는이 단계에서 미래가 완료 될 때까지 또는 최대 1 분 중 먼저 도래하는 시점까지 중단됩니다.

이 버퍼 예제 와 유사한 사용자 정의 GraphStage를 구현 하거나 일부 카운터와 함께 divertTo를 사용하여 메시지가 중단 된 구성 요소를 건너 뛰도록 할 수 있지만 akka 스트림에 더 쉬운 접근 방식이있을 수 있습니다.

자비에 구이 호트

나는 당신의 유스 케이스를 가지고 놀았고 Akka Actor카운터와 비동기 맵 단계를 나타내는 솔루션을 찾았습니다 .

3주어진 시간에 최대 요소 까지 처리 할 수 ​​있고 최대 용량 인 카운터를 기반으로하는 아이디어 는 이러한 요소의 2대부분 2이 느린 구성 요소에 의해 동시에 처리 되도록 허용한다는 것 입니다.

이렇게하면 느린 구성 요소에서 분기되어 다운 스트림에 직접 도달하는 업스트림 요소에 예약 된 처리 스레드가 항상 하나 있습니다.


먼저 Counter최대 용량을 가진 기본을 다음 과 같이 정의합시다 Akka Actor.

class Counter(max: Int) extends Actor {
  private var count: Int = 0

  override def receive: Receive = {
    case TryAndLock if count < max =>
      count += 1
      sender ! true
    case TryAndLock =>
      sender ! false
    case Release =>
      count -= 1
  }
}

sealed trait CounterAction
case object TryAndLock extends CounterAction
case object Release extends CounterAction

val counter = system.actorOf(Props(new Counter(max = 2)))

요청을 count통해 증가 할 수 있는 가변 변수를 보유 TryAndLock하지만 개수가 아직 최대 용량에 도달하지 않았고 Release요청을 통해 감소 될 수있는 경우에만 가능합니다 .

우리는 사용하고 Actor다음 내에서 그래서 동시 잠금 및 해제 작업 mapAsync단계는 제대로 경쟁 조건없이 처리됩니다.


그런 다음 mapAsyncUnordered카운터의 최대 용량보다 1 단위 더 높은 병렬 처리가 있는 스테이지를 사용하면 됩니다.

비동기 단계를 통과하는 모든 요소 Counter는 리소스를 시도하고 잠그기 위해 쿼리합니다 . 리소스가 잠기면 요소가 느린 구성 요소로 던져집니다. 그렇지 않은 경우 건너 뜁니다. 요소는 카운터의 최대 용량에 도달 할 때까지 느린 구성 요소로 전달됩니다.이 시점에서 요소가 느린 구성 요소를 종료하고 카운터에서 리소스를 해제 할 때까지 새 요소는 건너 뜁니다.

mapAsync스테이지가 존재할 때 요소가 업스트림의 순서를 유지하므로 건너 뛴 요소가 느린 구성 요소가 처리 한 요소가 다운 스트림으로 생성되기 전에 완료 될 때까지 기다리기 때문에 단순히 a를 사용할 수 없습니다 . 따라서 mapAsyncUnordered대신 사용할 필요가 있습니다 .

느린 구성 요소와 병렬 처리가 3 인 비동기 맵에 의해 동시에 처리되는 최대 2 개의 요소로 예제를 정의 해 보겠습니다.

Source(0 to 15)
  .throttle(1, 50.milliseconds)
  .mapAsyncUnordered(parallelism = 3) { i =>
    (counter ? TryAndLock).map {
      case locked: Boolean if locked =>
        val result = slowTask(i)
        counter ! Release
        result
      case _ =>
        skipTask(i)
    }
  }
  .runForeach(println)

예를 들어 느린 구성 요소 ( slowTask)와 느린 구성 요소 ( )를 건너 뛸 때 수행 할 작업을 시뮬레이션하는 다음 두 함수를 사용합니다 skipTask.

def slowTask(value: Int): String = {
  val start = Instant.now()
  Thread.sleep(250)
  s"$value - processed - $start - ${Instant.now()}"
}
def skipTask(value: Int): String =
  s"$value - skipped - ${Instant.now()}"

결과는 다음과 같습니다.

2 - skipped - 2020-06-03T19:07:19.410Z
3 - skipped - 2020-06-03T19:07:19.468Z
4 - skipped - 2020-06-03T19:07:19.518Z
5 - skipped - 2020-06-03T19:07:19.569Z
1 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
0 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
8 - skipped - 2020-06-03T19:07:19.719Z
9 - skipped - 2020-06-03T19:07:19.769Z
10 - skipped - 2020-06-03T19:07:19.819Z
6 - processed - 2020-06-03T19:07:19.618Z - 2020-06-03T19:07:19.869Z
12 - skipped - 2020-06-03T19:07:19.919Z
7 - processed - 2020-06-03T19:07:19.669Z - 2020-06-03T19:07:19.921Z
14 - skipped - 2020-06-03T19:07:20.019Z
15 - skipped - 2020-06-03T19:07:20.070Z
11 - processed - 2020-06-03T19:07:19.869Z - 2020-06-03T19:07:20.122Z
13 - processed - 2020-06-03T19:07:19.968Z - 2020-06-03T19:07:20.219Z

첫 번째 부분은 업스트림 요소의 인덱스이고, 두 번째 부분은 요소가 적용된 변환 ( processed느린 구성 요소 또는를 입력 할 때 skipped)이고 마지막 부분은 타임 스탬프이므로 상황이 발생할 때 시각화합니다.

스테이지에 들어가는 2 개의 첫 번째 요소 (0 및 1)는 느린 구성 요소에 의해 처리되고 다음 요소 (2, 3, 4 및 5)는이 두 개의 첫 번째 요소가 완료되고 추가 요소가 입력 될 때까지 느린 스테이지를 건너 뜁니다. 느린 단계. 등등.

길 포일

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

중첩 된 for 루프에서 동일한 값 건너 뛰기

분류에서Dev

스크립팅 된 Jenkins 파이프 라인, 단계의 병렬 부분 건너 뛰기

분류에서Dev

javacc의 일부 위치에서 선언 된 토큰 건너 뛰기

분류에서Dev

WooCommerce : anotner 속성이 루프의 일부인 경우 foreach의 속성 건너 뛰기

분류에서Dev

bash 스크립트의 여기 문서 구분 기호 중 일부를 건너 뛰었습니다.

분류에서Dev

이미 설치된 경우 MSI 건너 뛰기

분류에서Dev

명령 줄에서 지정된 Python 단위 테스트 건너 뛰기

분류에서Dev

스캐너로 txt 파일의 단어를 읽는 동안 줄 건너 뛰기

분류에서Dev

컴파일 중 javadoc 건너 뛰기

분류에서Dev

maven, 이미 다운로드 된 경우 아티팩트 다운로드 건너 뛰기

분류에서Dev

jqgrid 조건부에서 다중 선택의 확인란 건너 뛰기

분류에서Dev

인쇄 문이 포함 된 if-else가 스캐너를 건너 뛰는 경우

분류에서Dev

단위 테스트에서 내부 메서드 호출 건너 뛰기

분류에서Dev

TestNG 테스트의 조건부 건너 뛰기

분류에서Dev

Ruby RPG의 버그 (경우에 따라 줄 건너 뛰기)

분류에서Dev

자바 스크립트에서 배열의 일부 건너 뛰기

분류에서Dev

ansible에서 프롬프트로 일부 작업 건너 뛰기

분류에서Dev

연결된 값이 chartJS에서 null 인 경우 레이블 건너 뛰기

분류에서Dev

계정 등록이 완료된 경우에만 활동 건너 뛰기

분류에서Dev

RegEx 금지 된 단어가있는 경우 문자열의 마지막 부분을 건너 뜁니다.

분류에서Dev

R의 travis에서 특정 테스트 파일 건너 뛰기

분류에서Dev

sparklyr : 텍스트 파일의 첫 줄 건너 뛰기

분류에서Dev

조건이 False 인 경우 For 루프의 현재 반복 건너 뛰기

분류에서Dev

입력 스트림 값 건너 뛰기

분류에서Dev

Excel로 파일 가져 오기-찾을 수없는 경우 건너 뛰기

분류에서Dev

Bash 스크립트 파티션이 이미있는 경우 건너 뛰기

분류에서Dev

데이터 프레임 부분 설정 중 일부 행 건너 뛰기

분류에서Dev

Spring 3.2.x : 406 Not Acceptable의 경우 컨트롤러 메서드 건너 뛰기

분류에서Dev

너무 많은 텍스트 조건이있는 문이 중첩 된 경우 ERROR

Related 관련 기사

  1. 1

    중첩 된 for 루프에서 동일한 값 건너 뛰기

  2. 2

    스크립팅 된 Jenkins 파이프 라인, 단계의 병렬 부분 건너 뛰기

  3. 3

    javacc의 일부 위치에서 선언 된 토큰 건너 뛰기

  4. 4

    WooCommerce : anotner 속성이 루프의 일부인 경우 foreach의 속성 건너 뛰기

  5. 5

    bash 스크립트의 여기 문서 구분 기호 중 일부를 건너 뛰었습니다.

  6. 6

    이미 설치된 경우 MSI 건너 뛰기

  7. 7

    명령 줄에서 지정된 Python 단위 테스트 건너 뛰기

  8. 8

    스캐너로 txt 파일의 단어를 읽는 동안 줄 건너 뛰기

  9. 9

    컴파일 중 javadoc 건너 뛰기

  10. 10

    maven, 이미 다운로드 된 경우 아티팩트 다운로드 건너 뛰기

  11. 11

    jqgrid 조건부에서 다중 선택의 확인란 건너 뛰기

  12. 12

    인쇄 문이 포함 된 if-else가 스캐너를 건너 뛰는 경우

  13. 13

    단위 테스트에서 내부 메서드 호출 건너 뛰기

  14. 14

    TestNG 테스트의 조건부 건너 뛰기

  15. 15

    Ruby RPG의 버그 (경우에 따라 줄 건너 뛰기)

  16. 16

    자바 스크립트에서 배열의 일부 건너 뛰기

  17. 17

    ansible에서 프롬프트로 일부 작업 건너 뛰기

  18. 18

    연결된 값이 chartJS에서 null 인 경우 레이블 건너 뛰기

  19. 19

    계정 등록이 완료된 경우에만 활동 건너 뛰기

  20. 20

    RegEx 금지 된 단어가있는 경우 문자열의 마지막 부분을 건너 뜁니다.

  21. 21

    R의 travis에서 특정 테스트 파일 건너 뛰기

  22. 22

    sparklyr : 텍스트 파일의 첫 줄 건너 뛰기

  23. 23

    조건이 False 인 경우 For 루프의 현재 반복 건너 뛰기

  24. 24

    입력 스트림 값 건너 뛰기

  25. 25

    Excel로 파일 가져 오기-찾을 수없는 경우 건너 뛰기

  26. 26

    Bash 스크립트 파티션이 이미있는 경우 건너 뛰기

  27. 27

    데이터 프레임 부분 설정 중 일부 행 건너 뛰기

  28. 28

    Spring 3.2.x : 406 Not Acceptable의 경우 컨트롤러 메서드 건너 뛰기

  29. 29

    너무 많은 텍스트 조건이있는 문이 중첩 된 경우 ERROR

뜨겁다태그

보관