このようなデータ(file.txt内)を持つHDFの構造化されたベーステキストファイルがあります。
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|
file.txtのサイズは30GBです。
サイズが約2GBのインクリメンタルデータfile1.txtが、以下のようにHFDSで同じ形式で表示されます。
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|
ここで、file.txtとfile1.txtを組み合わせて、すべての一意のレコードを含む最終的なテキストファイルを作成する必要があります。
両方のファイルのキーはOrgIdです。同じOrgIdが最初のファイルで見つかった場合は、新しいOrgIdに置き換える必要があり、見つからない場合は、新しいOrgIdを挿入する必要があります。
最終出力は次のようになります。
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
mapreduceでそれを行うにはどうすればよいですか?
このような個別のファイルが約10.000あるため、HIVEソリューションを使用しません。HIVEで10.000のパーティションを作成する必要があります。
このユースケースにSparkを使用するための提案はありますか?
私はプログラムにあなたを示唆していますscala
ためspark
。あなたがプログラムした場合mapreduce
、それだけのために有用であろうhadoop
が、でプログラミングするscala
ためにspark
あなたが処理できるようになるspark
だけでなく、としてhadoop
。モデルSpark
の欠点を処理するために開始されましたmapreduce
。このトピックに関する多くのリソースを見つけることができます。それらの1つはこれです
あなたの問題に関して、私はあなたに使用することを提案しています dataframe
最初のタスクはschema
、データフレーム用に作成することです。
val schema = StructType(Array(StructField("OgId", StringType),
StructField("ItemId", StringType),
StructField("segmentId", StringType),
StructField("Sequence", StringType),
StructField("Action", StringType)))
次のタスクは、2つのファイルを読み取り、上記のスキーマを使用してデータフレームを作成することです。
import org.apache.spark.sql.functions._
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
の出力df1
は
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136 |4 |1 |I |
|4295877346|136 |4 |1 |I |
|4295877341|138 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877341|145 |14 |1 |I |
+----------+------+---------+--------+------+
そしての出力df2
は
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
次に、要件に応じて、ifと一致する場合は削除rows
しdf1
、すべてをに追加します。これらの要件は、以下のように実行できます。OgId
df2
df2
df1
val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")
df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)
最終出力は
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136 |4 |1 |I |
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
この最終結果は、hdfs
として保存できます。
df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")
これがお役に立てば幸いです
注:入力位置と出力位置のパスを正しく記述していることを確認してください
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加