如何使用Semphores在Producer-Consumer中消费?

维文

我正在尝试使用信号量来解决生产者-消费者问题。该程序对我来说很好,除了一个地方。

public class ProducerConsumerWithSemaphores
{
    private final ArrayList<Integer> list = new ArrayList<>(5);
    private final Semaphore semaphoreProducer = new Semaphore(1);
    private final Semaphore semaphoreConsumer = new Semaphore(0);

    private void produce() throws InterruptedException
    {
        for(int i = 0;i< 5;i++)
        {
            semaphoreProducer.acquire();
            list.add(i);
            System.out.println("Produced: " + i);
            semaphoreConsumer.release();
        }
    }

    private void consumer() throws InterruptedException
    {
        while (!list.isEmpty())    /// This line is where I have the doubt
        {
            semaphoreConsumer.acquire();
            System.out.println("Consumer: " + list.remove(list.size()-1));
            semaphoreProducer.release();
            Thread.sleep(100);
        }
    }

    public static void main(String[] args)
    {
        final ProducerConsumerWithSemaphores obj = new ProducerConsumerWithSemaphores();

        new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    obj.produce();
                } catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    obj.consumer();
                } catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

可以在获取信号量之前检查列表是否不为空吗?这会在多线程环境中引起任何问题吗?

