スレッドは条件で終了しません、生産者/消費者スレッド

mr.M

かなり単純なタスクを実装したいと思います。2つのキューがあります(どちらも容量が制限されています):BlockingQueue<String> sourceおよびBlockingQueue<String> destinationスレッドには2つのタイプがありProducer producerますBlockingQueue<String> sourceメッセージを生成し、に格納します2番目-Replacer replacerソースから選択し、メッセージを変換して、に挿入しますBlockingQueue<String> destination

2つの質問/問題:

  1. 次の要件を正しく実装したかどうかはわかりません。送信元が空でなく、宛先がいっぱいでない場合は、送信元から宛先にメッセージを転送します。

  2. 私のプログラムを終了した後、「SignalDispatcher」と呼ばれるまだ実行中のスレッドがあります。どうすれば適切に終了できますか?プログラムが正しく終了しません。

相対エンティティの実装は次のとおりです。

送信元/宛先キューの実装。

public class BlockingQueueImpl<E> implements BlockingQueue<E> {
private volatile Queue<E> storage = new PriorityQueue<>();
private volatile int capacity;
private volatile int currentNumber;

public BlockingQueueImpl(int capacity) {
    this.capacity = capacity;
    this.storage = new PriorityQueue<E>(capacity);
}

@Override
public synchronized void offer(E element) {
    while (isFull()) {
        try {
            this.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    currentNumber++;
    storage.add(element);
    notifyAll();
}

@Override
public synchronized E poll() {
    while (isEmpty()) {
        try {
            this.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    currentNumber--;
    notifyAll();
    return storage.poll();
}

@Override
public int size() {
    return capacity;
}
public synchronized boolean isFull(){
    return currentNumber > capacity;
}
public synchronized boolean isEmpty(){
    return currentNumber == 0;
}
}

プロデューサーの実装

public class Producer implements Runnable {
    BlockingQueue<String> source;
    String threadName;

    public Producer(BlockingQueue<String> source, String threadName) {
        this.source = source;
        this.threadName = threadName;
    }

    @Override
    public void run() {
        while (!source.isFull()) {
            source.offer(Utilities.generateMessage(threadName));
        }
    }
}

消費者の実装

public class Replacer implements Runnable {
    BlockingQueue<String> source;
    BlockingQueue<String> destination;
    String threadName;

    public Replacer(BlockingQueue<String> source,
                    BlockingQueue<String> destination,
                    String threadName) {

        this.source = source;
        this.destination = destination;
        this.threadName = threadName;
    }

    public synchronized void replace() {
        destination.offer(Utilities.transformMessage(threadName, source.poll()));
    }
private boolean isRunning() {
        return (!destination.isFull()) && (!source.isEmpty());
    }

@Override
public void run() {
    while (isRunning()) {
        replace();
    }
}


}

そしてヘルパークラス

    public class Utilities {

        public static final int NUMBER_OF_PRODUCER_THREADS = 3;
        public static final int NUMBER_OF_REPLACER_THREADS = 1000;
        public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
        public static final int STORAGE_CAPACITY = 100;

        public static String transformMessage(String threadName, String messageToTransform) {
            String[] splittedString = messageToTransform.split(" ");
            String newMessage = "Thread #" + threadName + " transferred message " + splittedString[splittedString.length - 1];
            return newMessage;
        }

        public static String generateMessage(String threadName) {
            return "Thread #" + threadName + " generated message #" + threadName;
        }

        public static void spawnDaemonThreads(String threadName,
                                              int numberOfThreadsToSpawn,
                                              BlockingQueue<String> source,
                                              BlockingQueue<String> destination) {

            if (destination == null) {
                for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
                    String name = threadName + i;
                    Producer producer = new Producer(source, name);

                    Thread threadProducer = new Thread(producer);
                    threadProducer.setName(name);
                    threadProducer.setDaemon(true);
                    threadProducer.start();
                }
            } else {
                for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
                    String name = threadName + i;
                    Replacer replacer = new Replacer(source, destination, name);

                    Thread threadProducer = new Thread(replacer);
                    threadProducer.setName(name);
                    threadProducer.setDaemon(true);
                    threadProducer.start();
                }
            }


}
}

メインクラス:

public class Main {
    public static void main(String[] args) {

        BlockingQueue<String> source = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
        BlockingQueue<String> destination = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);

        // Create, configure and start PRODUCER threads.
        Utilities.spawnDaemonThreads("Producer", Utilities.NUMBER_OF_PRODUCER_THREADS, source, null);

        // Create, configure and start REPLACER threads.
        Utilities.spawnDaemonThreads("Replacer", Utilities.NUMBER_OF_REPLACER_THREADS, source, destination);

        // Read NUMBER_OF_MESSAGES_TO_READ from destination.
       for (int i = 1; (i < Utilities.NUMBER_OF_MESSAGES_TO_READ) && !destination.isEmpty(); i++) {
        System.out.println(destination.poll());
    }
    }
}
mr.M

これが作業コードです。

