入力ファイル名を分割し、sparkデータフレーム列に特定の値を追加する方法

user7547751

これは私が私のcsvファイルをsparkデータフレームにロードする方法です

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf



val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1Final=df1result.withColumn("DataPartition", lit(null: String))

これは私の入力ファイル名の1つの例です。

Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full

Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt

ここで、このファイルを読み取り、「。」で分割します。演算子を入力し、DataPartitionの代わりにCUSを新しい列として追加します。

UDFなしでそれを行うことはできますか?

これが既存のデータフレームのスキーマです

root
 |-- LineItem_organizationId: long (nullable = true)
 |-- LineItem_lineItemId: integer (nullable = true)
 |-- StatementTypeCode: string (nullable = true)
 |-- LineItemName: string (nullable = true)
 |-- LocalLanguageLabel: string (nullable = true)
 |-- FinancialConceptLocal: string (nullable = true)
 |-- FinancialConceptGlobal: string (nullable = true)
 |-- IsDimensional: boolean (nullable = true)
 |-- InstrumentId: string (nullable = true)
 |-- LineItemSequence: string (nullable = true)
 |-- PhysicalMeasureId: string (nullable = true)
 |-- FinancialConceptCodeGlobalSecondary: string (nullable = true)
 |-- IsRangeAllowed: boolean (nullable = true)
 |-- IsSegmentedByOrigin: boolean (nullable = true)
 |-- SegmentGroupDescription: string (nullable = true)
 |-- SegmentChildDescription: string (nullable = true)
 |-- SegmentChildLocalLanguageLabel: string (nullable = true)
 |-- LocalLanguageLabel_languageId: integer (nullable = true)
 |-- LineItemName_languageId: integer (nullable = true)
 |-- SegmentChildDescription_languageId: integer (nullable = true)
 |-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true)
 |-- SegmentGroupDescription_languageId: integer (nullable = true)
 |-- SegmentMultipleFundbDescription: string (nullable = true)
 |-- SegmentMultipleFundbDescription_languageId: integer (nullable = true)
 |-- IsCredit: boolean (nullable = true)
 |-- FinancialConceptLocalId: integer (nullable = true)
 |-- FinancialConceptGlobalId: integer (nullable = true)
 |-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true)
 |-- FFAction: string (nullable = true)

提案された回答の後にコードを更新する

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))

import org.apache.spark.sql.functions.input_file_name

val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)

df1result.withColumn("cus_val", get_cus_val(input_file_name))

df1result.printSchema()
mrsrinivas

事前定義されたUDFを使用してファイル名を取得できます。つまりinput_file_name()、その後、UDFを作成してCUSを抽出するか、または2つのUDFを使用できますregexp_extract

使用してregexp_extractUDFヲ ここに正規表現の使用

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

df.withColumn("cus_val", 
  regexp_extract(input_file_name, "\.(\w+)\.[0-9]+\.", 1))

カスタムUDFの使用

import org.apache.spark.sql.functions.udf

val get_cus_val = udf(filePath: String => filePath.split("\\.")(4))

import org.apache.spark.sql.functions.input_file_name

df.withColumn("cus_val", get_cus_val(input_file_name))

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

PySpark:特定の列のデータフレームに値を入力する方法は?

分類Dev

PySpark:特定の列のデータフレームに値を入力する方法は?

分類Dev

PySpark:特定の列のデータフレームに値を入力する方法は?

分類Dev

Sparkはデータを読み込み、データフレーム列としてファイル名を追加

分類Dev

Sparkはデータを読み込み、データフレーム列としてファイル名を追加

分類Dev

Sparkはデータを読み込み、データフレーム列としてファイル名を追加

分類Dev

Sparkデータフレームのレコードの入力ファイル名を取得する方法は?

分類Dev

Sparkデータフレームに定数値列を追加する

分類Dev

不足している値を特定の値でパンダデータフレームに入力する

分類Dev

Python、データフレームを使用して、列内の文字列の値を分割し、分割した値を使用して新しい列を追加する方法

分類Dev

