F#コンソールプロジェクトで同じタスクを並行して実行しようとしています。
タスクは次のとおりです
ConcurrentQueue
(このキューには、プログラムが処理する必要のあるテーブル名が含まれています)したがって、基本的に各タスクは、テーブルを継続的に処理するためのwhileループ(再帰としてポーズをとる)です。そして、4つのタスクを並行して開始したいと思いEnter
ます。また、コンソールでユーザーのキーストローク(キーなど)を使用して実行を停止したいと思います。ただし、現在のタスクでステップ4が完了した場合にのみ、実行を停止する必要があります。
私は以下を試しました
let rec DownloadHelper (cq:ConcurrentQueue<Table>) sqlConn =
let success, tb = cq.TryDequeue()
if success then
printfn "Processing %s %s" tb.DBName tb.TBName
Table2CSV tb.DBName tb.TBName sqlConn
DownloadHelper cq sqlConn
let DownloadTable (cq:ConcurrentQueue<Table>) connectionString=
use con = new SqlConnection(connectionString)
con.Open()
DownloadHelper cq con
let asyncDownloadTask = async { return DownloadTable cq connectionString}
let asyncMultiDownload =
asyncDownloadTask
|> List.replicate 4
|> Async.Parallel
asyncMultiDownload
|>Async.RunSynchronously
|>ignore
上記のコードには2つの問題があります。
私の2番目の試みは、CancellationTokenを使用するために以下のとおりです。
let tokenSource = new CancellationTokenSource()
let cq = PrepareJobs connectionString
let asyncDownloadTask = async { DownloadTable cq connectionString}
let task = async {
asyncDownloadTask
|> List.replicate 4
|> Async.Parallel
|>ignore}
let val1 = Async.Start(task, cancellationToken =tokenSource.Token)
Console.ReadLine() |> ignore
tokenSource.Cancel()
Console.ReadLine() |> ignore
0
しかし、私はタスクをまったく開始することさえできないようです。
コードには3つの問題があります。
まず、DownloadHelper
は1つのテーブルのみを実行する必要があります。再帰的にすることで、制御しすぎて並列処理を抑制します。
次に、async
式に演算を配置するだけでは、魔法のように非同期になるわけではありません。DownloadTable
関数自体が非同期でない限り、コードは終了するまでブロックされます。
したがって、4つのダウンロードを並行して実行すると、一度開始すると、キャンセルトークンに関係なく、すべてが完了するまで実行されます。
第三に、2番目の例では、使用しますAsync.Parallel
が、出力を破棄します。これが、task
何もしない理由です。あなたがやりたかったのは、非同期自体ではなく、非同期の結果を捨てることだったと思います。
これらのポイントを示すために、これが私のバージョンのコードです。
まず、時間がかかるダミー関数:
let longAtomicOperation milliSecs =
let sw = System.Diagnostics.Stopwatch()
let r = System.Random()
let mutable total = 0.0
sw.Start()
while sw.ElapsedMilliseconds < int64 milliSecs do
total <- total + sin (r.NextDouble())
// return something
total
// test
#time
longAtomicOperation 2000
#time
// Real: 00:00:02.000, CPU: 00:00:02.000, GC gen0: 0, gen1: 0, gen2: 0
この関数は非同期ではないことに注意してください。開始すると、最後まで実行されます。
今それを置きましょうasync
:
let asyncTask id = async {
// note that NONE of the operations are async
printfn "Started %i" id
let result = longAtomicOperation 5000 // 5 seconds
printfn "Finished %i" id
return result
}
asyncブロックの操作はどれも非同期ではないため、メリットはありません。
4つのタスクを並行して作成するコードは次のとおりです。
let fourParallelTasks = async {
let! results =
List.init 4 asyncTask
|> Async.Parallel
// ignore
return ()
}
の結果はAsync.Parallel
無視されませんが、タスクを強制的に実行する値に割り当てられます。async
全体返す手段として表現けれども。
テストした場合:
open System.Threading
// start the task
let tokenSource = new CancellationTokenSource()
do Async.Start(fourParallelTasks, cancellationToken = tokenSource.Token)
// wait for a keystroke
System.Console.WriteLine("press a key to cancel")
System.Console.ReadLine() |> ignore
tokenSource.Cancel()
System.Console.ReadLine() |> ignore
キーが押された場合でも、次のような出力が得られます。これは、開始されると、各タスクが完了するまで実行されるためです。
press a key to cancel
Started 3
Started 1
Started 2
Started 0
Finished 1
Finished 3
Finished 2
Finished 0
一方、次のようにシリアルバージョンを作成する場合:
let fourSerialTasks = async {
let! result1 = asyncTask 1
let! result2 = asyncTask 2
let! result3 = asyncTask 3
let! result4 = asyncTask 4
// ignore
return ()
}
次に、タスクがアトミックであっても、キャンセルトークンが各ステップ間でテストされ、サブシーケンスタスクのキャンセルが可能になります。
// start the task
let tokenSource = new CancellationTokenSource()
do Async.Start(fourSerialTasks, cancellationToken = tokenSource.Token)
// wait for a keystroke
System.Console.WriteLine("press a key to cancel")
System.Console.ReadLine() |> ignore
tokenSource.Cancel()
System.Console.ReadLine() |> ignore
上記のコードは、キーが押されたときに各ステップ間でキャンセルできます。
キューのすべての要素をこのように4つのバッチで処理するには、並列バージョンをループに変換するだけです。
let rec processQueueAsync() = async {
let! result = processFourElementsAsync()
if result <> QueueEmpty then
do! processQueueAsync()
// ignore
return ()
}
最後に、私にとって、非同期を使用することは、ノンブロッキングコードを書くことほど、物事を並行して実行することではありません。したがって、ライブラリコードがブロックしている場合、非同期アプローチはあまりメリットを提供しません。
コードが非ブロッキングであることを確認するにSqlDataReader
は、ヘルパーでNextResultAsync
。などの非同期バージョンのメソッドを使用する必要があります。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加