     /**
     * Class {@code BlockingQueueImpl} is the implementation of the Blocking Queue.
     * This class provides thread-safe operations
     * {@code public void offer(E element)} and {@code public E poll()}
     */

public class BlockingQueueImpl<E> implements BlockingQueue<E> {
    private volatile Queue<E> storage = new PriorityQueue<>();
    private volatile int capacity;
    private volatile int currentNumber;

    public BlockingQueueImpl(int capacity) {
        this.capacity = capacity;
        this.storage = new PriorityQueue<E>(capacity);
    }

    @Override
    public synchronized void offer(E element) {
        while (isFull()) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        storage.add(element);
        currentNumber++;
        notifyAll();
    }

    @Override
    public synchronized E poll() {
        E polledElement;
        while (isEmpty()) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        notifyAll();
        polledElement = storage.poll();
        currentNumber--;
        return polledElement;
    }

    @Override
    public int size() {
        return capacity;
    }
    public synchronized boolean isFull(){
        return currentNumber >= capacity;
    }
    public synchronized boolean isEmpty(){
        return currentNumber == 0;
    }
}

public class Producer implements Runnable {
    BlockingQueue<String> source;
    String threadName;

    public Producer(BlockingQueue<String> source, String threadName) {
        this.source = source;
        this.threadName = threadName;
    }

    @Override
    public void run() {
        while (!source.isFull()) {
            source.offer(Utilities.generateMessage(threadName));
        }
    }
}

public class Replacer implements Runnable {
    BlockingQueue<String> source;
    BlockingQueue<String> destination;
    String threadName;

    public Replacer(BlockingQueue<String> source,
                    BlockingQueue<String> destination,
                    String threadName) {

        this.source = source;
        this.destination = destination;
        this.threadName = threadName;
    }

    public synchronized void replace() {
        destination.offer(Utilities.transformMessage(threadName, source.poll()));
    }

    //Continue execution of a thread if a destination is not full and source is not empty.
    private boolean isRunning() {
        return (!destination.isFull()) && (!source.isEmpty());
    }

    @Override
    public void run() {
        while (isRunning()) {
            replace();
        }
    }
}
public class Utilities {

    public static final int NUMBER_OF_PRODUCER_THREADS = 3;
    public static final int NUMBER_OF_REPLACER_THREADS = 1000;
    public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
    public static final int STORAGE_CAPACITY = 100;

    public static String transformMessage(String threadName, String messageToTransform) {
        String[] splittedString = messageToTransform.split(" ");
        String newMessage = "Thread #" + threadName + " transferred message " + splittedString[splittedString.length - 1];
        return newMessage;
    }

    public static String generateMessage(String threadName) {
        return "Thread #" + threadName + " generated message #" + threadName;
    }

    public static void spawnDaemonThreads(String threadName,
                                          int numberOfThreadsToSpawn,
                                          BlockingQueue<String> source,
                                          BlockingQueue<String> destination) {

        if (destination == null) {
            for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
                String name = threadName + i;
                Producer producer = new Producer(source, name);

                Thread threadProducer = new Thread(producer);
                threadProducer.setName(name);
                threadProducer.setDaemon(true);
                threadProducer.start();
            }
        } else {
            for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
                String name = threadName + i;
                Replacer replacer = new Replacer(source, destination, name);

                Thread threadProducer = new Thread(replacer);
                threadProducer.setName(name);
                threadProducer.setDaemon(true);
                threadProducer.start();
            }
        }
    }
}

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

スレッドとEventWaitHandleを使用した生産者/消費者パターン

分類Dev

生産者/消費者の例では2番目のスレッドが開始されない

分類Dev

生産者/消費者問題でスレッドをブロックするミューテックス

分類Dev

Java(同期)を使用する生産者/消費者モデルですが、常に同じスレッドを実行します

分類Dev

セマフォに関する3つのスレッドと消費者/生産者問題

分類Dev

生産者/消費者プログラムのスレッド優先度-wait()&notify()

分類Dev

JNIがコンシューマスレッドで使用されている生産者/消費者プログラムでSIGINTシグナルをキャッチできません

分類Dev

マルチスレッドの生産者/消費者による実行モードとdeburgモードでの異なる結果

