我遇到一种情况,需要观察userId,然后使用这些userId观察用户。用户ID或用户都可以随时更改,我想使发出的用户保持最新状态。这是我拥有的数据源的示例:
data class User(val name: String)
fun observeBestUserIds(): Flow<List<String>> {
return flow {
emit(listOf("abc", "def"))
delay(500)
emit(listOf("123", "234"))
}
}
fun observeUserForId(userId: String): Flow<User> {
return flow {
emit(User("${userId}_name"))
delay(2000)
emit(User("${userId}_name_updated"))
}
}
在这种情况下,我希望排放量为:
[User(abc_name), User(def_name)]
, 然后
[User(123_name), User(234_name)]
, 然后
[User(123_name_updated), User(234_name_updated)]
我想我可以在RxJava中实现以下目标:
observeBestUserIds.concatMapSingle { ids ->
Observable.fromIterable(ids)
.concatMap { id ->
observeUserForId(id)
}
.toList()
}
我将编写什么函数来生成发出该事件的流?
我相信您正在寻找combine
,它为您提供了一个可以轻松调用的数组toList()
:
observeBestUserIds().collectLatest { ids ->
combine(
ids.map { id -> observeUserForId(id) }
) {
it.toList()
}.collect {
println(it)
}
}
这是内部具有更明确的参数名称的部分,因为您无法在Stack Overflow上看到IDE的类型提示:
combine(
ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
println(listOfUsers)
}
输出:
[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
现场演示(请注意,在演示中,所有输出都立即出现,但这是演示站点的限制-这些行以在本地运行代码时的期望时间显示)
这就避免了(abc_name_updated
,def_name_updated
在原来的问题)的讨论。但是,仍然存在带有123_name_updated
和的中间发射,234_name
因为123_name_updated
会先发射,然后立即发送合并版本,因为它们是每个流中的最新消息。
但是,可以通过消除发射的抖动来避免这种情况(在我的机器上,只有1ms的超时有效,但我保守地做了20ms):
observeBestUserIds().collectLatest { ids ->
combine(
ids.map { id -> observeUserForId(id) }
) {
it.toList()
}.debounce(timeoutMillis = 20).collect {
println(it)
}
}
这将为您提供所需的确切输出:
[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句