展平 _metadata 并从 ES 中提取 Spark scala 中的 _id 键值

安德烈亚

我正在将数据从 ElasticSearch 移动到 HDFS,并使用 databrick 库将它们保存为 avro。我需要将数据展平作为输出,因此我正在应用以下函数:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match { case st: StructType => flattenSchema(st, colName) case _ => Array(col(colName))  }  }
  )
}

我的数据:

var df = sql.read.format("es").load("my-index/log").withColumnRenamed("@timestamp", "ts")
val flattened = flattenSchema(df.schema)
val renamed = flattened.map(name => col(name.toString()).as(name.toString().replace(".","_")))
df = df.select(renamed:_*)

直到现在,这都是一种魅力。

Unforunately,现在我要补充_id_metadata来自ES返回。首先,我启用了元数据设置--conf spark.es.read.metadata="true"显然,我的函数不会将地图中的数据展平,而只是将结构展平。我的数据框的架构现在看起来像:

root
 |-- ts: timestamp (nullable = true)
 |-- field_1: string (nullable = true)
 |-- field_2: string (nullable = true)
 |-- field_n: string (nullable = true)
 |-- _metadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

如何仅从中提取_id键和相关值_metadata并将其添加到我的df? (也删除_metadata本身,因为我只需要该_id字段)

安德烈亚

对于我的用例,我找到了一个勉强可以接受的解决方法。_id我没有从 ES获取而是在 Spark 中生成一个 UUID,以创建消息的 MD5。这样你就失去了 ES 和 Hadoop 之间的匹配,但是你可以在 UUID 本身的假设上做一些分析。我提出了这个“部分”解决方案,因为它可能会帮助一些未来的谷歌员工:

import java.security.MessageDigest

val md5 = udf((string: String) => {MessageDigest.getInstance("MD5").digest(string.getBytes).map("%02X".format(_)).mkString + scala.util.Random.alphanumeric.take(10).mkString })

df = df.withColumn("uuid",md5(col("message")))

我还在生成字符串时添加了一些盐(一个 10 个字符的随机字符串),只是为了确保减少名称冲突,如果我很不幸地使用相同的时间戳两次使用相同的日志。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何使用Spark / Scala展平集合?

来自分类Dev

展平Scala Spark数据框中的最深层次

来自分类Dev

在Spark中的数据框内展平数组

来自分类Dev

展平Scala中的元组列表?

来自分类Dev

从Python中的字典列表中提取键值对

来自分类Dev

使用Spark和Scala展平json文件

来自分类Dev

Scala/Spark:仅使用 RDD 函数将 DataFrame 展平

来自分类Dev

如何使用Spark的地图转换在Scala中返回多个键值对?

来自分类Dev

如何使用Spark的map转换在Scala中返回多个键值对?

来自分类Dev

如何在 Spark/Scala 数据导入中处理嵌套的键值对

来自分类Dev

如何在Spark数据帧中展平结构?

来自分类Dev

在Spark SQL中自动优雅地展平DataFrame

来自分类Dev

在Spark DataFrame中展平嵌套数组

来自分类Dev

是否可以在scala中展平这个特定序列?

来自分类Dev

讨论Scala中的列表展平器功能的实现

来自分类Dev

Spark中的键值对顺序

来自分类Dev

Spark 中的聚合键值对

来自分类Dev

寻找格式文本中的问题,并从中提取元素

来自分类Dev

在Clojure中展平具有ID的嵌套实体

来自分类Dev

从Pandas列中的json数据中提取键值计数

来自分类Dev

从键值对中提取数据到Google Bigquery中的列标题

来自分类Dev

使用python从.txt文件中的键值对中提取值

来自分类Dev

从每个都在大括号中的键值对中提取

来自分类Dev

从字典中提取键值

来自分类Dev

从键值对中提取键

来自分类Dev

jq:嵌套对象,提取顶级ID并从内部对象中提取一个值

来自分类Dev

从Scala中的Some中提取字段

来自分类Dev

从Scala中的Some中提取字段

来自分类Dev

从Scala中的列表中提取元组