mutexが正しい同期を示さないゴルーチン間で変更されたスライス

カイル:

私は行くのは初めてですが、以前は並行性で働いていました。すべてのゴルーチン間で同じデータを含まない複数のゴルーチン間でスライスを共有する問題が発生しています。mutexも使用して、スライスを変更するときに構造体をロックしますが、効果がないようです。コードを添付しましたが、何が問題なのか知りたいのですが、助けてくれてありがとう!

type State struct {
    waiting int32
    processing int32
    completed int32
}

type Scheduler struct {
    sync.Mutex
    items chan interface{}
    backPressure []interface{}
    capacity int
    canceler context.CancelFunc
    state State
}

func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) Scheduler {
    ctx, cancel := context.WithCancel(context.Background())

    state := State{}

    atomic.StoreInt32(&state.waiting, 0)
    atomic.StoreInt32(&state.processing, 0)
    atomic.StoreInt32(&state.completed, 0)

    scheduler := Scheduler{
        items: make(chan interface{}, capacity),
        backPressure: make([]interface{}, 0),
        capacity: capacity,
        canceler: cancel,
        state: state,
    }

    scheduler.initializeWorkers(ctx, handler)

    return scheduler
}

func (s *Scheduler) initializeWorkers(ctx context.Context, handler func(interface {}) (interface{}, error)) {
    for i := 0; i < 5; i++ {
        go s.newWorker(ctx, handler)
    }
}

func (s *Scheduler) newWorker(ctx context.Context, handler func(interface {}) (interface{}, error)) {
    backoff := 0

    for {
        select {
        case <-ctx.Done():
            return
        case job := <- s.items:
            atomic.AddInt32(&s.state.waiting, -1)
            atomic.AddInt32(&s.state.processing, 1)
            job, _ = handler(job)
            backoff = 0
            atomic.AddInt32(&s.state.processing, -1)
            atomic.AddInt32(&s.state.completed, 1)
        default:
            backoff += 1
            s.CheckBackPressure()
            time.Sleep(time.Duration(backoff * 10) * time.Millisecond)
        }
    }
}

func (s *Scheduler) AddItem(item interface{}) {
    atomic.AddInt32(&s.state.waiting, 1)

    if len(s.items) < s.capacity {
        select {
        case s.items <- item:
            return
        }
    }

    s.Lock()
    defer s.Unlock()

    s.backPressure = append(s.backPressure, item)

    fmt.Printf("new backpressure len %v \n", len(s.backPressure))

    return
}

func (s *Scheduler) Process() {
    var wg sync.WaitGroup

    wg.Add(1)


    go func() {
        defer wg.Done()

        for {
            if atomic.LoadInt32(&s.state.waiting) == 0 && atomic.LoadInt32(&s.state.processing) == 0 {
                return
            }
            runtime.Gosched()
        }
    }()

    wg.Wait()
}

func (s *Scheduler) CheckBackPressure() {
    s.Lock()
    defer s.Unlock()

    if len(s.backPressure) == 0 || s.capacity <= len(s.items) {
        fmt.Printf("backpressure = %d  :: len = %d cap = %d \n", len(s.backPressure), len(s.items), s.capacity)
        return
    }

    fmt.Printf("releasing backpressure \n")

    job, tmp := s.backPressure[0], s.backPressure[1:]

    s.backPressure = tmp

    s.items <- job
    return
}

func (s *Scheduler) Stop() {
    s.canceler()
}

これは、機能をテストするために使用しているコードです。

type Job struct {
    Value int
}

func TestSchedulerExceedingCapacity(t *testing.T) {


    handler := func (ptr interface{}) (interface{}, error) {
        job, ok := (ptr).(*Job)

        if ok != true {
            return nil, errors.New("failed to convert job")
        }

        // simulate work
        time.Sleep(50 * time.Millisecond)

        return job, nil
    }

    scheduler := NewScheduler(5, handler)

    for i := 0; i < 25; i++ {
        scheduler.AddItem(&(Job { Value: i }))
    }

    fmt.Printf("PROCESSING\n")
    scheduler.Process()
    fmt.Printf("FINISHED\n")
}

