F#パラレルデザインパターン

tesla1060

F#コンソールプロジェクトで同じタスクを並行して実行しようとしています。

タスクは次のとおりです

  1. テーブル名をからデキューしますConcurrentQueue(このキューには、プログラムが処理する必要のあるテーブル名が含まれています)
  2. テーブルのSqlDataReaderを開きます
  3. SqlDataReaderの各行をStreamWriterに書き込みます
  4. StreamWriterによって作成されたファイルを圧縮します
  5. 1〜4を繰り返します

したがって、基本的に各タスクは、テーブルを継続的に処理するためのwhileループ(再帰としてポーズをとる)です。そして、4つのタスクを並行して開始したいと思いEnterます。また、コンソールでユーザーのキーストローク(キーなど)を使用して実行を停止したいと思います。ただし、現在のタスクでステップ4が完了した場合にのみ、実行を停止する必要があります。

私は以下を試しました

let rec DownloadHelper (cq:ConcurrentQueue<Table>) sqlConn = 
    let success, tb = cq.TryDequeue()
    if success then
        printfn "Processing %s %s" tb.DBName tb.TBName
        Table2CSV tb.DBName tb.TBName sqlConn
        DownloadHelper cq sqlConn

let DownloadTable (cq:ConcurrentQueue<Table>) connectionString=
    use con = new SqlConnection(connectionString)
    con.Open()
    DownloadHelper cq con

let asyncDownloadTask = async { return DownloadTable cq connectionString}
let asyncMultiDownload =
    asyncDownloadTask
    |> List.replicate 4
    |> Async.Parallel
asyncMultiDownload
|>Async.RunSynchronously
|>ignore

上記のコードには2つの問題があります。

  1. メインスレッドをブロックするので、キーストローク部分のやり方がわかりません
  2. 実行を正常に停止する方法がわかりません。

私の2番目の試みは、CancellationTokenを使用するために以下のとおりです。

let tokenSource = new CancellationTokenSource()

let cq = PrepareJobs connectionString
let asyncDownloadTask = async {  DownloadTable cq connectionString}
let task = async {  
            asyncDownloadTask
            |> List.replicate 4
            |> Async.Parallel
            |>ignore}
let val1 = Async.Start(task, cancellationToken =tokenSource.Token)
Console.ReadLine() |> ignore
tokenSource.Cancel()
Console.ReadLine() |> ignore
0

しかし、私はタスクをまったく開始することさえできないようです。

グルンドーン

コードには3つの問題があります。

まず、DownloadHelper1つのテーブルのみを実行する必要があります。再帰的にすることで、制御しすぎて並列処理を抑制します。

次に、async式に演算を配置するだけでは、魔法のように非同期になるわけではありません。DownloadTable関数自体が非同期でない限り、コードは終了するまでブロックされます。

したがって、4つのダウンロードを並行して実行すると、一度開始すると、キャンセルトークンに関係なく、すべてが完了するまで実行されます。

第三に、2番目の例では、使用しますAsync.Parallelが、出力を破棄します。これが、task何もしない理由です。あなたがやりたかったのは、非同期自体ではなく、非同期の結果捨てることだったと思います

これらのポイントを示すために、これが私のバージョンのコードです。

まず、時間がかかるダミー関数:

let longAtomicOperation milliSecs = 
    let sw = System.Diagnostics.Stopwatch()
    let r = System.Random()
    let mutable total = 0.0
    sw.Start()
    while sw.ElapsedMilliseconds < int64 milliSecs do
        total <- total + sin (r.NextDouble())
    // return something
    total

// test
#time
longAtomicOperation 2000
#time
// Real: 00:00:02.000, CPU: 00:00:02.000, GC gen0: 0, gen1: 0, gen2: 0

この関数は非同期ではないことに注意してください。開始すると、最後まで実行されます。

今それを置きましょうasync

let asyncTask id = async {  
    // note that NONE of the operations are async
    printfn "Started %i" id
    let result = longAtomicOperation 5000 // 5 seconds
    printfn "Finished %i" id
    return result
    }

asyncブロックの操作はどれも非同期ではないため、メリットはありません。

