JavaでのSparkファイルストリーミングによるチェックポイント

Ajit:

私のスパークストリーミングアプリケーションが停止/終了した場合に、hadoopからのすべての未処理ファイルを処理するために、スパークファイルストリーミングアプリケーションにチェックポイントを実装したいと思います。私はこれに従っています:ストリーミングプログラミングガイドですが、JavaStreamingContextFactoryが見つかりません。どうしたらいいか手伝ってください。

私のコードは

public class StartAppWithCheckPoint {

    public static void main(String[] args) {
        
        try {
            
            String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/"; 
            String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();

            JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
                  @Override public JavaStreamingContext create() {
                      
                    SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
                    JavaSparkContext sc = new JavaSparkContext(sparkConf);  
                    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(300));
                    JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
                    
                    jssc.checkpoint(checkpointDirectory);
                    return jssc;
                  }
                };
                
            JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
            
            context.start();
            context.awaitTermination();
            context.close();
            sparkSession.close();
            
        } catch(Exception e) {
            e.printStackTrace();
        }   
    }
}
マジッドハジババ:

チェックポイントを使用する必要があります

チェックポイントにはまたはのいずれかのステートフル変換を使用しますgit-hubのprebuild sparkとspark sourceとともに提供されるspark-examplesに多くの例があります具体的には、JavaStatefulNetworkWordCount.javaを参照してくださいupdateStateByKeyreduceByKeyAndWindow

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Sparkチェックポイント非ストリーミング-チェックポイントファイルは、後続のジョブ実行またはドライバープログラムで使用できます

分類Dev

DStreamsのSparkストリーミングチェックポイント

分類Dev

Sparkストリーミングの多くのソースをチェックポイントする方法

分類Dev

Sparkストリーミングチェックポイントの回復は非常に遅い

分類Dev

Spark構造化ストリーミングチェックポイントのクリーンアップ

分類Dev

チェックポイントを使用したSparkストリーミング

分類Dev

Apache Flinkストリーミングでチェックポイントのタイミングをどのように計りますか?

分類Dev

シークできないファイルのようなオブジェクトを複数のシンクにストリーミングする

分類Dev

チェックポイントを有効にしたSparkストリーミングSQS

分類Dev

Javaで空スペースのあるJPGファイルをクロップ/トリミング

分類Dev

Sparkストリーミングアプリケーションを再デプロイするようにチェックポイントを構成するにはどうすればよいですか?

分類Dev

ストリーミング用にvlcをすべてのインターフェイスにバインドし、telnetインターフェイスのループバックのみにするにはどうすればよいですか?

分類Dev

ストリーミング用にvlcをすべてのインターフェイスにバインドし、telnetインターフェイスのループバックのみにするにはどうすればよいですか?

分類Dev

Java8複数のファイルを行にフラットマップでストリーミング

分類Dev

Spark構造化ストリーミング-入力ソースの数が増えたため、チェックポイントでAssertionError

分類Dev

gitチェックアウトによって失われたファイルの回復(ステージング/コミットされていない)

分類Dev

apachepoiを使用したストリーミング/ページネーション戦略によるxlsxファイルのチャンクとしての解析

分類Dev

ヘッダー付きのcsvファイルをJavaのHashMap <String、Double>にストリーミングするにはどうすればよいですか?

分類Dev

Ubuntu16.04にファミリーサーチインデックスをインストールする方法

分類Dev

チェックアウトの間に特定のファイルを自動的にクリーニングする

分類Dev

フォントの個々のグリフをユニコード名でバッチでsvgファイルにエクスポートしますか?

分類Dev

Elixirのファイルにストリーミングするにはどうすればよいですか?

分類Dev

GSONのJsonReaderでJsonファイルをストリーミングするとき、オブジェクトを文字列にダンプできますか?

分類Dev

browserSyncが実際に変更されたファイルのみをストリーミングできるようにするために、CSSの変更を監視するgulpウォッチをフィルタリングする方法