沃尔特苏
private void consumer() throws InterruptedException
{
    while (!list.isEmpty())    /// This line is where I have the doubt

问题是,如果消费者运行的速度比生产者快,那么您的消费者立即退出,那么您就没有消费者了!!

正确的示例看起来像是生产者-消费者问题#使用信号量我相信您的意图不是true用作无限循环,因为您希望生产者/消费者在完成工作后退出。如果您打算这样做,则可以设置1.totalCount结束循环。 2.或在生产者放置最后一个 boolean 标记后由生产者设置标志 putItemIntoBuffer 该标志以及都必须受到保护 buffer (更新:如果有多个生产者/消费者,此方法将不起作用)3.模拟EOF(从生产者那里消费的想法-消费;消费者如何停止?

这会在多线程环境中引起任何问题吗?

您的关键部分(您的list)不受保护。通常我们使用3个信号量。第三个用作互斥体以保护缓冲区。

要停止生产者/消费者,请
使用方法1的示例代码:

public class Test3 {

  private Semaphore mutex = new Semaphore(1);
  private Semaphore fillCount = new Semaphore(0);
  private Semaphore emptyCount = new Semaphore(3);

  private final List<Integer> list = new ArrayList<>();

  class Producer implements Runnable {

    private final int totalTasks;

    Producer(int totalTasks) {
      this.totalTasks = totalTasks;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < totalTasks; i++) {
          emptyCount.acquire();
          mutex.acquire();
          list.add(i);
          System.out.println("Produced: " + i);
          mutex.release();
          fillCount.release();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  class Consumer implements Runnable {
    private final int totalTasks;

    Consumer(int totalTasks) {
      this.totalTasks = totalTasks;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < totalTasks; i++) {
          fillCount.acquire();
          mutex.acquire();
          int item = list.remove(list.size() - 1);
          System.out.println("Consumed: " + item);
          mutex.release();
          emptyCount.release();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public void runTest() {
    int numProducer = 3;
    int tasksPerProducer = 10;
    int numConsumer = 6;
    int tasksPerConsumer = 5;

    for (int i = 0; i < numProducer; i++) {
      new Thread(new Producer(tasksPerProducer)).start();
    }
    for (int i = 0; i < numConsumer; i++) {
      new Thread(new Consumer(tasksPerConsumer)).start();
    }
  }

  public static void main(String[] args) throws IOException {
    Test3 t = new Test3();
    t.runTest();
  }
}

方法3的示例代码

public class Test4 {

  private Semaphore mutex = new Semaphore(1);
  private Semaphore fillCount = new Semaphore(0);
  private Semaphore emptyCount = new Semaphore(3);

  private Integer EOF = Integer.MAX_VALUE;

  private final Queue<Integer> list = new LinkedList<>(); // need to put/get data in FIFO

  class Producer implements Runnable {

    private final int totalTasks;

    Producer(int totalTasks) {
      this.totalTasks = totalTasks;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < totalTasks + 1; i++) {
          emptyCount.acquire();
          mutex.acquire();
          if (i == totalTasks) {
            list.offer(EOF);
          } else {
            // add a valid value
            list.offer(i);
            System.out.println("Produced: " + i);
          }
          mutex.release();
          fillCount.release();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  class Consumer implements Runnable {

    @Override
    public void run() {
      try {
        boolean finished = false;
        while (!finished) {
          fillCount.acquire();
          mutex.acquire();
          int item = list.poll();
          if (EOF.equals(item)) {
            // do not consume this item because it means EOF
            finished = true;
          } else {
            // it's a valid value, consume it.
            System.out.println("Consumed: " + item);
          }
          mutex.release();
          emptyCount.release();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public void runTest() {
    int numProducer = 3;
    int tasksPerProducer = 10;

    for (int i = 0; i < numProducer; i++) {
      new Thread(new Producer(tasksPerProducer)).start();
    }

    int numConsumer = numProducer; // producers will put N EOFs to kill N consumers.
    for (int i = 0; i < numConsumer; i++) {
      new Thread(new Consumer()).start();
    }
  }

  public static void main(String[] args) throws IOException {
    Test4 t = new Test4();
    t.runTest();
  }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

使用Java中的wait()和notify()的Producer Consumer Program

来自分类Dev

使用Java中的wait()和notify()的Producer Consumer Program

来自分类Dev

在Kafka中,如何让消费者从本地分区消费?

来自分类Dev

python中的Producer-Consumer算法

来自分类Dev

kafka-console-consumer工具是否弃用了--group选项?如果是这样,我如何使用kafka-console-consumer设置消费者组。

来自分类Dev

在生产者/消费者场景中,如何从消费者那里得到回应?

来自分类Dev

kafka如何确定单个消费者组中的哪个消费者阅读消息?

来自分类Dev

Kafka:消费者api:无法使用kafka-consumer-api从偏移量手动读取和确认

来自分类Dev

如何在骆驼测试中嘲笑AMQP消费者?

来自分类Dev

芹菜工人:如何从所有队列中消费?

来自分类Dev

如何在Kafka中创建新的消费群体

来自分类Dev

如何从Kafka的两个不同集群中消费?

来自分类Dev

如何从不同应用程序中的芹菜队列消费

来自分类Dev

如何在Nifi中查看Kafka的消费消息?

来自分类Dev

ActiveMQ中如何按特定顺序消费消息?

来自分类Dev

在Apache Kafka中延迟使用消费者的消息

来自分类Dev

使用aioamqp一次从多个队列中消费

来自分类Dev

是否有可能使用皮卡从每个队列中消费?

来自分类Dev

如何在Scala中实施Kafka Consumer

来自分类Dev

如何处理 Kafka Consumer 中的错误

来自分类Dev

如何使用Kafka Consumer API中的密钥读取数据?

来自分类Dev

使用信号量和pthread的Producer Consumer程序

来自分类Dev

在 Kafka 中,消费者在哪个 __consumer_offsets 分区上提交偏移量?

来自分类Dev

如何获得消费者滞后于卡夫卡Java中的一个消费群

来自分类Dev

在生产者/消费者模式中,如何杀死使用者线程?

来自分类Dev

如何修复 ActiveMQ 持久订阅抛出“持久消费者已在使用中”错误

来自分类Dev

生产者/消费者-生产者将数据添加到集合中而不会阻塞,消费者批量使用集合中的数据

来自分类Dev

Producer-Consumer程序中的线程优先级-wait()和notify()

来自分类Dev

Kafka Producer消费者API问题

Related 相关文章

  1. 1

    使用Java中的wait()和notify()的Producer Consumer Program

  2. 2

    使用Java中的wait()和notify()的Producer Consumer Program

  3. 3

    在Kafka中,如何让消费者从本地分区消费?

  4. 4

    python中的Producer-Consumer算法

  5. 5

    kafka-console-consumer工具是否弃用了--group选项?如果是这样,我如何使用kafka-console-consumer设置消费者组。

  6. 6

    在生产者/消费者场景中,如何从消费者那里得到回应?

  7. 7

    kafka如何确定单个消费者组中的哪个消费者阅读消息?

  8. 8

    Kafka:消费者api:无法使用kafka-consumer-api从偏移量手动读取和确认

  9. 9

    如何在骆驼测试中嘲笑AMQP消费者?

  10. 10

    芹菜工人:如何从所有队列中消费?

  11. 11

    如何在Kafka中创建新的消费群体

  12. 12

    如何从Kafka的两个不同集群中消费?

  13. 13

    如何从不同应用程序中的芹菜队列消费

  14. 14

    如何在Nifi中查看Kafka的消费消息?

  15. 15

    ActiveMQ中如何按特定顺序消费消息?

  16. 16

    在Apache Kafka中延迟使用消费者的消息

  17. 17

    使用aioamqp一次从多个队列中消费

  18. 18

    是否有可能使用皮卡从每个队列中消费?

  19. 19

    如何在Scala中实施Kafka Consumer

  20. 20

    如何处理 Kafka Consumer 中的错误

  21. 21

    如何使用Kafka Consumer API中的密钥读取数据?

  22. 22

    使用信号量和pthread的Producer Consumer程序

  23. 23

    在 Kafka 中,消费者在哪个 __consumer_offsets 分区上提交偏移量?

  24. 24

    如何获得消费者滞后于卡夫卡Java中的一个消费群

  25. 25

    在生产者/消费者模式中,如何杀死使用者线程?

  26. 26

    如何修复 ActiveMQ 持久订阅抛出“持久消费者已在使用中”错误

  27. 27

    生产者/消费者-生产者将数据添加到集合中而不会阻塞,消费者批量使用集合中的数据

  28. 28

    Producer-Consumer程序中的线程优先级-wait()和notify()

  29. 29

    Kafka Producer消费者API问题

热门标签

归档