콜드 플로우 가능한 업스트림으로 요청을 제한하는 방법

Aarjav

생성 방법으로 만든 차가운 Flowable이 있습니다. 차갑게 유지하고 싶지만 또한 소비자의 추가 항목에 대한 요청을 "제한"하고 싶습니다. 예를 들어 소비자 (구독자?)가 항목을 처리하는 데 400ms가 걸리고 제한이 1 초로 설정된 경우 다음과 같은 타임 라인이 표시 될 것으로 예상합니다.

0 - generate callback called to generate next value
1 - consumer starts processing generated value
401 - consumer finished processing value
401 - consumer requests next item
1000 - generate callback called to generate next value

다음은 알아 내기 위해 사용하는 샘플 코드입니다.

    val startTime = System.currentTimeMillis()
    fun log(msg: String) {
        println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
    }

    val generator = Flowable.generate<Int, Int>(
        Callable { 0 },
        BiFunction { state, emitter ->
            val value = state + 1
            log("generating $value")
            emitter.onNext(value)
            return@BiFunction value
        })

    val subscription = generator
        .concatMap { Flowable.concat(Flowable.just(it), Flowable.empty<Int>().delay(1, TimeUnit.SECONDS)) }
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io(), true, 1)
        .subscribe {
            log("processing $it")
            if (it % 5 == 0) {
                log("starting sleep")
                try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
                log("done sleeping")
            }
        }

    Thread.sleep(8_000)
    subscription.dispose()
    log("done")

현재 내가 얻는 출력은 다음과 같습니다.

RxComputationThreadPool-1 -  278 - generating 1
RxCachedThreadScheduler-1 -  283 - processing 1
RxComputationThreadPool-1 -  285 - generating 2
RxComputationThreadPool-2 - 1287 - generating 3
RxComputationThreadPool-2 - 1288 - generating 4
RxCachedThreadScheduler-1 - 1288 - processing 2
RxCachedThreadScheduler-1 - 2288 - processing 3
RxComputationThreadPool-4 - 3288 - generating 5
RxComputationThreadPool-4 - 3288 - generating 6
RxCachedThreadScheduler-1 - 3288 - processing 4
RxCachedThreadScheduler-1 - 4289 - processing 5
RxCachedThreadScheduler-1 - 4289 - starting sleep
RxComputationThreadPool-6 - 5290 - generating 7
RxComputationThreadPool-6 - 5290 - generating 8
RxCachedThreadScheduler-1 - 6489 - done sleeping
RxCachedThreadScheduler-1 - 6489 - processing 6
RxCachedThreadScheduler-1 - 7490 - processing 7
main - 8265 - done

내가 얻고 싶은 출력 은 다음과 같습니다.

RxComputationThreadPool-1 -  278 - generating 1
RxCachedThreadScheduler-1 -  283 - processing 1
RxComputationThreadPool-1 - 1285 - generating 2 // 2 generated one second after 1
RxCachedThreadScheduler-2 - 1287 - processing 2 // but once generated, processed immediately
RxComputationThreadPool-2 - 2288 - generating 3 // 3 generated one second after 2
RxCachedThreadScheduler-1 - 2288 - processing 3 
RxComputationThreadPool-1 - 3289 - generating 4 // 4 generated one second after 3
RxCachedThreadScheduler-4 - 3289 - processing 4
RxComputationThreadPool-4 - 4290 - generating 5 // 5 generated one second after 4
RxCachedThreadScheduler-1 - 4290 - processing 5
RxCachedThreadScheduler-1 - 4290 - starting sleep // item 5 takes longer to process
RxCachedThreadScheduler-1 - 6491 - done sleeping  // 2200ms later its done
RxComputationThreadPool-6 - 6492 - generating 6 // now that consumer is done, it requests next item and gets generated immediately since it has been 1 second since last request
RxCachedThreadScheduler-1 - 6492 - processing 6
RxComputationThreadPool-1 - 7492 - generating 7 // 7 generated one second after 6
RxCachedThreadScheduler-4 - 7493 - processing 7
main - 8265 - done
스카이 월

