생성 방법으로 만든 차가운 Flowable이 있습니다. 차갑게 유지하고 싶지만 또한 소비자의 추가 항목에 대한 요청을 "제한"하고 싶습니다. 예를 들어 소비자 (구독자?)가 항목을 처리하는 데 400ms가 걸리고 제한이 1 초로 설정된 경우 다음과 같은 타임 라인이 표시 될 것으로 예상합니다.
0 - generate callback called to generate next value
1 - consumer starts processing generated value
401 - consumer finished processing value
401 - consumer requests next item
1000 - generate callback called to generate next value
다음은 알아 내기 위해 사용하는 샘플 코드입니다.
val startTime = System.currentTimeMillis()
fun log(msg: String) {
println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
}
val generator = Flowable.generate<Int, Int>(
Callable { 0 },
BiFunction { state, emitter ->
val value = state + 1
log("generating $value")
emitter.onNext(value)
return@BiFunction value
})
val subscription = generator
.concatMap { Flowable.concat(Flowable.just(it), Flowable.empty<Int>().delay(1, TimeUnit.SECONDS)) }
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io(), true, 1)
.subscribe {
log("processing $it")
if (it % 5 == 0) {
log("starting sleep")
try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
log("done sleeping")
}
}
Thread.sleep(8_000)
subscription.dispose()
log("done")
현재 내가 얻는 출력은 다음과 같습니다.
RxComputationThreadPool-1 - 278 - generating 1
RxCachedThreadScheduler-1 - 283 - processing 1
RxComputationThreadPool-1 - 285 - generating 2
RxComputationThreadPool-2 - 1287 - generating 3
RxComputationThreadPool-2 - 1288 - generating 4
RxCachedThreadScheduler-1 - 1288 - processing 2
RxCachedThreadScheduler-1 - 2288 - processing 3
RxComputationThreadPool-4 - 3288 - generating 5
RxComputationThreadPool-4 - 3288 - generating 6
RxCachedThreadScheduler-1 - 3288 - processing 4
RxCachedThreadScheduler-1 - 4289 - processing 5
RxCachedThreadScheduler-1 - 4289 - starting sleep
RxComputationThreadPool-6 - 5290 - generating 7
RxComputationThreadPool-6 - 5290 - generating 8
RxCachedThreadScheduler-1 - 6489 - done sleeping
RxCachedThreadScheduler-1 - 6489 - processing 6
RxCachedThreadScheduler-1 - 7490 - processing 7
main - 8265 - done
내가 얻고 싶은 출력 은 다음과 같습니다.
RxComputationThreadPool-1 - 278 - generating 1
RxCachedThreadScheduler-1 - 283 - processing 1
RxComputationThreadPool-1 - 1285 - generating 2 // 2 generated one second after 1
RxCachedThreadScheduler-2 - 1287 - processing 2 // but once generated, processed immediately
RxComputationThreadPool-2 - 2288 - generating 3 // 3 generated one second after 2
RxCachedThreadScheduler-1 - 2288 - processing 3
RxComputationThreadPool-1 - 3289 - generating 4 // 4 generated one second after 3
RxCachedThreadScheduler-4 - 3289 - processing 4
RxComputationThreadPool-4 - 4290 - generating 5 // 5 generated one second after 4
RxCachedThreadScheduler-1 - 4290 - processing 5
RxCachedThreadScheduler-1 - 4290 - starting sleep // item 5 takes longer to process
RxCachedThreadScheduler-1 - 6491 - done sleeping // 2200ms later its done
RxComputationThreadPool-6 - 6492 - generating 6 // now that consumer is done, it requests next item and gets generated immediately since it has been 1 second since last request
RxCachedThreadScheduler-1 - 6492 - processing 6
RxComputationThreadPool-1 - 7492 - generating 7 // 7 generated one second after 6
RxCachedThreadScheduler-4 - 7493 - processing 7
main - 8265 - done
RxJavaExtensions (RxJava2 버전) 변환기 중 하나가 문제에 대한 해결책이어야합니다 spanout()
. 상류에서 배출되는 배출 사이에 지연을 삽입합니다. 코드에서 한 줄만 변경했습니다 (으로 대체 concatMap()
됨 spanout()
).
val startTime = System.currentTimeMillis()
fun log(msg: String) {
println(String.format("%s - %4d - %s", Thread.currentThread().name, System.currentTimeMillis() - startTime, msg))
}
val generator = Flowable.generate<Int, Int>(
Callable { 0 },
BiFunction { state, emitter ->
val value = state + 1
log("generating $value")
emitter.onNext(value)
return@BiFunction value
})
val subscription = generator
.compose(FlowableTransformers.spanout(1, 1, TimeUnit.SECONDS)) // <– changed line
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io(), true, 1)
.subscribe {
log("processing $it")
if (it % 5 == 0) {
log("starting sleep")
try { Thread.sleep(2200) } catch (e: InterruptedException) { log("interrupted") }
log("done sleeping")
}
}
Thread.sleep(8_000)
subscription.dispose()
log("done")
생산 된 출력 :
RxComputationThreadPool-1 - 153 - generating 1
RxCachedThreadScheduler-1 - 1178 - processing 1
RxComputationThreadPool-1 - 1179 - generating 2
RxCachedThreadScheduler-1 - 2176 - processing 2
RxComputationThreadPool-1 - 2177 - generating 3
RxCachedThreadScheduler-1 - 3177 - processing 3
RxComputationThreadPool-1 - 3178 - generating 4
RxCachedThreadScheduler-1 - 4175 - processing 4
RxComputationThreadPool-1 - 4175 - generating 5
RxCachedThreadScheduler-1 - 5177 - processing 5
RxCachedThreadScheduler-1 - 5178 - starting sleep
RxCachedThreadScheduler-1 - 7383 - done sleeping
RxComputationThreadPool-1 - 7384 - generating 6
RxCachedThreadScheduler-1 - 7384 - processing 6
RxComputationThreadPool-1 - 7385 - generating 7
main - 8151 - done
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다