基本的なWebスクラッパーを実行しています。
Completable futureを使用して、複数のスレッドと並行してスクラップを実行したい。各ジョブは、スクラップする必要があるPageオブジェクトを取得し、作成されたURLのリストを含むPageオブジェクトを返します。
リストからの各URLは、まだスクラップに送信されていない場合、新しいジョブを開始します。すべての並列ジョブが完了したら、ロジックを続行します。
「allFutures.thenRun(()-> executorService.shutdown());」を削除すると、最初のページオブジェクトのみをスクレイピングして終了するこのコードの問題 次に、すべてのページ/ URLを収集しますが、プログラムは終了しません。
public class Demo
{
private final Set<Page> pages = new HashSet<>();
private final Set<Page> submittedPages = new HashSet<>();
private final ExecutorService executorService;
public Demo(final int numberOfThreads)
{
this.executorService = Executors.newFixedThreadPool(numberOfThreads);
}
public void start(String url) throws ExecutionException, InterruptedException
{
this.submitTask(new Page(url));
CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
allFutures.thenRun(() -> executorService.shutdown());
// do something with pages
}
private void submitTask(final Page page)
{
if (!this.submittedPages.contains(page))
{
this.submittedPages.add(page);
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> new Task(page).call(), this.executorService) //want to run this parallel in multiple threads
.thenAccept(receivedPage -> {
this.savePage(receivedPage);
this.submitCollectedLinks(receivedPage);
});
completableFutureList.add(cf);
}
}
private void submitCollectedLinks(final Page page){
page.getLinks()
.stream()
.map(Page::new)
.forEach(this::submitTask);
}
private void savePage(final Page page)
{
this.pages.add(page);
}
}
コードにはいくつかの問題があります。completableFutureList
後で追加される先物がある可能性がある場合のスナップショットの完了時にexecutorサービスのシャットダウンをスケジュールしていますが、さらに悪いことに// do something with pages
、スナップショットさえまだ完了していない時点に達しています。
の宣言は示していませんがcompletableFutureList
、異なるスレッドから変更したpages
and がスレッドセーフではないでsubmittedPages
初期化されていることを考えると、HashSet
リストについても気分がよくありません。しかし、とにかくリストは必要ありません。送信コードを変更して、後続のタスクで構成される保留中のタスクを表すフューチャーを返す必要があります。渡された関数はthenCompose
、前提条件の段階が完了したときに評価されます。つまり、これにより、関数のチェーン時に不明な先物への依存が可能になります。
HashSet
sをスレッドセーフ構造に置き換えるだけでは不十分であることに注意してください。他のスレッドがこれら2つの呼び出しの間に実行しないことが保証されていないため(「check-then-act」アンチパターンと呼ばcontains
れる)add
、beforeのようなシーケンスを回避する必要がありadd
ます。justを使用できますadd
。これは、すでに何もせずfalse
、要素がすでに存在する場合に戻ります。適切なスレッドセーフSet
実装を使用して、必要な原子性を提供します。
これらをまとめると、例えば、
public class Demo {
private final Set<Page> pages = ConcurrentHashMap.newKeySet();
private final Set<Page> submittedPages = ConcurrentHashMap.newKeySet();
private final ExecutorService executorService;
public Demo(final int numberOfThreads) {
this.executorService = Executors.newFixedThreadPool(numberOfThreads);
}
public void start(String url) {
this.submitTask(new Page(url))
// shutdown even in the exceptional case
.whenComplete((_void, throwable) -> executorService.shutdown())
.join(); // wait for completion before doing something with pages
// do something with pages
}
private CompletableFuture<Void> submitTask(final Page page) {
// use a single add to avoid check-then-act anti-pattern
if(this.submittedPages.add(page)) {
return CompletableFuture.supplyAsync(new Task(page)::call, executorService)
// compose with recursively encountered tasks
.thenCompose(receivedPage -> {
this.savePage(receivedPage);
return this.submitCollectedLinks(receivedPage);
});
}
// do nothing when already submitted
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> submitCollectedLinks(final Page page) {
return CompletableFuture.allOf(page.getLinks()
.stream().map(Page::new).map(this::submitTask)
.toArray(CompletableFuture<?>[]::new));
}
private void savePage(final Page page) {
this.pages.add(page);
}
}
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加