这是我的数据集:
Dataset<Row> myResult = pot.select(col("number")
, col("document")
, explode(col("mask")).as("mask"));
我现在需要从现有的myResult创建一个新的数据集。如下所示:
Dataset<Row> myResultNew = myResult.select(col("number")
, col("name")
, col("age")
, col("class")
, col("mask");
名称,年龄和类别是从Dataset myResult的列文档中创建的。我想我可以在列文档上调用函数,然后对它执行任何操作。
myResult.select(extract(col("document")));
private String extract(final Column document) {
//TODO ADD A NEW COLUMN nam, age, class TO THE NEW DATASET.
// PARSE DOCUMENT AND GET THEM.
XMLParser doc= (XMLParser) document // this doesnt work???????
}
我的问题是:文档的类型为column,我需要将其转换为其他Object Type并对其进行解析以提取name,age,class。我怎样才能做到这一点。document是xml,我需要进行解析以获取其他3列,因此不能避免将其转换为XML。
将extract
方法转换为UDF是一个尽可能接近您所要求的解决方案。UDF可以采用一列或多列的值,并使用此输入执行任何逻辑。
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;
[...]
UserDefinedFunction extract = udf(
(String document) -> {
List<String> result = new ArrayList<>();
XMLParser doc = XMLParser.parse(document);
String name = ... //read name from xml document
String age = ... //read age from xml document
String clazz = ... //read class from xml document
result.add(name);
result.add(age);
result.add(clazz);
return result;
}, DataTypes.createArrayType(DataTypes.StringType)
);
UDF的限制是它们只能返回一列。因此,该函数返回一个String数组,此后必须将其解压缩。
Dataset<Row> myResultNew = myResult
.withColumn("extract", extract.apply(col("document"))) //1
.withColumn("name", col("extract").getItem(0)) //2
.withColumn("age", col("extract").getItem(1)) //2
.withColumn("class", col("extract").getItem(2)) //2
.drop("document", "extract"); //3
apply
函数的参数注意:udf在数据集中的每行执行一次。如果xml解析器的创建非常昂贵,这可能会减慢Spark作业的执行,因为每行实例化一个解析器。由于Spark的并行性质,无法将解析器重新用于下一行。如果这是一个问题,则另一个选择(至少在Java世界中稍微复杂一些)是使用mapPartitions。这里,每行不需要一个解析器,而数据集的每个分区只需要一个解析器。
完全不同的方法是使用spark-xml。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句