如何从Kotlin协程流中的几个发射中仅发射一个结果?

细微的

我有这样的流程:

fun createRawDataFlow() = callbackFlow<String> {
    SensorProProvider.getInstance(this@MainActivity).registerDataCallback { sensorProDeviceInfo, bytes ->
        val dataString = bytes.map { it.toString() }.reduce { acc, s -> "$acc, $s" }
        val hexString = HEXUtils.byteToHex(bytes)
        Log.e("onDataReceived", "deviceInfo: ${sensorProDeviceInfo.deviceIdentify}, dataSize:${bytes.size}, data:$dataString")
        offer(hexString)
    }
    awaitClose {  }
}


GlobalScope.launch(Dispatchers.IO) {
    createRawDataFlow()
        .map {
            Log.e("onDataReceived", "map2: ${Thread.currentThread().name}")
            // what I want is collecting 10 emits of sensor's data, and return a list of them
            // arraylistOf<String>(/* 10 hexStrings here*/)
        }
        .flowOn(Dispatchers.IO)
        .collect {
            Log.e("onDataReceived", "thread: ${Thread.currentThread().name}, hexData:$it")
        }
}

就像代码中的注释一样。我想从流中收集10个十六进制字符串,因为这些字符串来自同一时间段,然后将它们打包在数组列表中以供返回。我该如何实现?是否有类似于map的运算符来执行此操作?顺便说一句,原谅我的英语不好。

克里斯蒂安·B

如果需要批量收集,并且不想取消原始流,则可以调整发射流函数的方式,使其保留值的缓存。

/*
 * Returns a list of at least [batchSize] integers.
 */
fun aFlow(cacheSize: Int = 10): Flow<List<Int>> {
  var counter: Int = 0
  val cache: MutableList<Int> = mutableListOf()

  return flow {
    while(currentCoroutineContext().isActive) {
      cache.add(counter++)

      if (cache.size >= cacheSize) {
        emit(cache)
        cache.clear()
      }

      delay(500L) // the delay is just to simulate incoming sensor data
    }
  }
}

通用解决方案

为了使它更通用,我在流上创建了一个通用扩展函数,您可以将其应用于要返回批次列表的任何流。

考虑我们有一个infiniteFlow整数:

fun infiniteFlow(): Flow<Int> {
    var counter: Int = 0

    return flow {
        while (currentCoroutineContext().isActive) {
            emit(counter++)
            delay(250L) // the delay is just to simulate incoming sensor data
        }
    }
}

和这个batch扩展功能:

/**
 * Maps the Flow<T> to Flow<List<T>>. The list size is at least [batchSize]
 */
fun <T> Flow<T>.batch(batchSize: Int = 10): Flow<List<T>> {
    val cache: MutableList<T> = mutableListOf()

    return map {
        cache.apply { add(it) }
    }.filter { it.size >= batchSize }
        .map {
            mutableListOf<T>().apply { // copy the list and clears the cache
                addAll(cache)
                cache.clear()
            }
        }
}

注意:这仅是示例。没有针对边缘情况进行优化或测试!

然后,您可以使用以下功能:

infiniteFlow().batch(batchSize = 12).collect { println(it) }

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

协程仅发射一次

来自分类Dev

如何从另一个类(在Kotlin中)退出runBlocking协程?

来自分类Dev

一个()发射多次

来自分类Dev

我如何使socket.io仅向一个客户端发送发射消息?

来自分类Dev

科特琳协程多次发射

来自分类Dev

如何聆听另一个物体的事件发射?

来自分类Dev

如何聆听另一个物体的事件发射?

来自分类Dev

Kotlin:杀死一个不合作的协程

来自分类Dev

如何在kotlin中使用协程每秒调用一个函数

来自分类Dev

在Kotlin中,一个线程一次只能运行一个协程?

来自分类Dev

我该如何在C#中创建一个功能,使其沿右侧的直线从枪上发射子弹?

来自分类Dev

如何创建一个在2个事件之间跳过发射的可观察对象

来自分类Dev

如何在另一个模块中使用事件发射器?

来自分类Dev

Kotlin协程流中的RxJava .toList()等效项

来自分类Dev

如何防止 timeout(...) 取消流发射?

来自分类Dev

如何从Android Kotlin协程获取结果到UI线程

来自分类Dev

我应该如何在__init__中定义一个依赖于协程的变量?

来自分类Dev

Javascript,仅发射激光一次

来自分类Dev

延迟发射物品,直到从另一个可观察到的物品发射出来

来自分类Dev

在映射中创建值流,这些流是Java中另一个映射中的值

来自分类Dev

在Kotlin中测试协程

来自分类Dev

不能从一个孩子发射到另一个

来自分类Dev

在另一个协程中运行Tarantool Lua函数

来自分类Dev

为什么只有一个协程在我的代码中起作用?

来自分类Dev

从另一个线程恢复asio协程

来自分类Dev

CAEmitterCell 以两个相反的方向发射(一个是错误的)

来自分类Dev

JWPlayer触发onPlay()两次。如何使其仅发射一次?

来自分类Dev

RX Subject使用上一个值添加新的发射

来自分类Dev

RxJs:使用发射另一个可观察值来过滤可观察内容

Related 相关文章

  1. 1

    协程仅发射一次

  2. 2

    如何从另一个类(在Kotlin中)退出runBlocking协程?

  3. 3

    一个()发射多次

  4. 4

    我如何使socket.io仅向一个客户端发送发射消息?

  5. 5

    科特琳协程多次发射

  6. 6

    如何聆听另一个物体的事件发射?

  7. 7

    如何聆听另一个物体的事件发射?

  8. 8

    Kotlin:杀死一个不合作的协程

  9. 9

    如何在kotlin中使用协程每秒调用一个函数

  10. 10

    在Kotlin中,一个线程一次只能运行一个协程?

  11. 11

    我该如何在C#中创建一个功能,使其沿右侧的直线从枪上发射子弹?

  12. 12

    如何创建一个在2个事件之间跳过发射的可观察对象

  13. 13

    如何在另一个模块中使用事件发射器?

  14. 14

    Kotlin协程流中的RxJava .toList()等效项

  15. 15

    如何防止 timeout(...) 取消流发射?

  16. 16

    如何从Android Kotlin协程获取结果到UI线程

  17. 17

    我应该如何在__init__中定义一个依赖于协程的变量?

  18. 18

    Javascript,仅发射激光一次

  19. 19

    延迟发射物品,直到从另一个可观察到的物品发射出来

  20. 20

    在映射中创建值流,这些流是Java中另一个映射中的值

  21. 21

    在Kotlin中测试协程

  22. 22

    不能从一个孩子发射到另一个

  23. 23

    在另一个协程中运行Tarantool Lua函数

  24. 24

    为什么只有一个协程在我的代码中起作用?

  25. 25

    从另一个线程恢复asio协程

  26. 26

    CAEmitterCell 以两个相反的方向发射(一个是错误的)

  27. 27

    JWPlayer触发onPlay()两次。如何使其仅发射一次?

  28. 28

    RX Subject使用上一个值添加新的发射

  29. 29

    RxJs:使用发射另一个可观察值来过滤可观察内容

热门标签

归档