HDFSHadoopマップでインクリメンタルアップデートを処理する方法-Reduce

スダーシャン

このようなデータ(file.txt内)を持つHDFの構造化されたベーステキストファイルがあります。

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|

file.txtのサイズは30GBです。

サイズが約2GBのインクリメンタルデータfile1.txtが、以下のようにHFDSで同じ形式で表示されます。

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|

ここで、file.txtとfile1.txtを組み合わせて、すべての一意のレコードを含む最終的なテキストファイルを作成する必要があります。

両方のファイルのキーはOrgIdです。同じOrgIdが最初のファイルで見つかった場合は、新しいOrgIdに置き換える必要があり、見つからない場合は、新しいOrgIdを挿入する必要があります。

最終出力は次のようになります。

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|

mapreduceでそれを行うにはどうすればよいですか?

このような個別のファイルが約10.000あるため、HIVEソリューションを使用しません。HIVEで10.000のパーティションを作成する必要があります。

このユースケースにSparkを使用するための提案はありますか?

ラメシュマハルジャン

私はプログラムにあなたを示唆していますscalaためsparkあなたがプログラムした場合mapreduce、それだけのために有用であろうhadoopが、でプログラミングするscalaためにsparkあなたが処理できるようになるsparkだけでなく、としてhadoopモデルSparkの欠点を処理するために開始されましたmapreduceこのトピックに関する多くのリソースを見つけることができます。それらの1つはこれです

あなたの問題に関して、私はあなたに使用することを提案しています dataframe

最初のタスクはschema、データフレーム用に作成することです。

val schema = StructType(Array(StructField("OgId", StringType),
  StructField("ItemId", StringType),
  StructField("segmentId", StringType),
  StructField("Sequence", StringType),
  StructField("Action", StringType)))

次のタスクは、2つのファイルを読み取り、上記のスキーマを使用してデータフレームを作成することです。

import org.apache.spark.sql.functions._
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

の出力df1

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136   |4        |1       |I     |
|4295877346|136   |4        |1       |I     |
|4295877341|138   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877341|145   |14       |1       |I     |
+----------+------+---------+--------+------+

そしての出力df2

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213   |4        |1       |I     |
|4295877341|215   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877343|149   |14       |2       |I     |
+----------+------+---------+--------+------+

次に、要件に応じて、ifと一致する場合は削除rowsdf1すべてを追加しますこれらの要件は、以下のように実行できます。OgIddf2df2df1

val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")

df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)

最終出力は

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136   |4        |1       |I     |
|4295877341|213   |4        |1       |I     |
|4295877341|215   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877343|149   |14       |2       |I     |
+----------+------+---------+--------+------+

この最終結果は、hdfsとして保存できます。

df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")

これがお役に立てば幸いです

注:入力位置と出力位置のパスを正しく記述していることを確認してください

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

バリアントサイズのreduce_meanがテンソルフローに集まった後にテンソルをスタックする方法は?

分類Dev

RavenDBマップreduce、reduceの重複エントリ

分類Dev

ダーツマップで.reduceを使用してタタルを計算する方法

分類Dev

フィルタとマップをチェーンする代わりにReduceを使用する

分類Dev

RavenDBで複数のグループを使用してmap / reduceインデックスを作成するにはどうすればよいですか

分類Dev

Javascriptを `reduce`性能

分類Dev

Sails.jsmongodbマップreduce

分類Dev

NgRxアプリケーションのオブジェクトでArray.reduce関数を使用する

分類Dev

アプリがバックグラウンドで実行されているときにFCMメッセージとタイトルを処理する方法

分類Dev

mongo map reduceクエリでマップ関数の出力を確認する方法はありますか?

分類Dev

reduceメソッドで値オブジェクトキーを追加する方法

分類Dev

mapまたはreduceを使用してこれをforループでリファクタリングするより良い方法はありますか?

分類Dev

Array.prototype.reduce()関数でeslint no-param-reassignルールを処理する方法

分類Dev

