我想逐行读取文件,对每行进行一些缓慢的操作(可以轻松并行完成),然后将结果逐行写入文件。我不在乎输出的顺序。输入和输出非常大,无法容纳在内存中。我希望能够对同时运行的线程数以及内存中的行数设置硬性限制。
我用于文件IO(Apache Commons CSV)的库似乎未提供同步文件访问,因此我认为我无法一次从多个线程读取同一文件或从同一线程写入同一文件。如果可能的话,我将创建一个ThreadPoolExecutor并为每行提供一个任务,该任务将简单地读取该行,执行计算并写入结果。
相反,我认为我需要的是执行解析的单个线程,用于解析的输入行的有界队列,带有执行计算的作业的线程池,用于计算的输出行的有界队列以及执行写作。一个生产者,许多消费者生产者和一个消费者(如果有的话)。
我所看到的是这样的:
BlockingQueue<CSVRecord> inputQueue = new ArrayBlockingQueue<CSVRecord>(INPUT_QUEUE_SIZE);
BlockingQueue<String[]> outputQueue = new ArrayBlockingQueue<String[]>(OUTPUT_QUEUE_SIZE);
Thread parserThread = new Thread(() -> {
while (inputFileIterator.hasNext()) {
CSVRecord record = inputFileIterator.next();
parsedQueue.put(record); // blocks if queue is full
}
});
// the job queue of the thread pool has to be bounded too, otherwise all
// the objects in the input queue will be given to jobs immediately and
// I'll run out of heap space
// source: https://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
= new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
= new ThreadPoolExecutor(
NUMBER_OF_THREADS,
NUMBER_OF_THREADS,
0L,
TimeUnit.MILLISECONDS,
jobQueue,
rejectedExecutionHandler
);
Thread processingBossThread = new Thread(() -> {
while (!inputQueue.isEmpty() || parserThread.isAlive()) {
CSVRecord record = inputQueue.take(); // blocks if queue is empty
executorService.execute(() -> {
String[] array = this.doStuff(record);
outputQueue.put(array); // blocks if queue is full
});
}
// getting here that means that all CSV rows have been read and
// added to the processing queue
executorService.shutdown(); // do not accept any new tasks
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
// wait for existing tasks to finish
});
Thread writerThread = new Thread(() -> {
while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
String[] outputRow = outputQueue.take(); // blocks if queue is empty
outputFileWriter.printRecord((Object[]) outputRow);
});
parserThread.start();
consumerBossThread.start();
writerThread.start();
// wait until writer thread has finished
writerThread.join();
我省略了日志记录和异常处理,因此看起来比实际要短得多。
此解决方案有效,但我对此不满意。不得不创建我自己的线程,检查它们的isAlive(),在Runnable中创建Runnable,当我真的只想等到所有工作人员都完成后被迫指定超时等等似乎很棘手。一个100多行的方法,或者什至几百行代码(如果我将Runnables设为自己的类的话),这似乎是一个非常基本的模式。
有更好的解决方案吗?我想尽可能地利用Java的库,以帮助保持我的代码可维护性并与最佳实践保持一致。我仍然想知道它在后台执行的操作,但是我怀疑自己执行所有这些操作是最好的方法。
更新:更好的解决方案,根据答案的建议:
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
= new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
= new ThreadPoolExecutor(
NUMBER_OF_THREADS,
NUMBER_OF_THREADS,
0L,
TimeUnit.MILLISECONDS,
jobQueue,
rejectedExecutionHandler
);
while (it.hasNext()) {
CSVRecord record = it.next();
executorService.execute(() -> {
String[] array = this.doStuff(record);
synchronized (writer) {
writer.printRecord((Object[]) array);
}
});
}
首先,我想指出三种可能的情况:
1.-对于文件的所有行,使用doStuff方法处理一行所需的时间大于从磁盘读取同一行并进行解析所需的时间
2.-对于文件的所有行,使用doStuff方法处理一行所需的时间小于或等于读取同一行并对其进行解析所花费的时间。
3.-同一文件的第一种情况和第二种情况都没有。
您的解决方案应该适合第一种情况,但不适用于第二种或第三种情况,而且,您也不能以同步方式修改队列。甚至,如果您遇到的是数字2之类的场景,那么当没有数据要发送到输出时,或者没有行要发送到队列中以待doStuff处理的行时,您就浪费了CPU周期。 ,通过旋转:
while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
最后,无论您遇到哪种情况,我都建议您使用Monitor对象,这将使您可以放置特定线程,直到另一个进程通知它们某个条件为真并可以再次激活它们为止。通过使用Monitor对象,您不会浪费CPU周期。
有关更多信息,请参见:https : //docs.oracle.com/javase/7/docs/api/javax/management/monitor/Monitor.html
编辑:我已经删除了使用同步方法的建议,因为正如您所指出的那样,BlockingQueue的方法是线程安全的(或几乎所有的),并防止出现竞争情况。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句