对于Java中的生产者-消费者来说,更好的习惯用法是什么?

彼得·P

我想逐行读取文件,对每行进行一些缓慢的操作(可以轻松并行完成),然后将结果逐行写入文件。我不在乎输出的顺序。输入和输出非常大,无法容纳在内存中。我希望能够对同时运行的线程数以及内存中的行数设置硬性限制。

我用于文件IO(Apache Commons CSV)的库似乎未提供同步文件访问,因此我认为我无法一次从多个线程读取同一文件或从同一线程写入同一文件。如果可能的话,我将创建一个ThreadPoolExecutor并为每行提供一个任务,该任务将简单地读取该行,执行计算并写入结果。

相反,我认为我需要的是执行解析的单个线程,用于解析的输入行的有界队列,带有执行计算的作业的线程池,用于计算的输出行的有界队列以及执行写作。一个生产者,许多消费者生产者和一个消费者(如果有的话)。

我所看到的是这样的:

BlockingQueue<CSVRecord> inputQueue = new ArrayBlockingQueue<CSVRecord>(INPUT_QUEUE_SIZE);
BlockingQueue<String[]> outputQueue = new ArrayBlockingQueue<String[]>(OUTPUT_QUEUE_SIZE);

Thread parserThread = new Thread(() -> {
    while (inputFileIterator.hasNext()) {
        CSVRecord record = inputFileIterator.next();
        parsedQueue.put(record); // blocks if queue is full
    }
});

// the job queue of the thread pool has to be bounded too, otherwise all 
// the objects in the input queue will be given to jobs immediately and 
// I'll run out of heap space
// source: https://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler 
    = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService 
    = new ThreadPoolExecutor(
        NUMBER_OF_THREADS, 
        NUMBER_OF_THREADS, 
        0L, 
        TimeUnit.MILLISECONDS, 
        jobQueue, 
        rejectedExecutionHandler
    );
Thread processingBossThread = new Thread(() -> {
    while (!inputQueue.isEmpty() || parserThread.isAlive()) {
        CSVRecord record = inputQueue.take(); // blocks if queue is empty
        executorService.execute(() -> {
            String[] array = this.doStuff(record);
            outputQueue.put(array); // blocks if queue is full
        });
    }
    // getting here that means that all CSV rows have been read and 
    // added to the processing queue
    executorService.shutdown(); // do not accept any new tasks
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
        // wait for existing tasks to finish
});

Thread writerThread = new Thread(() -> {
    while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
        String[] outputRow = outputQueue.take(); // blocks if queue is empty
        outputFileWriter.printRecord((Object[]) outputRow);
});

parserThread.start();
consumerBossThread.start();
writerThread.start();

// wait until writer thread has finished
writerThread.join();

我省略了日志记录和异常处理,因此看起来比实际要短得多。

此解决方案有效,但我对此不满意。不得不创建我自己的线程,检查它们的isAlive(),在Runnable中创建Runnable,当我真的只想等到所有工作人员都完成后被迫指定超时等等似乎很棘手。一个100多行的方法,或者什至几百行代码(如果我将Runnables设为自己的类的话),这似乎是一个非常基本的模式。

有更好的解决方案吗?我想尽可能地利用Java的库,以帮助保持我的代码可维护性并与最佳实践保持一致。我仍然想知道它在后台执行的操作,但是我怀疑自己执行所有这些操作是最好的方法。

更新:更好的解决方案,根据答案的建议:

BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
    = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService 
    = new ThreadPoolExecutor(
        NUMBER_OF_THREADS, 
        NUMBER_OF_THREADS, 
        0L, 
        TimeUnit.MILLISECONDS, 
        jobQueue, 
        rejectedExecutionHandler
    );

while (it.hasNext()) {
    CSVRecord record = it.next();
    executorService.execute(() -> {
        String[] array = this.doStuff(record);
        synchronized (writer) {
            writer.printRecord((Object[]) array);
        }
    });
}
海梅·奥尔特加(Jaime Ortega)

首先,我想指出三种可能的情况:

1.-对于文件的所有行,使用doStuff方法处理一行所需的时间大于从磁盘读取同一行并进行解析所需的时间

2.-对于文件的所有行,使用doStuff方法处理一行所需的时间小于或等于读取同一行并对其进行解析所花费的时间。

3.-同一文件的第一种情况和第二种情况都没有。

您的解决方案应该适合第一种情况,但不适用于第二种或第三种情况,而且,您也不能以同步方式修改队列。甚至,如果您遇到的是数字2之类的场景,那么当没有数据要发送到输出时,或者没有行要发送到队列中以待doStuff处理的行时,您就浪费了CPU周期。 ,通过旋转:

