cuontryに基づいてすべての行をどのようにカテゴリ化すると、spark scalaで同じ形式でrddに保存されますか?

スワティT

私はこのようなデータを持っています

vxbjxvsj^country:US;age:23;name:sri
jhddasjd^country:UK;age:24;name:abhi
vxbjxvsj^country:US;age:23;name:shree
jhddasjd^country:UK;age:;name:david

Spark Scalaでは、国別保存で分類された国を識別する必要があります。rddと同じ形式が1つのrddまたはファイル名である必要があります。

jhddasjd^country:UK;age:24;name:abhi
jhddasjd^country:UK;age:;name:david

1つのrddまたはファイル名UKである必要があります

vxbjxvsj^country:US;age:23;name:sri
vxbjxvsj^country:US;age:23;name:shree

ファイルをとして読み取ると、文字列のrddRDDが取得さRDD[String]れ、各行が文字列として取得されます。

filterあなたに必要なsplit各ラインとそれに国フィールドとフィルタを抽出

rdd.filter(r =>
      r.split(":")(1).split(";")(0).equalsIgnoreCase("US")
    ).saveAsTextFile(s"US"}")

これにより、国フィールドとフィルターが取得されます。 "US"

これを動的にしたい場合は、最初に一意の国のリストを取得して、次のようにループでフィルターを実行できます。

val countries = df1.map(_.split(":")).map(_ (1).split(";")(0)).collect()

countries.foreach(country => {
  rdd.filter(r =>
    r.split(":")(1).split(";")(0).equalsIgnoreCase(country)
  ).saveAsTextFile(s"output/${country}")
})

お役に立てれば!

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