データフロージョブで処理されたファイルを特定するにはどうすればよいですか?ワイルドカードを使用してクラウドストレージからファイルを読み取ります。ただし、ジョブが実行されるたびに、すべてのファイルが再読み取りされます。
これはバッチジョブであり、以下は私が使用しているTextIOを読み取るサンプルです。
PCollection<String> filePColection = pipeline.apply("Read files from Cloud Storage ", TextIO.read().from("gs://bucketName/TrafficData*.txt"));
ワイルドカードに一致するファイルのリストを表示するにgsutils
は、CloudStorageコマンドラインユーティリティであるを使用できます。次のようにします。
gsutils ls gs://bucketName/TrafficData*.txt
これで、バッチジョブを複数回実行する場合、パイプラインには、分析済みのファイルがすでにあるかどうかを知る方法がありません。新しいファイルの分析を回避するには、次のいずれかを実行できます。
ストリーミングジョブを定義し、TextIO
のwatchForNewFiles
機能を使用します。ファイルの処理を続けたい限り、ジョブを実行したままにしておく必要があります。
すでに分析されたファイルをパイプラインに提供する方法を見つけてください。このため、パイプラインを実行するたびに、分析するファイルのリストを生成し、それをに入れPCollection
、それぞれをTextIO.readAll()
で読み取り、分析されたファイルのリストをどこかに保存することができます。後でパイプラインを再度実行すると、このリストを、再度実行する必要のないファイルのブラックリストとして使用できます。
これらの2つのオプションのいずれかに関する解決策を検討したい場合は、コメントでお知らせください。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加