몇 가지 의심을 설명하기 위해 다음 응용 프로그램을 만들었습니다. Github의 내 예
이 예에서는 파일을 다른 패키지에 복사합니다.
내 의심은 다음과 같습니다.
작업을 병렬로 수행하면 취소 전에 완료된 값을 반환 할 수 있습니까?
contentResolver.openInputStream (uri)
IO 컨텍스트로 작업하는 동안 "Inappropriate blocking method call"메시지가 나타나는 이유는 무엇 입니까?
출력에 복사 할 파일 항목을 읽는 동안 항상 작업 상태를 확인하여이 작업이 취소되면 즉시 중지되고 생성 된 출력 파일이 삭제되고 취소 예외를 반환합니다. 맞습니까?
수행되는 작업의 양을 구분할 수 있습니까?
내 onCreate :
private val listUri = mutableListOf<Uri>()
private val job = Job()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//get files from 1 to 40
val packageName = "android.resource://${packageName}/raw/"
for (i in 1..40) {
listUri.add(Uri.parse("${packageName}file$i"))
}
}
내 버튼 동작 :
//Button action
fun onClickStartTask(view: View) {
var listNewPath = emptyList<String>()
CoroutineScope(Main + job).launch {
try {
//shows something in the UI - progressBar
withContext(IO) {
listNewPath = listUri.map { uri ->
async {
//path to file temp
val pathFileTemp =
"${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
val file = File(pathFileTemp)
val inputStream = contentResolver.openInputStream(uri)
inputStream?.use { input ->
FileOutputStream(file).use { output ->
val buffer = ByteArray(1024)
var read: Int = input.read(buffer)
while (read != -1) {
if (isActive) {
output.write(buffer, 0, read)
read = input.read(buffer)
} else {
input.close()
output.close()
file.deleteRecursively()
throw CancellationException()
}
}
}
}
//If completed then it returns the new path.
return@async pathFileTemp
}
}.awaitAll()
}
} finally {
//shows list complete in the UI
}
}
}
작업 취소 버튼 :
fun onClickCancelTask(view: View) {
if (job.isActive) {
job.cancelChildren()
println("Cancel children")
}
}
이것은 작업을 수행하는 버튼 동작입니다.
모든 도움에 감사드립니다.
답변 1. 및 4 .:
병렬 작업을 구분하고 독립적으로 완료되도록하려면 (일부 값을 가져오고 나머지는 취소) 채널을 사용해야하며 가급적 흐름을 사용해야합니다. 단순화 된 예 :
fun processListWithSomeWorkers(list: List<Whatever>, concurrency: Int): Flow<Result> = channelFlow {
val workToDistribute = Channel<Whatever>()
launch { for(item in list) workToDistribute.send(item) } // one coroutine distributes work...
repeat(concurrency) { // launch a specified number of worker coroutines
launch {
for (task in workToDistribute) { // which process tasks in a loop
val atomicResult = process(task)
send(atomicResult) // and send results downstream to a Flow
}
}
}
}
그런 다음 전체 흐름이 완료 될 때까지 기다리는 결과가 생성 될 때 하나씩 처리 할 수 있습니다. 예를 들어 원하는 경우 일부만 가져 resultFlow.take(20).onEach { ... }.collectIn(someScope)
옵니다. 흐름이기 때문에 누군가 수집을 시작할 때만 작동하기 시작합니다 ( 그것은 일반적으로 좋은 것입니다.
좀 더 구체적이고 실험적인 기능 (프로덕션)을 발견 할 수 있으므로 전체를 조금 더 짧게 만들 수 있습니다. 다음과 같이 Flow 연산자로 일반화 할 수 있습니다.
fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> {
require(concurrency > 1) { "No sense with concurrency < 2" }
return channelFlow {
val inputChannel = produceIn(this)
repeat(concurrency) {
launch {
for (input in inputChannel) send(transform(input))
}
}
}
}
사용 : list.asFlow().concurrentMap(concurrency = 4) { <your mapping logic> }
corotuines 팀은 병렬 연산자 제품군을 Flow 스트림에 추가하려고 생각하고 있지만 아직 AFAIK가 없습니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다