4つのタスクを並行して作成するコードは次のとおりです。

let fourParallelTasks = async {
    let! results = 
        List.init 4 asyncTask
        |> Async.Parallel
    // ignore
    return ()
    }

の結果はAsync.Parallel無視されませんが、タスクを強制的に実行する値に割り当てられます。async全体返す手段として表現けれども。

テストした場合:

open System.Threading

// start the task
let tokenSource = new CancellationTokenSource()
do Async.Start(fourParallelTasks, cancellationToken = tokenSource.Token)

// wait for a keystroke
System.Console.WriteLine("press a key to cancel")
System.Console.ReadLine() |> ignore
tokenSource.Cancel()
System.Console.ReadLine() |> ignore

キーが押された場合でも、次のような出力が得られます。これは、開始されると、各タスクが完了するまで実行されるためです。

press a key to cancel
Started 3
Started 1
Started 2
Started 0
Finished 1
Finished 3
Finished 2
Finished 0

一方、次のようにシリアルバージョンを作成する場合:

let fourSerialTasks = async {
    let! result1 = asyncTask 1
    let! result2 = asyncTask 2
    let! result3 = asyncTask 3
    let! result4 = asyncTask 4
    // ignore
    return ()
    }

次に、タスクがアトミックであっても、キャンセルトークンが各ステップ間でテストされ、サブシーケンスタスクのキャンセルが可能になります。

// start the task
let tokenSource = new CancellationTokenSource()
do Async.Start(fourSerialTasks, cancellationToken = tokenSource.Token)

// wait for a keystroke
System.Console.WriteLine("press a key to cancel")
System.Console.ReadLine() |> ignore
tokenSource.Cancel()
System.Console.ReadLine() |> ignore

上記のコードは、キーが押されたときに各ステップ間でキャンセルできます。

キューのすべての要素をこのように4つのバッチで処理するには、並列バージョンをループに変換するだけです。

let rec processQueueAsync() = async {
    let! result = processFourElementsAsync()
    if result <> QueueEmpty then
        do! processQueueAsync()
    // ignore
    return ()
    }

最後に、私にとって、非同期を使用することは、ノンブロッキングコードを書くことほど、物事を並行して実行することではありませんしたがって、ライブラリコードがブロックしている場合、非同期アプローチはあまりメリットを提供しません。

コードが非ブロッキングであることを確認するにSqlDataReaderは、ヘルパーでNextResultAsyncなどの非同期バージョンのメソッドを使用する必要があります

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

F#コンパイルエラー

分類Dev

レルム永続性のデザインパターン

分類Dev

[F#]; [コンパイルエラー]; F#プログラムをコンパイルできません

分類Dev

デコレータパターンデザイン

分類Dev

デコレータパターンデザイン

分類Dev

C#インターフェイスの実装時のF#コンパイルエラー

分類Dev

F#はバインディングパターンを許可します

分類Dev

コントローラーのデザインパターン

分類Dev

デザインパターン:C ++抽象化レイヤー

分類Dev

PHP MVC:データマッパーパターン:クラスデザイン

分類Dev

C ++シングルトンデザインパターン

分類Dev

PHPのシングルトンデザインパターン

分類Dev

デザインパターン:シングルトンの混乱

分類Dev

シングルトンデザインパターン

分類Dev

デコレータデザインパターンJavaオーバーライドメソッドの質問

分類Dev

Javaデザインパターン:パイプラインと不変性

分類Dev

F#からJavaScriptへのコンパイラ用のF#プロジェクトテンプレート

分類Dev

Pythonデザインパターン

分類Dev

DAOデザインパターン

分類Dev

MapMakerデザインパターン?

分類Dev

XSDデザインパターン

分類Dev

Reduxとデザインパターン

分類Dev

Smalltalkのデザインパターン

分類Dev

Reactデザインパターン

分類Dev

戦略デザインパターン

分類Dev

デザインパターンの例

分類Dev

複合デザインパターン

分類Dev

AbstractFactoryデザインパターン

分類Dev

REACTREDUX-デザインパターン

Related 関連記事

ホットタグ

アーカイブ