我正在尝试使用信号量来解决生产者-消费者问题。该程序对我来说很好,除了一个地方。
public class ProducerConsumerWithSemaphores
{
private final ArrayList<Integer> list = new ArrayList<>(5);
private final Semaphore semaphoreProducer = new Semaphore(1);
private final Semaphore semaphoreConsumer = new Semaphore(0);
private void produce() throws InterruptedException
{
for(int i = 0;i< 5;i++)
{
semaphoreProducer.acquire();
list.add(i);
System.out.println("Produced: " + i);
semaphoreConsumer.release();
}
}
private void consumer() throws InterruptedException
{
while (!list.isEmpty()) /// This line is where I have the doubt
{
semaphoreConsumer.acquire();
System.out.println("Consumer: " + list.remove(list.size()-1));
semaphoreProducer.release();
Thread.sleep(100);
}
}
public static void main(String[] args)
{
final ProducerConsumerWithSemaphores obj = new ProducerConsumerWithSemaphores();
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
obj.produce();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
obj.consumer();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}).start();
}
}
可以在获取信号量之前检查列表是否不为空吗?这会在多线程环境中引起任何问题吗?
private void consumer() throws InterruptedException
{
while (!list.isEmpty()) /// This line is where I have the doubt
问题是,如果消费者运行的速度比生产者快,那么您的消费者立即退出,那么您就没有消费者了!!
正确的示例看起来像是生产者-消费者问题#使用信号量。我相信您的意图不是true
用作无限循环,因为您希望生产者/消费者在完成工作后退出。如果您打算这样做,则可以设置1.totalCount
结束循环。 2.或在生产者放置最后一个 (更新:如果有多个生产者/消费者,此方法将不起作用)3.模拟EOF(从生产者那里消费的想法-消费;消费者如何停止?)boolean
标记后由生产者设置的 标志 putItemIntoBuffer
。该标志以及都必须受到保护 buffer
。
这会在多线程环境中引起任何问题吗?
您的关键部分(您的list
)不受保护。通常我们使用3个信号量。第三个用作互斥体以保护缓冲区。
要停止生产者/消费者,请
使用方法1的示例代码:
public class Test3 {
private Semaphore mutex = new Semaphore(1);
private Semaphore fillCount = new Semaphore(0);
private Semaphore emptyCount = new Semaphore(3);
private final List<Integer> list = new ArrayList<>();
class Producer implements Runnable {
private final int totalTasks;
Producer(int totalTasks) {
this.totalTasks = totalTasks;
}
@Override
public void run() {
try {
for (int i = 0; i < totalTasks; i++) {
emptyCount.acquire();
mutex.acquire();
list.add(i);
System.out.println("Produced: " + i);
mutex.release();
fillCount.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private final int totalTasks;
Consumer(int totalTasks) {
this.totalTasks = totalTasks;
}
@Override
public void run() {
try {
for (int i = 0; i < totalTasks; i++) {
fillCount.acquire();
mutex.acquire();
int item = list.remove(list.size() - 1);
System.out.println("Consumed: " + item);
mutex.release();
emptyCount.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void runTest() {
int numProducer = 3;
int tasksPerProducer = 10;
int numConsumer = 6;
int tasksPerConsumer = 5;
for (int i = 0; i < numProducer; i++) {
new Thread(new Producer(tasksPerProducer)).start();
}
for (int i = 0; i < numConsumer; i++) {
new Thread(new Consumer(tasksPerConsumer)).start();
}
}
public static void main(String[] args) throws IOException {
Test3 t = new Test3();
t.runTest();
}
}
方法3的示例代码
public class Test4 {
private Semaphore mutex = new Semaphore(1);
private Semaphore fillCount = new Semaphore(0);
private Semaphore emptyCount = new Semaphore(3);
private Integer EOF = Integer.MAX_VALUE;
private final Queue<Integer> list = new LinkedList<>(); // need to put/get data in FIFO
class Producer implements Runnable {
private final int totalTasks;
Producer(int totalTasks) {
this.totalTasks = totalTasks;
}
@Override
public void run() {
try {
for (int i = 0; i < totalTasks + 1; i++) {
emptyCount.acquire();
mutex.acquire();
if (i == totalTasks) {
list.offer(EOF);
} else {
// add a valid value
list.offer(i);
System.out.println("Produced: " + i);
}
mutex.release();
fillCount.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
try {
boolean finished = false;
while (!finished) {
fillCount.acquire();
mutex.acquire();
int item = list.poll();
if (EOF.equals(item)) {
// do not consume this item because it means EOF
finished = true;
} else {
// it's a valid value, consume it.
System.out.println("Consumed: " + item);
}
mutex.release();
emptyCount.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void runTest() {
int numProducer = 3;
int tasksPerProducer = 10;
for (int i = 0; i < numProducer; i++) {
new Thread(new Producer(tasksPerProducer)).start();
}
int numConsumer = numProducer; // producers will put N EOFs to kill N consumers.
for (int i = 0; i < numConsumer; i++) {
new Thread(new Consumer()).start();
}
}
public static void main(String[] args) throws IOException {
Test4 t = new Test4();
t.runTest();
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句