RxJavaExtensions (RxJava2 버전) 변환기 중 하나가 문제에 대한 해결책이어야합니다 spanout(). 상류에서 배출되는 배출 사이에 지연을 삽입합니다. 코드에서 한 줄만 변경했습니다 (으로 대체 concatMap()spanout()).

val startTime = System.currentTimeMillis()
fun log(msg: String) {
    println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
}

val generator = Flowable.generate<Int, Int>(
    Callable { 0 },
    BiFunction { state, emitter ->
        val value = state + 1
        log("generating $value")
        emitter.onNext(value)
        return@BiFunction value
    })

val subscription = generator
    .compose(FlowableTransformers.spanout(1, 1, TimeUnit.SECONDS)) // <– changed line
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io(), true, 1)
    .subscribe {
        log("processing $it")
        if (it % 5 == 0) {
            log("starting sleep")
            try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
            log("done sleeping")
        }
    }

Thread.sleep(8_000)
subscription.dispose()
log("done")

생산 된 출력 :

RxComputationThreadPool-1 -  153 - generating 1
RxCachedThreadScheduler-1 - 1178 - processing 1
RxComputationThreadPool-1 - 1179 - generating 2
RxCachedThreadScheduler-1 - 2176 - processing 2
RxComputationThreadPool-1 - 2177 - generating 3
RxCachedThreadScheduler-1 - 3177 - processing 3
RxComputationThreadPool-1 - 3178 - generating 4
RxCachedThreadScheduler-1 - 4175 - processing 4
RxComputationThreadPool-1 - 4175 - generating 5
RxCachedThreadScheduler-1 - 5177 - processing 5
RxCachedThreadScheduler-1 - 5178 - starting sleep
RxCachedThreadScheduler-1 - 7383 - done sleeping
RxComputationThreadPool-1 - 7384 - generating 6
RxCachedThreadScheduler-1 - 7384 - processing 6
RxComputationThreadPool-1 - 7385 - generating 7
main - 8151 - done

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

다운로드 가능한 매우 무거운 멀티 플레이어 게임을 처리하는 방법

분류에서Dev

NGRX / RXJS : 관찰 가능한 스트림을 단일 플랫 스트림으로 재귀 적으로 매핑하는 방법

분류에서Dev

7 x 7 리플 로우 가능한 그리드 / 테이블을 만드는 가장 좋은 방법

분류에서Dev

7 x 7 리플 로우 가능한 그리드 / 테이블을 만드는 가장 좋은 방법

분류에서Dev

django에서 다운로드 가능한 zip 파일을 제공하는 방법

분류에서Dev

WooCommerce, 주문을 위해 다운로드 가능한 제품 권한을 제거하는 방법

분류에서Dev

드래그 가능한 모든 요소가 jQuery droppable로 삭제되었는지 확인하는 방법

분류에서Dev

스프레드 시트 부가 기능에 대한 인증 요청을 최소화하는 방법

분류에서Dev

Bash 용로드 가능한 내장 기능을 빌드하는 방법

분류에서Dev

http 요청을 캐시하거나로드하는 각도 적절한 방법

분류에서Dev

실제로 작동하는 부팅 가능한 USB 스틱을 만드는 방법은 무엇입니까?

분류에서Dev

커널에서로드 가능한 모듈을 강제로 제거하는 방법

분류에서Dev

콜백을 제거한 후 안드로이드 핸들러를 재사용하는 방법

분류에서Dev

한 번의 요청으로 한 필드의 값을 늘리는 방법

분류에서Dev

GROUPBY 자바 스트림으로 제한하는 방법

분류에서Dev

Clojure로 여러 API 요청을 만드는 기능적 방법

분류에서Dev

Windows 7, ubuntu 14.04 및 ubuntu 14.10을 트리플 부팅하는 방법 (가능한 경우 Windows 로더 유지)

분류에서Dev

요청 후 방법을 사용하여 한 작업을 다른 작업으로 리디렉션하는 방법

분류에서Dev

squashfs 루트로 부팅 가능한 시스템을 만드는 방법

분류에서Dev

정렬 가능한 div를 외부 요소로 드래그하는 방법

분류에서Dev

읽기 가능한 이벤트 리스너 콜백 내부에서 읽을 때 노드의 process.stdin 읽기 가능한 스트림 로그 Null

