Spark 批量加载文件集合,并从每个文件中找到包含文件级别附加信息的行

类二烯类

我有用逗号分隔符指定的文件集合,例如:

hdfs://user/cloudera/date=2018-01-15,hdfs://user/cloudera/date=2018-01-16,hdfs://user/cloudera/date=2018-01-17,hdfs://user/cloudera/date=2018-01-18,hdfs://user/cloudera/date=2018-01-19,hdfs://user/cloudera/date=2018-01-20,hdfs://user/cloudera/date=2018-01-21,hdfs://user/cloudera/date=2018-01-22

我正在使用 Apache Spark 加载文件,全部使用:

val input = sc.textFile(files)

此外,我还有与每个文件相关联的附加信息 - 唯一 ID,例如:

File                                     ID
--------------------------------------------------
hdfs://user/cloudera/date=2018-01-15  | 12345
hdfs://user/cloudera/date=2018-01-16  | 09245
hdfs://user/cloudera/date=2018-01-17  | 345hqw4
and so on

作为输出,我需要接收带有行的 DataFrame,其中每一行将包含相同的 ID,作为从中读取该行的文件的 ID。

是否可以以某种方式将此信息传递给 Spark 以便能够与行关联?

莫西克

核心 sql 方法UDFjoin如果将 File -> ID 映射表示为 Dataframe 可以实现相同的目标):

import org.apache.spark.sql.functions

val inputDf = sparkSession.read.text(".../src/test/resources/test")
    .withColumn("fileName", functions.input_file_name())

def withId(mapping: Map[String, String]) = functions.udf(
  (file: String) => mapping.get(file)
)

val mapping = Map(
  "file:///.../src/test/resources/test/test1.txt" -> "id1",
  "file:///.../src/test/resources/test/test2.txt" -> "id2"
)

val resutlDf = inputDf.withColumn("id", withId(mapping)(inputDf("fileName")))
resutlDf.show(false)

结果:

+-----+---------------------------------------------+---+
|value|fileName                                     |id |
+-----+---------------------------------------------+---+
|row1 |file:///.../src/test/resources/test/test1.txt|id1|
|row11|file:///.../src/test/resources/test/test1.txt|id1|
|row2 |file:///.../src/test/resources/test/test2.txt|id2|
|row22|file:///.../src/test/resources/test/test2.txt|id2|
+-----+---------------------------------------------+---+

文本1.txt:

row1
row11

文本2.txt:

row2
row22

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spark Standalone 加载文件失败

来自分类Dev

无法从Spark Dataframe中的HDFS加载文件

来自分类Dev

使用Spark Submit从Linux FS加载文件

来自分类Dev

Spark:如何从Spark Shell运行Spark文件

来自分类Dev

如何使用Java Spark下载文件?

来自分类Dev

Apache Spark:从本地加载文件而不是 HDFS 并加载本地文件给出 IllegalArguementException

来自分类Dev

获取使用Spark加载的文件的详细信息

来自分类Dev

在Spark(Scala)中绕过每个文件的第一行

来自分类Dev

绕过Spark(Scala)中每个文件的最后一行

来自分类Dev

Spark的Gzip文件

来自分类Dev

SPARK读取SEQUENCE文件

来自分类Dev

Apache Spark Dataframe - 从 CSV 文件的第 n 行加载数据

来自分类Dev

从Yaml文件加载Java Spark配置

来自分类Dev

apache spark加载内部文件夹

来自分类Dev

从S3存储桶加载文件时,Spark创建多少个分区?

来自分类Dev

使用Scala / Spark列出目录中的文件(包括文件信息)

来自分类Dev

spark + sbt-assembly:“重复数据删除:以下中找到不同的文件内容”

来自分类Dev

Jekyll无法在集合中找到包含文件

来自分类Dev

在> 2个文件的特定列中找到通用元素,并从每个文件中打印相应的行

来自分类Dev

从Spark中的多个文件夹加载多个文件

来自分类Dev

Spark SQL中的Parquet文件

来自分类Dev

Apache Spark的常规文件行为

来自分类Dev

在Spark中导入TSV文件

来自分类Dev

Scala用Spark读取文件

来自分类Dev

在Apache Spark中写入文件

来自分类Dev

Scala用Spark读取文件

来自分类Dev

Spark 删除 Apache Orc 文件

来自分类Dev

构建 Scala JAR 文件 Spark

来自分类Dev

Spark DataFrame 到 xml 文件