マップreduceのコードpythonでエラーが発生しました '文字列インデックスが範囲外です'

分類Dev

メインゲームループでキーアップイベントを処理する方法は?

分類Dev

Pythonマップ関数内でreduceを使用する

分類Dev

JavaScriptのreduceメソッドの現在のインデックスパラメータが1から始まるのはなぜですか?

分類Dev

WindowsアプリでSecondaryTileクリックイベントを処理する方法

分類Dev

学習マップ-MongoDBでReduce

分類Dev

reduceメソッドのアキュムレータに関連するタイプエラー

分類Dev

特定のキーでオブジェクト値をカウントするためのreduceメソッド

分類Dev

マップ内のリストを合計するためにreduceを使用する方法

分類Dev

データのグループ内のデータのグループをカウントするためにreduceを使用する

分類Dev

reduceを使用してjsonでグループ化する方法は?

分類Dev

バンプ_reduce_memory_useでRabbitMQがクラッシュする

分類Dev

デフォルトのファイルマネージャー(nautilus)を有効にしてデスクトップアイコンを処理する方法

分類Dev

ReactJsアプリで従来のforeachループの代わりにreduce()を使用する方法

分類Dev

Androidのグーグルマップの現在地アイコンのクリックイベントを処理する方法

分類Dev

Map / Reduceを使用して多数のセットからマップを作成する

Related 関連記事

  1. 1

    バリアントサイズのreduce_meanがテンソルフローに集まった後にテンソルをスタックする方法は?

  2. 2

    RavenDBマップreduce、reduceの重複エントリ

  3. 3

    ダーツマップで.reduceを使用してタタルを計算する方法

  4. 4

    フィルタとマップをチェーンする代わりにReduceを使用する

  5. 5

    RavenDBで複数のグループを使用してmap / reduceインデックスを作成するにはどうすればよいですか

  6. 6

    Javascriptを `reduce`性能

  7. 7

    Sails.jsmongodbマップreduce

  8. 8

    NgRxアプリケーションのオブジェクトでArray.reduce関数を使用する

  9. 9

    アプリがバックグラウンドで実行されているときにFCMメッセージとタイトルを処理する方法

  10. 10

    mongo map reduceクエリでマップ関数の出力を確認する方法はありますか?

  11. 11

    reduceメソッドで値オブジェクトキーを追加する方法

  12. 12

    mapまたはreduceを使用してこれをforループでリファクタリングするより良い方法はありますか?

  13. 13

    Array.prototype.reduce()関数でeslint no-param-reassignルールを処理する方法

  14. 14

    マップreduceのコードpythonでエラーが発生しました '文字列インデックスが範囲外です'

  15. 15

    メインゲームループでキーアップイベントを処理する方法は?

  16. 16

    Pythonマップ関数内でreduceを使用する

  17. 17

    JavaScriptのreduceメソッドの現在のインデックスパラメータが1から始まるのはなぜですか?

  18. 18

    WindowsアプリでSecondaryTileクリックイベントを処理する方法

  19. 19

    学習マップ-MongoDBでReduce

  20. 20

    reduceメソッドのアキュムレータに関連するタイプエラー

  21. 21

    特定のキーでオブジェクト値をカウントするためのreduceメソッド

  22. 22

    マップ内のリストを合計するためにreduceを使用する方法

  23. 23

    データのグループ内のデータのグループをカウントするためにreduceを使用する

  24. 24

    reduceを使用してjsonでグループ化する方法は?

  25. 25

    バンプ_reduce_memory_useでRabbitMQがクラッシュする

  26. 26

    デフォルトのファイルマネージャー(nautilus)を有効にしてデスクトップアイコンを処理する方法

  27. 27

    ReactJsアプリで従来のforeachループの代わりにreduce()を使用する方法

  28. 28

    Androidのグーグルマップの現在地アイコンのクリックイベントを処理する方法

  29. 29

    Map / Reduceを使用して多数のセットからマップを作成する

ホットタグ

アーカイブ