분류에서Dev

방어 복사본을 스트림으로 변환해도 스레드로부터 안전한가요?

분류에서Dev

반복 가능한 프로그램을 만드는 방법

분류에서Dev

Ruby : 가능한 가장 간단한 방법으로 병렬로 작업을 실행하는 방법

분류에서Dev

취소 가능한 요청 스트림에서 단일 비동기 웹 서비스 호출을 반복적으로 사용하는 방법은 무엇입니까?

분류에서Dev

droppable div로 드래그 가능한 div를 제거하는 방법

분류에서Dev

로드에 실패한 경우 app.js 스크립트 태그에 onerror 콜백을 추가하는 더 좋은 방법이 있습니까?

분류에서Dev

딥 콜에서 serilog로 요청을 추적하는 방법

분류에서Dev

고지대 스트림을 노드 판독 가능 스트림으로 변환하는 방법은 무엇입니까?

Related 관련 기사

  1. 1

    다운로드 가능한 매우 무거운 멀티 플레이어 게임을 처리하는 방법

  2. 2

    NGRX / RXJS : 관찰 가능한 스트림을 단일 플랫 스트림으로 재귀 적으로 매핑하는 방법

  3. 3

    7 x 7 리플 로우 가능한 그리드 / 테이블을 만드는 가장 좋은 방법

  4. 4

    7 x 7 리플 로우 가능한 그리드 / 테이블을 만드는 가장 좋은 방법

  5. 5

    django에서 다운로드 가능한 zip 파일을 제공하는 방법

  6. 6

    WooCommerce, 주문을 위해 다운로드 가능한 제품 권한을 제거하는 방법

  7. 7

    드래그 가능한 모든 요소가 jQuery droppable로 삭제되었는지 확인하는 방법

  8. 8

    스프레드 시트 부가 기능에 대한 인증 요청을 최소화하는 방법

  9. 9

    Bash 용로드 가능한 내장 기능을 빌드하는 방법

  10. 10

    http 요청을 캐시하거나로드하는 각도 적절한 방법

  11. 11

    실제로 작동하는 부팅 가능한 USB 스틱을 만드는 방법은 무엇입니까?

  12. 12

    커널에서로드 가능한 모듈을 강제로 제거하는 방법

  13. 13

    콜백을 제거한 후 안드로이드 핸들러를 재사용하는 방법

  14. 14

    한 번의 요청으로 한 필드의 값을 늘리는 방법

  15. 15

    GROUPBY 자바 스트림으로 제한하는 방법

  16. 16

    Clojure로 여러 API 요청을 만드는 기능적 방법

  17. 17

    Windows 7, ubuntu 14.04 및 ubuntu 14.10을 트리플 부팅하는 방법 (가능한 경우 Windows 로더 유지)

  18. 18

    요청 후 방법을 사용하여 한 작업을 다른 작업으로 리디렉션하는 방법

  19. 19

    squashfs 루트로 부팅 가능한 시스템을 만드는 방법

  20. 20

    정렬 가능한 div를 외부 요소로 드래그하는 방법

  21. 21

    읽기 가능한 이벤트 리스너 콜백 내부에서 읽을 때 노드의 process.stdin 읽기 가능한 스트림 로그 Null

  22. 22

    방어 복사본을 스트림으로 변환해도 스레드로부터 안전한가요?

  23. 23

    반복 가능한 프로그램을 만드는 방법

  24. 24

    Ruby : 가능한 가장 간단한 방법으로 병렬로 작업을 실행하는 방법

  25. 25

    취소 가능한 요청 스트림에서 단일 비동기 웹 서비스 호출을 반복적으로 사용하는 방법은 무엇입니까?

  26. 26

    droppable div로 드래그 가능한 div를 제거하는 방법

  27. 27

    로드에 실패한 경우 app.js 스크립트 태그에 onerror 콜백을 추가하는 더 좋은 방법이 있습니까?

  28. 28

    딥 콜에서 serilog로 요청을 추적하는 방법

  29. 29

    고지대 스트림을 노드 판독 가능 스트림으로 변환하는 방법은 무엇입니까?

뜨겁다태그

보관