我正在将数据从 ElasticSearch 移动到 HDFS,并使用 databrick 库将它们保存为 avro。我需要将数据展平作为输出,因此我正在应用以下函数:
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f.dataType match { case st: StructType => flattenSchema(st, colName) case _ => Array(col(colName)) } }
)
}
我的数据:
var df = sql.read.format("es").load("my-index/log").withColumnRenamed("@timestamp", "ts")
val flattened = flattenSchema(df.schema)
val renamed = flattened.map(name => col(name.toString()).as(name.toString().replace(".","_")))
df = df.select(renamed:_*)
直到现在,这都是一种魅力。
Unforunately,现在我要补充_id
从_metadata
来自ES返回。首先,我启用了元数据设置--conf spark.es.read.metadata="true"
。显然,我的函数不会将地图中的数据展平,而只是将结构展平。我的数据框的架构现在看起来像:
root
|-- ts: timestamp (nullable = true)
|-- field_1: string (nullable = true)
|-- field_2: string (nullable = true)
|-- field_n: string (nullable = true)
|-- _metadata: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
如何仅从中提取_id
键和相关值_metadata
并将其添加到我的df
? (也删除_metadata
本身,因为我只需要该_id
字段)
对于我的用例,我找到了一个勉强可以接受的解决方法。_id
我没有从 ES获取,而是在 Spark 中生成一个 UUID,以创建消息的 MD5。这样你就失去了 ES 和 Hadoop 之间的匹配,但是你可以在 UUID 本身的假设上做一些分析。我提出了这个“部分”解决方案,因为它可能会帮助一些未来的谷歌员工:
import java.security.MessageDigest
val md5 = udf((string: String) => {MessageDigest.getInstance("MD5").digest(string.getBytes).map("%02X".format(_)).mkString + scala.util.Random.alphanumeric.take(10).mkString })
df = df.withColumn("uuid",md5(col("message")))
我还在生成字符串时添加了一些盐(一个 10 个字符的随机字符串),只是为了确保减少名称冲突,如果我很不幸地使用相同的时间戳两次使用相同的日志。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句