我有用逗号分隔符指定的文件集合,例如:
hdfs://user/cloudera/date=2018-01-15,hdfs://user/cloudera/date=2018-01-16,hdfs://user/cloudera/date=2018-01-17,hdfs://user/cloudera/date=2018-01-18,hdfs://user/cloudera/date=2018-01-19,hdfs://user/cloudera/date=2018-01-20,hdfs://user/cloudera/date=2018-01-21,hdfs://user/cloudera/date=2018-01-22
我正在使用 Apache Spark 加载文件,全部使用:
val input = sc.textFile(files)
此外,我还有与每个文件相关联的附加信息 - 唯一 ID,例如:
File ID
--------------------------------------------------
hdfs://user/cloudera/date=2018-01-15 | 12345
hdfs://user/cloudera/date=2018-01-16 | 09245
hdfs://user/cloudera/date=2018-01-17 | 345hqw4
and so on
作为输出,我需要接收带有行的 DataFrame,其中每一行将包含相同的 ID,作为从中读取该行的文件的 ID。
是否可以以某种方式将此信息传递给 Spark 以便能够与行关联?
核心 sql 方法UDF
(join
如果将 File -> ID 映射表示为 Dataframe ,则可以实现相同的目标):
import org.apache.spark.sql.functions
val inputDf = sparkSession.read.text(".../src/test/resources/test")
.withColumn("fileName", functions.input_file_name())
def withId(mapping: Map[String, String]) = functions.udf(
(file: String) => mapping.get(file)
)
val mapping = Map(
"file:///.../src/test/resources/test/test1.txt" -> "id1",
"file:///.../src/test/resources/test/test2.txt" -> "id2"
)
val resutlDf = inputDf.withColumn("id", withId(mapping)(inputDf("fileName")))
resutlDf.show(false)
结果:
+-----+---------------------------------------------+---+
|value|fileName |id |
+-----+---------------------------------------------+---+
|row1 |file:///.../src/test/resources/test/test1.txt|id1|
|row11|file:///.../src/test/resources/test/test1.txt|id1|
|row2 |file:///.../src/test/resources/test/test2.txt|id2|
|row22|file:///.../src/test/resources/test/test2.txt|id2|
+-----+---------------------------------------------+---+
文本1.txt:
row1
row11
文本2.txt:
row2
row22
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句