かなり単純なタスクを実装したいと思います。2つのキューがあります(どちらも容量が制限されています):BlockingQueue<String> source とBlockingQueue<String> destination です。
スレッドには2つのタイプがあります。1つ目のProducer producer はメッセージを生成し、BlockingQueue<String> source に格納します。
2つ目のReplacer replacer はsource からメッセージを取り出し、変換して、BlockingQueue<String> destination に挿入します。
2つの質問/問題:
次の要件を正しく実装したかどうかはわかりません。送信元が空でなく、宛先がいっぱいでない場合は、送信元から宛先にメッセージを転送します。
私のプログラムを終了した後、「SignalDispatcher」と呼ばれるまだ実行中のスレッドがあります。どうすれば適切に終了できますか?プログラムが正しく終了しません。
関連するエンティティの実装は次のとおりです。
送信元/宛先キューの実装。
/**
 * Capacity-bounded blocking queue guarded by this object's intrinsic lock.
 *
 * <p>{@link #offer(Object)} blocks while the queue already holds
 * {@code capacity} elements and {@link #poll()} blocks while it is empty.
 * Elements are kept in a {@link PriorityQueue}, so {@code poll()} returns the
 * head of that priority queue rather than the element that was offered first.
 *
 * @param <E> element type stored in the queue
 */
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
    /** Backing store; only accessed while holding this object's monitor. */
    private final Queue<E> storage;
    /** Maximum number of elements the queue may hold. */
    private final int capacity;
    /** Number of elements currently stored; guarded by this object's monitor. */
    private int currentNumber;

    /**
     * Creates an empty queue bounded to {@code capacity} elements.
     *
     * @param capacity maximum number of elements; also used as the initial
     *                 capacity of the backing {@link PriorityQueue}
     */
    public BlockingQueueImpl(int capacity) {
        this.capacity = capacity;
        this.storage = new PriorityQueue<E>(capacity);
    }

    /**
     * Adds {@code element}, waiting for free space if the queue is full.
     *
     * <p>The wait is not cut short by interrupts; if the thread is interrupted
     * while waiting, its interrupt status is restored before returning.
     *
     * @param element element to add
     */
    @Override
    public synchronized void offer(E element) {
        boolean interrupted = false;
        try {
            while (isFull()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Re-interrupting here would make the next wait() throw at
                    // once and spin, so the flag is restored after the loop.
                    interrupted = true;
                }
            }
            // Insert first so the counter is untouched if add() throws.
            storage.add(element);
            currentNumber++;
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the head of the queue, waiting if it is empty.
     *
     * <p>The wait is not cut short by interrupts; if the thread is interrupted
     * while waiting, its interrupt status is restored before returning.
     *
     * @return the element removed from the backing queue
     */
    @Override
    public synchronized E poll() {
        boolean interrupted = false;
        try {
            while (isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            E head = storage.poll();
            currentNumber--;
            // Wake producers that may be waiting for free space.
            notifyAll();
            return head;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the configured capacity, not the number of stored elements.
     *
     * <p>NOTE(review): the name suggests an element count; confirm against the
     * {@code BlockingQueue} interface contract before changing this.
     *
     * @return the capacity passed to the constructor
     */
    @Override
    public int size() {
        return capacity;
    }

    /**
     * @return {@code true} once the queue holds {@code capacity} elements
     */
    public synchronized boolean isFull() {
        // ">=" (not ">") so that at most `capacity` elements are ever stored.
        return currentNumber >= capacity;
    }

    /**
     * @return {@code true} if the queue currently holds no elements
     */
    public synchronized boolean isEmpty() {
        return currentNumber == 0;
    }
}
プロデューサーの実装
/**
 * Task that fills the source queue with messages generated for its thread name.
 * The task finishes the first time it observes the queue as full.
 */
public class Producer implements Runnable {
    BlockingQueue<String> source;
    String threadName;

    /**
     * @param source     queue that receives the generated messages
     * @param threadName name embedded in every generated message
     */
    public Producer(BlockingQueue<String> source, String threadName) {
        this.source = source;
        this.threadName = threadName;
    }

    /** Generates and offers messages until the source queue reports full. */
    @Override
    public void run() {
        for (;;) {
            if (source.isFull()) {
                return;
            }
            String message = Utilities.generateMessage(threadName);
            source.offer(message);
        }
    }
}
消費者の実装
/**
 * Task that moves messages from the source queue to the destination queue,
 * rewriting each one with {@code Utilities.transformMessage} on the way.
 */
public class Replacer implements Runnable {
    BlockingQueue<String> source;
    BlockingQueue<String> destination;
    String threadName;

    /**
     * @param source      queue the messages are taken from
     * @param destination queue the transformed messages are put into
     * @param threadName  name embedded in every transformed message
     */
    public Replacer(BlockingQueue<String> source,
                    BlockingQueue<String> destination,
                    String threadName) {
        this.source = source;
        this.destination = destination;
        this.threadName = threadName;
    }

    /**
     * Takes one message from the source, transforms it and offers the result
     * to the destination. The method locks on this {@code Replacer} instance.
     */
    public synchronized void replace() {
        String taken = source.poll();
        String transformed = Utilities.transformMessage(threadName, taken);
        destination.offer(transformed);
    }

    /**
     * @return {@code true} while the destination is not full and the source is
     *         not empty
     */
    private boolean isRunning() {
        return !(destination.isFull() || source.isEmpty());
    }

    /** Transfers messages for as long as {@link #isRunning()} holds. */
    @Override
    public void run() {
        for (;;) {
            if (!isRunning()) {
                return;
            }
            replace();
        }
    }
}
そしてヘルパークラス
/**
 * Shared constants, message helpers and the thread-spawning routine used by
 * the producer/replacer demo.
 */
public class Utilities {
    public static final int NUMBER_OF_PRODUCER_THREADS = 3;
    public static final int NUMBER_OF_REPLACER_THREADS = 1000;
    public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
    public static final int STORAGE_CAPACITY = 100;

    /**
     * Builds the "transferred" form of a message.
     *
     * @param threadName         name of the transferring thread
     * @param messageToTransform message whose last space-separated token is kept
     * @return text naming the thread followed by that last token
     */
    public static String transformMessage(String threadName, String messageToTransform) {
        String[] tokens = messageToTransform.split(" ");
        String lastToken = tokens[tokens.length - 1];
        return "Thread #" + threadName + " transferred message " + lastToken;
    }

    /**
     * Builds the message a producer thread puts on the source queue.
     *
     * @param threadName name of the generating thread
     * @return message text that mentions the thread name twice
     */
    public static String generateMessage(String threadName) {
        return "Thread #" + threadName + " generated message #" + threadName;
    }

    /**
     * Starts {@code numberOfThreadsToSpawn} daemon threads named
     * {@code threadName + 1}, {@code threadName + 2}, and so on.
     *
     * @param threadName             prefix of every thread name
     * @param numberOfThreadsToSpawn how many threads to start
     * @param source                 queue handed to every task
     * @param destination            {@code null} to run {@code Producer} tasks;
     *                               otherwise {@code Replacer} tasks use it as
     *                               their target queue
     */
    public static void spawnDaemonThreads(String threadName,
                                          int numberOfThreadsToSpawn,
                                          BlockingQueue<String> source,
                                          BlockingQueue<String> destination) {
        int limit = numberOfThreadsToSpawn + 1;
        for (int index = 1; index < limit; index++) {
            String name = threadName + index;
            Runnable task;
            if (destination == null) {
                task = new Producer(source, name);
            } else {
                task = new Replacer(source, destination, name);
            }
            Thread worker = new Thread(task);
            worker.setName(name);
            worker.setDaemon(true);
            worker.start();
        }
    }
}
メインクラス:
/**
 * Entry point of the demo: starts the producer and replacer daemon threads and
 * prints messages taken from the destination queue.
 */
public class Main {
    /**
     * Wires up both queues, spawns the worker threads and prints up to
     * {@code Utilities.NUMBER_OF_MESSAGES_TO_READ} messages.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<String> source = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
        BlockingQueue<String> destination = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
        // Create, configure and start PRODUCER threads.
        Utilities.spawnDaemonThreads("Producer", Utilities.NUMBER_OF_PRODUCER_THREADS, source, null);
        // Create, configure and start REPLACER threads.
        Utilities.spawnDaemonThreads("Replacer", Utilities.NUMBER_OF_REPLACER_THREADS, source, destination);
        // Read up to NUMBER_OF_MESSAGES_TO_READ from destination. Counting from 0
        // makes the upper bound match the constant (counting from 1 with "<"
        // stopped one message short).
        // NOTE(review): the loop also stops as soon as destination is observed
        // empty, which can happen before the workers have transferred anything.
        for (int read = 0; (read < Utilities.NUMBER_OF_MESSAGES_TO_READ) && !destination.isEmpty(); read++) {
            System.out.println(destination.poll());
        }
    }
}
これが動作するコードです。
/**
 * Class {@code BlockingQueueImpl} is a capacity-bounded implementation of the
 * blocking queue. The operations {@code public void offer(E element)} and
 * {@code public E poll()} are synchronized on this object: {@code offer}
 * waits while the queue is full and {@code poll} waits while it is empty.
 *
 * <p>Elements live in a {@link PriorityQueue}, so {@code poll()} returns the
 * head of that priority queue rather than the element that was offered first.
 *
 * @param <E> element type stored in the queue
 */
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
    /** Backing store; only accessed while holding this object's monitor. */
    private final Queue<E> storage;
    /** Maximum number of elements the queue may hold. */
    private final int capacity;

    /**
     * Creates an empty queue bounded to {@code capacity} elements.
     *
     * @param capacity maximum number of elements; also used as the initial
     *                 capacity of the backing {@link PriorityQueue}
     */
    public BlockingQueueImpl(int capacity) {
        this.capacity = capacity;
        this.storage = new PriorityQueue<E>(capacity);
    }

    /**
     * Adds {@code element}, waiting for free space if the queue is full.
     * An interrupt received while waiting does not abort the call; the
     * thread's interrupt status is restored before returning.
     *
     * @param element element to add
     */
    @Override
    public synchronized void offer(E element) {
        boolean interrupted = false;
        try {
            while (isFull()) {
                interrupted |= waitSwallowingInterrupt();
            }
            storage.add(element);
            // Wake consumers that may be waiting for an element.
            notifyAll();
        } finally {
            restoreInterrupt(interrupted);
        }
    }

    /**
     * Removes and returns the head of the queue, waiting if it is empty.
     * An interrupt received while waiting does not abort the call; the
     * thread's interrupt status is restored before returning.
     *
     * @return the element removed from the backing queue
     */
    @Override
    public synchronized E poll() {
        boolean interrupted = false;
        try {
            while (isEmpty()) {
                interrupted |= waitSwallowingInterrupt();
            }
            E polledElement = storage.poll();
            // Wake producers that may be waiting for free space.
            notifyAll();
            return polledElement;
        } finally {
            restoreInterrupt(interrupted);
        }
    }

    /**
     * Returns the configured capacity, not the number of stored elements.
     *
     * <p>NOTE(review): the name suggests an element count; confirm against the
     * {@code BlockingQueue} interface contract before changing this.
     *
     * @return the capacity passed to the constructor
     */
    @Override
    public int size() {
        return capacity;
    }

    /**
     * @return {@code true} once the queue holds {@code capacity} elements
     */
    public synchronized boolean isFull() {
        return storage.size() >= capacity;
    }

    /**
     * @return {@code true} if the queue currently holds no elements
     */
    public synchronized boolean isEmpty() {
        return storage.isEmpty();
    }

    /**
     * Waits on this object's monitor once; the caller must hold the monitor.
     *
     * @return {@code true} if the wait ended because of an interrupt
     */
    private boolean waitSwallowingInterrupt() {
        try {
            wait();
            return false;
        } catch (InterruptedException e) {
            // Reported to the caller, which re-asserts the flag after its loop;
            // re-interrupting here would make the next wait() throw at once.
            return true;
        }
    }

    /** Re-asserts the current thread's interrupt status if one was seen. */
    private static void restoreInterrupt(boolean interrupted) {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Runnable that keeps offering generated messages to the source queue and
 * stops once the queue reports that it is full.
 */
public class Producer implements Runnable {
    BlockingQueue<String> source;
    String threadName;

    /**
     * @param source     queue that receives the generated messages
     * @param threadName name embedded in every generated message
     */
    public Producer(BlockingQueue<String> source, String threadName) {
        this.source = source;
        this.threadName = threadName;
    }

    /** Offers one generated message per iteration while the queue has room. */
    @Override
    public void run() {
        boolean hasRoom = !source.isFull();
        while (hasRoom) {
            source.offer(Utilities.generateMessage(threadName));
            hasRoom = !source.isFull();
        }
    }
}
/**
 * Runnable that takes messages from the source queue, transforms them with
 * {@code Utilities.transformMessage} and offers them to the destination queue.
 */
public class Replacer implements Runnable {
    BlockingQueue<String> source;
    BlockingQueue<String> destination;
    String threadName;

    /**
     * @param source      queue the messages are taken from
     * @param destination queue the transformed messages are put into
     * @param threadName  name embedded in every transformed message
     */
    public Replacer(BlockingQueue<String> source,
                    BlockingQueue<String> destination,
                    String threadName) {
        this.source = source;
        this.destination = destination;
        this.threadName = threadName;
    }

    /**
     * Moves a single message from source to destination, transforming it on
     * the way. The method locks on this {@code Replacer} instance.
     */
    public synchronized void replace() {
        String original = source.poll();
        destination.offer(Utilities.transformMessage(threadName, original));
    }

    // Continue execution of a thread if a destination is not full and source is not empty.
    private boolean isRunning() {
        if (destination.isFull()) {
            return false;
        }
        return !source.isEmpty();
    }

    /** Keeps transferring messages while {@link #isRunning()} returns true. */
    @Override
    public void run() {
        boolean keepGoing = isRunning();
        while (keepGoing) {
            replace();
            keepGoing = isRunning();
        }
    }
}
/**
 * Constants, message formatting helpers and the daemon-thread launcher shared
 * by the producer/replacer demo.
 */
public class Utilities {
    public static final int NUMBER_OF_PRODUCER_THREADS = 3;
    public static final int NUMBER_OF_REPLACER_THREADS = 1000;
    public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
    public static final int STORAGE_CAPACITY = 100;

    /**
     * Rewrites a message as "transferred" by the given thread.
     *
     * @param threadName         name of the transferring thread
     * @param messageToTransform message whose last space-separated token is kept
     * @return text naming the thread followed by that last token
     */
    public static String transformMessage(String threadName, String messageToTransform) {
        String[] parts = messageToTransform.split(" ");
        int lastIndex = parts.length - 1;
        return "Thread #" + threadName + " transferred message " + parts[lastIndex];
    }

    /**
     * Produces the message text a producer thread generates.
     *
     * @param threadName name of the generating thread
     * @return message text that mentions the thread name twice
     */
    public static String generateMessage(String threadName) {
        return "Thread #" + threadName + " generated message #" + threadName;
    }

    /**
     * Starts {@code numberOfThreadsToSpawn} daemon threads whose names are
     * {@code threadName} followed by a 1-based counter.
     *
     * @param threadName             prefix of every thread name
     * @param numberOfThreadsToSpawn how many threads to start
     * @param source                 queue handed to every task
     * @param destination            {@code null} to run {@code Producer} tasks;
     *                               otherwise {@code Replacer} tasks use it as
     *                               their target queue
     */
    public static void spawnDaemonThreads(String threadName,
                                          int numberOfThreadsToSpawn,
                                          BlockingQueue<String> source,
                                          BlockingQueue<String> destination) {
        int upperBound = numberOfThreadsToSpawn + 1;
        if (destination == null) {
            for (int n = 1; n < upperBound; n++) {
                String name = threadName + n;
                startDaemon(new Producer(source, name), name);
            }
            return;
        }
        for (int n = 1; n < upperBound; n++) {
            String name = threadName + n;
            startDaemon(new Replacer(source, destination, name), name);
        }
    }

    /** Wraps {@code task} in a daemon thread called {@code name} and starts it. */
    private static void startDaemon(Runnable task, String name) {
        Thread worker = new Thread(task);
        worker.setName(name);
        worker.setDaemon(true);
        worker.start();
    }
}
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加