假设我想制作一个 spark UDF 来反转结构数组的顺序。结构的具体类型应该无关紧要,所以我尝试了:
val reverseUDF = udf((s:Seq[_]) => s.reverse)
但这给
java.lang.UnsupportedOperationException: Schema for type Any is not supported
我还尝试使用泛型方法并强制类型泛型类型参数成为以下类型的子类型Product
:
def reverse[T <: Product](s:Seq[T]) = {
s.reverse
}
val reverseUDF = udf(reverse _)
这给出:
scala.MatchError: Nothing (of class scala.reflect.internal.Types$TypeRef$$anon$6)
那么这甚至可能吗?
它不是。Spark 必须知道返回输出类型,并且无法使用 SQL 表达式来确定它。您必须udf
为要使用的每种类型定义特定的,例如:
udf(reverse[(String, Int)] _)
udf(reverse[(String, Long, String)] _)
等等。然而,这些在实践中都没有用,因为你永远不会Product
在你的 udf 中看到类型。结构类型始终编码为Row
- Spark Sql UDF 带有复杂的输入参数。
如果您使用 Spark 2.3,您可以将任意表达reverse
为:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType
def reverse(schema: DataType) = udf(
(xs: Seq[Row]) => xs.map(x => Row.fromSeq(x.toSeq.reverse)),
schema
)
但您必须为每个实例提供架构:
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句