複数の条件でデータフレームの新しい列に値を入力する方法

分類Dev

データフレームの列値を複数の列に分割する方法

分類Dev

Scala Spark、入力データフレーム、すべての値が1に等しい列を返す

分類Dev

Pythonでjsonファイルからデータフレームに特定のキーを挿入する方法

分類Dev

列に基づいてデータフレームを分割し、特定の名前で複数の分割された.txtファイルを書き出します

分類Dev

Rのデータフレームにファイル名を追加する

分類Dev

ファイル名をpysparkデータフレームの異なる列に分割します

分類Dev

以前の出力値をPythonデータフレームの入力値として追加する方法

分類Dev

データフレームのリストの列にファイル名を追加します

分類Dev

Sparkデータフレームの行を列に分割する方法は?

分類Dev

データフレームに名前を追加し、Rの特定のセルを変更する方法

分類Dev

一意の列値によってデータフレームをグループに分割し、ファイルに保存する方法

分類Dev

データフレームに新しい列を追加し、特定のロジックからの値を行に入力します

分類Dev

ajaxを使用して入力ファイルのデータ値をPHPページに送信する方法

分類Dev

年としてのファイル名である列名をデータフレームに追加します

分類Dev

数値の列を配列データファイルに追加する方法

分類Dev

データフレームで列を分割し、分割値を追加する方法

分類Dev

新しいデータフレームにデータとヘッダーを追加し、ファイル名を列に入力します

分類Dev

Sparkのデータフレームを使用して日付フィールドに値を追加する方法

Related 関連記事

  1. 1

    PySpark:特定の列のデータフレームに値を入力する方法は?

  2. 2

    PySpark:特定の列のデータフレームに値を入力する方法は?

  3. 3

    PySpark:特定の列のデータフレームに値を入力する方法は?

  4. 4

    Sparkはデータを読み込み、データフレーム列としてファイル名を追加

  5. 5

    Sparkはデータを読み込み、データフレーム列としてファイル名を追加

  6. 6

    Sparkはデータを読み込み、データフレーム列としてファイル名を追加

  7. 7

    Sparkデータフレームのレコードの入力ファイル名を取得する方法は?

  8. 8

    Sparkデータフレームに定数値列を追加する

  9. 9

    不足している値を特定の値でパンダデータフレームに入力する

  10. 10

    Python、データフレームを使用して、列内の文字列の値を分割し、分割した値を使用して新しい列を追加する方法

  11. 11

    複数の条件でデータフレームの新しい列に値を入力する方法

  12. 12

    データフレームの列値を複数の列に分割する方法

  13. 13

    Scala Spark、入力データフレーム、すべての値が1に等しい列を返す

  14. 14

    Pythonでjsonファイルからデータフレームに特定のキーを挿入する方法

  15. 15

    列に基づいてデータフレームを分割し、特定の名前で複数の分割された.txtファイルを書き出します

  16. 16

    Rのデータフレームにファイル名を追加する

  17. 17

    ファイル名をpysparkデータフレームの異なる列に分割します

  18. 18

    以前の出力値をPythonデータフレームの入力値として追加する方法

  19. 19

    データフレームのリストの列にファイル名を追加します

  20. 20

    Sparkデータフレームの行を列に分割する方法は?

  21. 21

    データフレームに名前を追加し、Rの特定のセルを変更する方法

  22. 22

    一意の列値によってデータフレームをグループに分割し、ファイルに保存する方法

  23. 23

    データフレームに新しい列を追加し、特定のロジックからの値を行に入力します

  24. 24

    ajaxを使用して入力ファイルのデータ値をPHPページに送信する方法

  25. 25

    年としてのファイル名である列名をデータフレームに追加します

  26. 26

    数値の列を配列データファイルに追加する方法

  27. 27

    データフレームで列を分割し、分割値を追加する方法

  28. 28

    新しいデータフレームにデータとヘッダーを追加し、ファイル名を列に入力します

  29. 29

    Sparkのデータフレームを使用して日付フィールドに値を追加する方法

ホットタグ

アーカイブ