Spark数据集:数据集的转换列

编码器123

这是我的数据集:

  Dataset<Row> myResult = pot.select(col("number")
                    , col("document")
                    , explode(col("mask")).as("mask"));

我现在需要从现有的myResult创建一个新的数据集。如下所示:

  Dataset<Row> myResultNew = myResult.select(col("number")
                , col("name")
                , col("age")
                , col("class")
                , col("mask");

名称,年龄和类别是从Dataset myResult的列文档中创建的。我想我可以在列文档上调用函数,然后对它执行任何操作。

myResult.select(extract(col("document")));


 private String extract(final Column document) {
        //TODO ADD A NEW COLUMN nam, age, class TO THE NEW DATASET.
        // PARSE DOCUMENT AND GET THEM.

     XMLParser doc= (XMLParser) document // this doesnt work???????




} 

我的问题是:文档的类型为column,我需要将其转换为其他Object Type并对其进行解析以提取name,age,class。我怎样才能做到这一点。document是xml,我需要进行解析以获取其他3列,因此不能避免将其转换为XML。

维纳

extract方法转换为UDF是一个尽可能接近您所要求的解决方案。UDF可以采用一列或多列的值,并使用此输入执行任何逻辑。

import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

[...]

UserDefinedFunction extract = udf(
        (String document) -> {
            List<String> result = new ArrayList<>();
            XMLParser doc = XMLParser.parse(document);
            String name = ... //read name from xml document
            String age = ... //read age from xml document
            String clazz = ... //read class from xml document
            result.add(name);
            result.add(age);
            result.add(clazz);
            return result;
         }, DataTypes.createArrayType(DataTypes.StringType)
);

UDF的限制是它们只能返回一列。因此,该函数返回一个String数组,此后必须将其解压缩。

Dataset<Row> myResultNew = myResult
    .withColumn("extract", extract.apply(col("document"))) //1
    .withColumn("name", col("extract").getItem(0))         //2
    .withColumn("age", col("extract").getItem(1))          //2
    .withColumn("class", col("extract").getItem(2))        //2
    .drop("document", "extract");                          //3
  1. 调用UDF并将包含xml文档的列用作该apply函数的参数
  2. 从步骤1的返回数组中创建结果列
  3. 删除中间列

注意:udf在数据集中的每行执行一次。如果xml解析器的创建非常昂贵,这可能会减慢Spark作业的执行,因为每行实例化一个解析器。由于Spark的并行性质,无法将解析器重新用于下一行。如果这是一个问题,则另一个选择(至少在Java世界中稍微复杂一些)是使用mapPartitions这里,每行不需要一个解析器,而数据集的每个分区只需要一个解析器。

完全不同的方法是使用spark-xml

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

向 spark 数据集添加列并转换数据

来自分类Dev

Spark 默认空列数据集

来自分类Dev

将数据集列转换为obsnames

来自分类Dev

将数据集列转换为obsnames

来自分类Dev

XML数据集转换

来自分类Dev

数据集选择列

来自分类Dev

如何将Spark数据集转换为Scala Seq

来自分类Dev

Spark如何将RDD [JSONObject]转换为数据集

来自分类Dev

Spark 无法读取 CSV 文件并转换为数据集

来自分类Dev

spark - 将元组列表转换为数据集 - scala

来自分类Dev

Spark中的内存数据集

来自分类Dev

Spark 数据集上的 GroupbyKey

来自分类Dev

数据集到sqldatareader的转换

来自分类Dev

用条件转换数据集

来自分类Dev

一列小结(在Spark数据集上实现多维数据集函数)

来自分类Dev

如何总结Spark / Scala中数据集的列?

来自分类Dev

如何总结Spark / Scala中数据集的列?

来自分类Dev

python pandas,转换数据集,将行移动到列

来自分类Dev

按列将SAS数据集转换为相对频率

来自分类Dev

SAS:将窄数据集转换为宽数据集

来自分类Dev

动态地将spark数据帧转换为元组的数据集(String,_ <:Product)

来自分类Dev

如何使用数据集的列

来自分类Dev

Scala / Spark-使用来自另一数据集的一列创建数据集

来自分类Dev

如何从RDD创建Spark数据集

来自分类Dev

Spark数据集-内部联接问题

来自分类Dev

Spark数据集过滤器元素

来自分类Dev

使用Spark和Scala膨胀数据集

来自分类Dev

如何从RDD创建Spark数据集

来自分类Dev

Spark 2选项数据集