バックプレッシャーを保持するスライスを更新すると、new backpressure len 11〜16の印刷正しく更新されたことが示されているようです。

しかし、ワーカーからのバックプレッシャーを確認すると、バックプレッシャースライスが空であることがわかります。backpressure = 0 :: len = 0 cap = 5

また、「バックプレッシャーの解放」もstdoutに出力されません。

ここにいくつかの追加出力があります...

=== RUN   TestSchedulerExceedingCapacity
new backpressure len 1 
new backpressure len 2 
new backpressure len 3 
new backpressure len 4 
new backpressure len 5 
new backpressure len 6 
new backpressure len 7 
new backpressure len 8 
backpressure = 0  :: len = 0 cap = 5 
new backpressure len 9 
new backpressure len 10 
new backpressure len 11 
new backpressure len 12 
new backpressure len 13 
new backpressure len 14 
new backpressure len 15 
new backpressure len 16 
PROCESSING
backpressure = 0  :: len = 0 cap = 5 
backpressure = 0  :: len = 0 cap = 5 
backpressure = 0  :: len = 0 cap = 5 
...

テストを終了しないと、無期限に出力されます backpressure = 0 :: len = 0 cap = 5

変更を正しく同期していないと思いますが、本当に洞察をいただければ幸いです。

カイル:

さて、質問を投稿したら、もちろんこれを理解することができました...

データレース検出機能-raceを有効にするオプションを使用してテストを実行するよう提案された場所を見つけましたすぐにエラーが発生し、問題のデバッグが容易になりました。

問題はNewScheduler、新しいスケジューラのポインタでなく、値を返すことに関連していたことがわかりましたその関数を次のコードに変更して、問題を修正しました。

func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) *Scheduler {
    ctx, cancel := context.WithCancel(context.Background())

    state := State{}

    atomic.StoreInt32(&state.waiting, 0)
    atomic.StoreInt32(&state.processing, 0)
    atomic.StoreInt32(&state.completed, 0)
    atomic.StoreInt32(&state.errors, 0)

    scheduler := Scheduler{
        items: make(chan interface{}, capacity),
        backPressure: make([]interface{}, 0),
        capacity: capacity,
        canceler: cancel,
        state: state,
    }

    scheduler.initializeWorkers(ctx, handler)

    return &scheduler
}

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

メインプログラムでインデックスの名前が変更されたため、サブルーチンが想定された形状配列に正しい数値を返さない

分類Dev

ReactJSで状態が変更された後、インラインスタイルが正しくレンダリングされない

分類Dev

ディレクティブ内の非同期サービスのハンドラーでスコープを変更した後、DOMが更新されない

分類Dev

関数内で変数を変更した後、変数が変更されないのはなぜですか?-非同期コードリファレンス

分類Dev

バインドされたスコープが正しく変更されない

分類Dev

ウィンドウの場所が変更されたときにreactルータースイッチが機能しないのはなぜですか?

分類Dev

D3:スライダーを変更してもサークルポイントが更新されない

分類Dev

ファイルがマージされていないため、ブランチを変更したりコミットしたりできません。間違ったブランチにいるため、ファイルをマージまたは隠蔽できません

分類Dev

ローカルブランチとリモートブランチの間で変更されたファイルを(リスト)参照してください-git

分類Dev

コミットされていない変更として、ブランチからローカルブランチのマスターにプッシュされた変更を取得するにはどうすればよいですか?

分類Dev

アクションバーのスタイルを変更した後、R.Javaが生成されない

分類Dev

スプライスで配列要素を削除した後、GoogleChromeコンソールに正しいデータが表示されない

分類Dev

変更されていないときにチームエクスプローラーで変更済みとマークされたcsprojファイル(git)

分類Dev

フィルタリングされたリサイクラービューがスワイプで正しいカード位置を取得しない

分類Dev

マルチスレッド-ポインタ上で正しく機能しているのに、参照を操作しても変更されない理由

分類Dev

スクリプトで作成されたフォルダーのタイムスタンプが正しく変更されていません

分類Dev

リンクされたファイルが変更された後、systemdサービスが更新されない

分類Dev

KDE Plasma 5マルチディスプレイ、ランチャーが正しく表示されない