while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {

最后,无论您遇到哪种情况,我都建议您使用Monitor对象,这将使您可以放置​​特定线程,直到另一个进程通知它们某个条件为真并可以再次激活它们为止。通过使用Monitor对象,您不会浪费CPU周期。

有关更多信息,请参见:https : //docs.oracle.com/javase/7/docs/api/javax/management/monitor/Monitor.html

编辑:我已经删除了使用同步方法的建议,因为正如您所指出的那样,BlockingQueue的方法是线程安全的(或几乎所有的),并防止出现竞争情况。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

对于Java中的生产者-消费者来说,更好的习惯用法是什么?

来自分类Dev

对于生产者-消费者模式,哪种实现更好:ArrayBlockingQueue或带有Exchangers的Arraylists

来自分类Dev

生产者/消费者队列模式中更快的生产者

来自分类Dev

生产者/消费者线程并发性Java

来自分类Dev

Go中的同时生产者和消费者

来自分类Dev

Java中具有2个进程的生产者-消费者问题

来自分类Dev

为什么我在 Java 中使用锁和条件在这个生产者 - 消费者中陷入僵局?

来自分类Dev

异步生产者/消费者

来自分类Dev

定制生产者消费者

来自分类Dev

C:生产者/消费者

来自分类Dev

生产者,消费者POSIX

来自分类Dev

C:生产者/消费者

来自分类Dev

Python消费者生产者。生产者等待消费者消费

来自分类Dev

在生产者/消费者场景中,如何从消费者那里得到回应?

来自分类Dev

对于单个生产者和消费者,JMS队列是否遵循FIFO?

来自分类Dev

为什么我的消费者在队列中与生产者分开工作?

来自分类Dev

管道中的消费者在什么时候可以减慢生产者的速度?

来自分类Dev

用SQLite3实现生产者/消费者的正确方法是什么?

来自分类Dev

一个快速生产者多个慢速消费者的最佳方案是什么?

来自分类Dev

在生产者-消费者方案中,LinkedBlockingQueue与许多生产者的线程安全性

来自分类Dev

在生产者-消费者方案中,LinkedBlockingQueue与许多生产者的线程安全性

来自分类Dev

与Java静态块等效的C ++习惯用法是什么?

来自分类Dev

在生产者/消费者模式中,如何杀死使用者线程?

来自分类Dev

生产者/消费者-生产者将数据添加到集合中而不会阻塞,消费者批量使用集合中的数据

来自分类Dev

需要帮助找出Java中基本的多线程代码出了什么问题(多个生产者-一个消费者)

来自分类Dev

需要帮助找出Java中基本的多线程代码出了什么问题(多个生产者-一个消费者)

来自分类Dev

Java Kafka生产者错误

来自分类Dev

生产者中的Kafka数据丢失

来自分类Dev

Java的“分层队列”实现,适用于快速的生产者,缓慢的消费者

Related 相关文章

  1. 1

    对于Java中的生产者-消费者来说,更好的习惯用法是什么?

  2. 2

    对于生产者-消费者模式,哪种实现更好:ArrayBlockingQueue或带有Exchangers的Arraylists

  3. 3

    生产者/消费者队列模式中更快的生产者

  4. 4

    生产者/消费者线程并发性Java

  5. 5

    Go中的同时生产者和消费者

  6. 6

    Java中具有2个进程的生产者-消费者问题

  7. 7

    为什么我在 Java 中使用锁和条件在这个生产者 - 消费者中陷入僵局?

  8. 8

    异步生产者/消费者

  9. 9

    定制生产者消费者

  10. 10

    C:生产者/消费者

  11. 11

    生产者,消费者POSIX

  12. 12

    C:生产者/消费者

  13. 13

    Python消费者生产者。生产者等待消费者消费

  14. 14

    在生产者/消费者场景中,如何从消费者那里得到回应?

  15. 15

    对于单个生产者和消费者,JMS队列是否遵循FIFO?

  16. 16

    为什么我的消费者在队列中与生产者分开工作?

  17. 17

    管道中的消费者在什么时候可以减慢生产者的速度?

  18. 18

    用SQLite3实现生产者/消费者的正确方法是什么?

  19. 19

    一个快速生产者多个慢速消费者的最佳方案是什么?

  20. 20

    在生产者-消费者方案中,LinkedBlockingQueue与许多生产者的线程安全性

  21. 21

    在生产者-消费者方案中,LinkedBlockingQueue与许多生产者的线程安全性

  22. 22

    与Java静态块等效的C ++习惯用法是什么?

  23. 23

    在生产者/消费者模式中,如何杀死使用者线程?

  24. 24

    生产者/消费者-生产者将数据添加到集合中而不会阻塞,消费者批量使用集合中的数据

  25. 25

    需要帮助找出Java中基本的多线程代码出了什么问题(多个生产者-一个消费者)

  26. 26

    需要帮助找出Java中基本的多线程代码出了什么问题(多个生产者-一个消费者)

  27. 27

    Java Kafka生产者错误

  28. 28

    生产者中的Kafka数据丢失

  29. 29

    Java的“分层队列”实现,适用于快速的生产者,缓慢的消费者

热门标签

归档