分類Dev

ファイルのインストールアクションのSSLチェックを無効にする

分類Dev

Spark構造化ストリーミングファイルシンクファイルパスまたはファイル名を定義するにはどうすればよいですか?

分類Dev

クロスドメインポリシーを回避してjqueryajaxを使用して別のドメインからオーディオファイルをストリーミングするにはどうすればよいですか?

分類Dev

PFファイアウォール:特定のポート転送ルールを除いて、ループバックインターフェイスですべてのフィルタリングを無効にする方法は?

分類Dev

ブロックチェーン:Windows10でのHyperLedgerファブリックのインストール

Related 関連記事

  1. 1

    Sparkチェックポイント非ストリーミング-チェックポイントファイルは、後続のジョブ実行またはドライバープログラムで使用できます

  2. 2

    DStreamsのSparkストリーミングチェックポイント

  3. 3

    Sparkストリーミングの多くのソースをチェックポイントする方法

  4. 4

    Sparkストリーミングチェックポイントの回復は非常に遅い

  5. 5

    Spark構造化ストリーミングチェックポイントのクリーンアップ

  6. 6

    チェックポイントを使用したSparkストリーミング

  7. 7

    Apache Flinkストリーミングでチェックポイントのタイミングをどのように計りますか?

  8. 8

    シークできないファイルのようなオブジェクトを複数のシンクにストリーミングする

  9. 9

    チェックポイントを有効にしたSparkストリーミングSQS

  10. 10

    Javaで空スペースのあるJPGファイルをクロップ/トリミング

  11. 11

    Sparkストリーミングアプリケーションを再デプロイするようにチェックポイントを構成するにはどうすればよいですか?

  12. 12

    ストリーミング用にvlcをすべてのインターフェイスにバインドし、telnetインターフェイスのループバックのみにするにはどうすればよいですか?

  13. 13

    ストリーミング用にvlcをすべてのインターフェイスにバインドし、telnetインターフェイスのループバックのみにするにはどうすればよいですか?

  14. 14

    Java8複数のファイルを行にフラットマップでストリーミング

  15. 15

    Spark構造化ストリーミング-入力ソースの数が増えたため、チェックポイントでAssertionError

  16. 16

    gitチェックアウトによって失われたファイルの回復(ステージング/コミットされていない)

  17. 17

    apachepoiを使用したストリーミング/ページネーション戦略によるxlsxファイルのチャンクとしての解析

  18. 18

    ヘッダー付きのcsvファイルをJavaのHashMap <String、Double>にストリーミングするにはどうすればよいですか?

  19. 19

    Ubuntu16.04にファミリーサーチインデックスをインストールする方法

  20. 20

    チェックアウトの間に特定のファイルを自動的にクリーニングする

  21. 21

    フォントの個々のグリフをユニコード名でバッチでsvgファイルにエクスポートしますか?

  22. 22

    Elixirのファイルにストリーミングするにはどうすればよいですか?

  23. 23

    GSONのJsonReaderでJsonファイルをストリーミングするとき、オブジェクトを文字列にダンプできますか?

  24. 24

    browserSyncが実際に変更されたファイルのみをストリーミングできるようにするために、CSSの変更を監視するgulpウォッチをフィルタリングする方法

  25. 25

    ファイルのインストールアクションのSSLチェックを無効にする

  26. 26

    Spark構造化ストリーミングファイルシンクファイルパスまたはファイル名を定義するにはどうすればよいですか?

  27. 27

    クロスドメインポリシーを回避してjqueryajaxを使用して別のドメインからオーディオファイルをストリーミングするにはどうすればよいですか?

  28. 28

    PFファイアウォール:特定のポート転送ルールを除いて、ループバックインターフェイスですべてのフィルタリングを無効にする方法は?

  29. 29

    ブロックチェーン:Windows10でのHyperLedgerファブリックのインストール

ホットタグ

アーカイブ