带有翻新,协程和挂起功能的并行请求

瓦伦丁

我正在使用Retrofit以发出一些网络请求。我还将协同程序与“挂起”功能结合使用。

我的问题是:有没有办法改善以下代码。这个想法是并行启动多个请求,并等待它们全部完成,然后再继续执行该功能。

lifecycleScope.launch {
    try {
        itemIds.forEach { itemId ->
            withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
        }
    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

(请注意,“ MyService.getItem()”是一个“暂停”函数。)

我猜在这种情况下,有什么比foreach更好的了

有人有主意吗?

马克·托波尼克(Marko Topolnik)

我准备了三种方法来解决此问题,从最简单到最正确的一种。为了简化方法的表示,我提取了以下通用代码:

lifecycleScope.launch {
    val itemById = try {
        fetchItems(itemIds)
    } catch (exception: Exception) {
        exception.printStackTrace()
    }
    Log.i(TAG, "Fetched these items: $itemById")
}

在继续之前,请注意以下一般事项:您的getItem()函数是可挂起的,无需将其提交给IO调度程序。您的所有协程都可以在主线程上运行。

现在让我们看看如何实现fetchItems(itemIds)

1.简单的每一步

在这里,我们利用了所有协程代码都可以在主线程上运行的事实:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
    val itemById = mutableMapOf<Long, Item>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch { itemById[itemId] = MyService.getItem(itemId) }
        }
    }
    return itemById
}

coroutineScope将等待您launch里面的所有协程即使它们都同时运行,启动的协程仍将分派到单个(主)线程,因此从每个线程更新映射不会出现并发问题。

2.线程安全变体

它利用了单线程上下文的属性这一事实可以看作是第一种方法的局限性:它不能推广到基于线程池的上下文。我们可以依靠以下async-await机制来避免这种限制

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
    itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
            .map { it.await() }
            .toMap()
}

在这里,我们依赖于以下两个非显而易见的属性Collection.map()

  1. 它会急切地执行所有转换,因此,对的集合的第一次转换Deferred<Pair<Long, Item>>已完全完成,然后进入第二阶段,我们等待所有阶段。
  2. 它是一个内联函数,即使函数本身不是asuspend fun且得到一个不可暂停的lambda ,它也允许我们在其中编写可挂起的代码(Deferred<T>) -> T

这意味着所有获取操作是同时完成的,但是地图被组装在一个协程中。

3.具有改进的并发控制的基于流的方法

以上为我们解决了并发问题,但它没有任何背压。如果您的输入列表很大,您将希望限制同时发出的网络请求数量。

您可以使用Flow基于-的习惯用法来做到这一点

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
        .asFlow()
        .flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
            flow { emit(itemId to MyService.getItem(itemId)) }
        }
        .toMap()

魔术在这里.flatMapMerge运作。您给它提供一个函数(T) -> Flow<R>,它将在所有输入上顺序执行它,但是随后它将同时收集所有得到的流。请注意,我不能简化flow { emit(getItem()) } }为仅flowOf(getItem())因为getItem()在收集流时必须延迟调用。

Flow.toMap() 标准库中当前未提供它,因此它是:

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

带有协程和SavedStateHandle的liveData

来自分类Dev

调用并行协程并等待所有协程结束

来自分类Dev

使用asyncio协程并行运行功能?

来自分类Dev

带有异步的互递协程

来自分类Dev

带有翻新的CURL -u请求

来自分类Dev

诊断带有请求的挂起请求

来自分类Dev

诊断带有请求的挂起请求

来自分类Dev

如何调用科特林从Java 7挂起协程功能

来自分类Dev

如何启动并行协程并返回结果

来自分类Dev

C ++协程:从最后的挂起点调用`handle.destroy`是否有效?

来自分类Dev

具有MVVM和翻新功能的RXjava

来自分类Dev

Kotlin协程壁垒:等待所有协程完成

来自分类Dev

Kotlin 协程不会立即编译(挂起函数?)

来自分类Dev

C ++ 1z协程具有语言功能吗?

来自分类Dev

不“等待”任何东西的协程与功能是否有所不同?

来自分类Dev

对Kotlin与协程有关的暂停功能感到困惑

来自分类Dev

Android:带有协程的不稳定的ViewModel单元测试

来自分类Dev

带有水平进度条的启动画面使用协程

来自分类Dev

使用带有装饰协程的单事件循环返回将来的结果

来自分类Dev

如何在 Python 中使用带有协程的装饰器?

来自分类Dev

如何并行运行多个阻塞IO协程

来自分类Dev

如何并行运行多个阻塞IO协程

来自分类Dev

如何为并行任务实现协程

来自分类Dev

Python原生协程和send()

来自分类Dev

使用协程读取和复制文件

来自分类Dev

有可能获得协程吗?

来自分类Dev

具有Kotlin协程的FusedLocationProviderClient

来自分类Dev

旋转没有协程的物体?

来自分类常见问题

Kotlin协程中launch / join和async / await有什么区别