分類Dev

Ruby-反復時にマルチスレッド(生産者/消費者モデル)を使用した場合の予期しない結果

分類Dev

こんにちは、spring-kafkaはどのように消費者スレッドを処理しますか?

分類Dev

生産者/消費者シナリオにおける多くのプロデューサーとのLinkedBlockingQueueのスレッドセーフ

分類Dev

キューに残されているアイテムの代替(スレッド化された消費者-生産者)

分類Dev

生産者/消費者消費者のJava実装はjava.lang.IllegalMonitorStateExceptionをスローします

分類Dev

C#-生産者/消費者シナリオでプライベートメソッドを単体テストする方法は?

分類Dev

Kafkaリスナーメソッドは呼び出されません。消費者は消費していません。

分類Dev

消費者-生産者。エラーはありません。時々動作します。どうして?

分類Dev

生産者/消費者:ロストウェイクアップの問題

分類Dev

この生産者/消費者コードでデッドロックが発生するのはいつですか

分類Dev

生産者/消費者、1つのセマフォで実装してみませんか?

分類Dev

なぜ消費者は生産者のパフォーマンスを低下させるのですか

分類Dev

消費者/プロデューサーのロックGUIスレッド

分類Dev

非同期I / Oはスレッドを消費しますか?

分類Dev

Ubuntu kworkerスレッドは100%のCPUを消費します

分類Dev

消費者スレッドが例外に直面している場合はプロデューサーのスレッドを停止する方法

分類Dev

スレッド間通信プロデューサー消費者問題

分類Dev

Laravel 8:送信者アドレスなしでメッセージを送信することはできません

分類Dev

スレッドが終了していません。ループで立ち往生

分類Dev

消費者スレッドは、誤って起動した場合に condition_variable 通知信号を受信しますか

分類Dev

Kafka消費者ダイヤモンドオペレーターは引数を推測できません

Related 関連記事

  1. 1

    スレッドとEventWaitHandleを使用した生産者/消費者パターン

  2. 2

    生産者/消費者の例では2番目のスレッドが開始されない

  3. 3

    生産者/消費者問題でスレッドをブロックするミューテックス

  4. 4

    Java(同期)を使用する生産者/消費者モデルですが、常に同じスレッドを実行します

  5. 5

    セマフォに関する3つのスレッドと消費者/生産者問題

  6. 6

    生産者/消費者プログラムのスレッド優先度-wait()&notify()

  7. 7

    JNIがコンシューマスレッドで使用されている生産者/消費者プログラムでSIGINTシグナルをキャッチできません

  8. 8

    マルチスレッドの生産者/消費者による実行モードとdeburgモードでの異なる結果

  9. 9

    Ruby-反復時にマルチスレッド(生産者/消費者モデル)を使用した場合の予期しない結果

  10. 10

    こんにちは、spring-kafkaはどのように消費者スレッドを処理しますか?

  11. 11

    生産者/消費者シナリオにおける多くのプロデューサーとのLinkedBlockingQueueのスレッドセーフ

  12. 12

    キューに残されているアイテムの代替(スレッド化された消費者-生産者)

  13. 13

    生産者/消費者消費者のJava実装はjava.lang.IllegalMonitorStateExceptionをスローします

  14. 14

    C#-生産者/消費者シナリオでプライベートメソッドを単体テストする方法は?

  15. 15

    Kafkaリスナーメソッドは呼び出されません。消費者は消費していません。

  16. 16

    消費者-生産者。エラーはありません。時々動作します。どうして?

  17. 17

    生産者/消費者:ロストウェイクアップの問題

  18. 18

    この生産者/消費者コードでデッドロックが発生するのはいつですか

  19. 19

    生産者/消費者、1つのセマフォで実装してみませんか?

  20. 20

    なぜ消費者は生産者のパフォーマンスを低下させるのですか

  21. 21

    消費者/プロデューサーのロックGUIスレッド

  22. 22

    非同期I / Oはスレッドを消費しますか?

  23. 23

    Ubuntu kworkerスレッドは100%のCPUを消費します

  24. 24

    消費者スレッドが例外に直面している場合はプロデューサーのスレッドを停止する方法

  25. 25

    スレッド間通信プロデューサー消費者問題

  26. 26

    Laravel 8:送信者アドレスなしでメッセージを送信することはできません

  27. 27

    スレッドが終了していません。ループで立ち往生

  28. 28

    消費者スレッドは、誤って起動した場合に condition_variable 通知信号を受信しますか

  29. 29

    Kafka消費者ダイヤモンドオペレーターは引数を推測できません

ホットタグ

アーカイブ