分類Dev

Android:Samsungデバイスでスイッチコントロールが正しく表示されない

分類Dev

イベントリスナーがスコープ変数を変更した後、DOMでスコープ変数が更新されない

分類Dev

libxml2が正しく検出されないため、Railsをインストールできません

分類Dev

MongoDBをインストールしましたが、プログラムファイルに表示されないか、プログラムのアンインストールまたは変更に表示されません

分類Dev

QtQuickがVisualC ++で正しくインストールされていない

分類Dev

値が変更されたときにスライダーがジャンプしないようにする

分類Dev

URLが変更されたときにreactrouterdomがクラスコンポーネントを更新しない

分類Dev

Mavenチェックスタイルがゴールで実行されない

分類Dev

kvm-okがインストールされていないため、jujuでOpenStackをインストール中にエラーが発生しました

分類Dev

形状を変更しようとすると、「TypeError:リストインデックスはタプルではなく整数またはスライスである必要があります」というエラーが表示されます。

分類Dev

別のスレッドでフラグが変更されたときにループが終了しない

Related 関連記事

  1. 1

    メインプログラムでインデックスの名前が変更されたため、サブルーチンが想定された形状配列に正しい数値を返さない

  2. 2

    ReactJSで状態が変更された後、インラインスタイルが正しくレンダリングされない

  3. 3

    ディレクティブ内の非同期サービスのハンドラーでスコープを変更した後、DOMが更新されない

  4. 4

    関数内で変数を変更した後、変数が変更されないのはなぜですか?-非同期コードリファレンス

  5. 5

    バインドされたスコープが正しく変更されない

  6. 6

    ウィンドウの場所が変更されたときにreactルータースイッチが機能しないのはなぜですか?

  7. 7

    D3:スライダーを変更してもサークルポイントが更新されない

  8. 8

    ファイルがマージされていないため、ブランチを変更したりコミットしたりできません。間違ったブランチにいるため、ファイルをマージまたは隠蔽できません

  9. 9

    ローカルブランチとリモートブランチの間で変更されたファイルを(リスト)参照してください-git

  10. 10

    コミットされていない変更として、ブランチからローカルブランチのマスターにプッシュされた変更を取得するにはどうすればよいですか?

  11. 11

    アクションバーのスタイルを変更した後、R.Javaが生成されない

  12. 12

    スプライスで配列要素を削除した後、GoogleChromeコンソールに正しいデータが表示されない

  13. 13

    変更されていないときにチームエクスプローラーで変更済みとマークされたcsprojファイル(git)

  14. 14

    フィルタリングされたリサイクラービューがスワイプで正しいカード位置を取得しない

  15. 15

    マルチスレッド-ポインタ上で正しく機能しているのに、参照を操作しても変更されない理由

  16. 16

    スクリプトで作成されたフォルダーのタイムスタンプが正しく変更されていません

  17. 17

    リンクされたファイルが変更された後、systemdサービスが更新されない

  18. 18

    KDE Plasma 5マルチディスプレイ、ランチャーが正しく表示されない

  19. 19

    Android:Samsungデバイスでスイッチコントロールが正しく表示されない

  20. 20

    イベントリスナーがスコープ変数を変更した後、DOMでスコープ変数が更新されない

  21. 21

    libxml2が正しく検出されないため、Railsをインストールできません

  22. 22

    MongoDBをインストールしましたが、プログラムファイルに表示されないか、プログラムのアンインストールまたは変更に表示されません

  23. 23

    QtQuickがVisualC ++で正しくインストールされていない

  24. 24

    値が変更されたときにスライダーがジャンプしないようにする

  25. 25

    URLが変更されたときにreactrouterdomがクラスコンポーネントを更新しない

  26. 26

    Mavenチェックスタイルがゴールで実行されない

  27. 27

    kvm-okがインストールされていないため、jujuでOpenStackをインストール中にエラーが発生しました

  28. 28

    形状を変更しようとすると、「TypeError:リストインデックスはタプルではなく整数またはスライスである必要があります」というエラーが表示されます。

  29. 29

    別のスレッドでフラグが変更されたときにループが終了しない

ホットタグ

アーカイブ