我有这样的流程:
fun createRawDataFlow() = callbackFlow<String> {
SensorProProvider.getInstance(this@MainActivity).registerDataCallback { sensorProDeviceInfo, bytes ->
val dataString = bytes.map { it.toString() }.reduce { acc, s -> "$acc, $s" }
val hexString = HEXUtils.byteToHex(bytes)
Log.e("onDataReceived", "deviceInfo: ${sensorProDeviceInfo.deviceIdentify}, dataSize:${bytes.size}, data:$dataString")
offer(hexString)
}
awaitClose { }
}
GlobalScope.launch(Dispatchers.IO) {
createRawDataFlow()
.map {
Log.e("onDataReceived", "map2: ${Thread.currentThread().name}")
// what I want is collecting 10 emits of sensor's data, and return a list of them
// arraylistOf<String>(/* 10 hexStrings here*/)
}
.flowOn(Dispatchers.IO)
.collect {
Log.e("onDataReceived", "thread: ${Thread.currentThread().name}, hexData:$it")
}
}
就像代码中的注释一样。我想从流中收集10个十六进制字符串,因为这些字符串来自同一时间段,然后将它们打包在数组列表中以供返回。我该如何实现?是否有类似于map的运算符来执行此操作?顺便说一句,原谅我的英语不好。
如果需要批量收集,并且不想取消原始流,则可以调整发射流函数的方式,使其保留值的缓存。
/*
* Returns a list of at least [batchSize] integers.
*/
fun aFlow(cacheSize: Int = 10): Flow<List<Int>> {
var counter: Int = 0
val cache: MutableList<Int> = mutableListOf()
return flow {
while(currentCoroutineContext().isActive) {
cache.add(counter++)
if (cache.size >= cacheSize) {
emit(cache)
cache.clear()
}
delay(500L) // the delay is just to simulate incoming sensor data
}
}
}
通用解决方案
为了使它更通用,我在流上创建了一个通用扩展函数,您可以将其应用于要返回批次列表的任何流。
考虑我们有一个infiniteFlow
整数:
fun infiniteFlow(): Flow<Int> {
var counter: Int = 0
return flow {
while (currentCoroutineContext().isActive) {
emit(counter++)
delay(250L) // the delay is just to simulate incoming sensor data
}
}
}
和这个batch
扩展功能:
/**
* Maps the Flow<T> to Flow<List<T>>. The list size is at least [batchSize]
*/
fun <T> Flow<T>.batch(batchSize: Int = 10): Flow<List<T>> {
val cache: MutableList<T> = mutableListOf()
return map {
cache.apply { add(it) }
}.filter { it.size >= batchSize }
.map {
mutableListOf<T>().apply { // copy the list and clears the cache
addAll(cache)
cache.clear()
}
}
}
注意:这仅是示例。没有针对边缘情况进行优化或测试!
然后,您可以使用以下功能:
infiniteFlow().batch(batchSize = 12).collect { println(it) }
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句