Spark2.3.0とScala2.11 こちらAggregator
のドキュメントに従ってカスタムを実装しています。アグリゲーターには、入力、バッファー、出力の3つのタイプが必要です。
私のアグリゲーターはウィンドウ内の前のすべての行に作用する必要があるため、次のように宣言しました。
case class Foo(...)
object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// other override methods
override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}
オーバーライドメソッドの1つは、バッファータイプ(この場合はListBuffer
。)のエンコーダーを返すことになっています。org.apache.spark.sql.Encoders
これをエンコードするのに適したエンコーダーや他の方法が見つからないため、ここに何を返すかわかりません。
タイプの単一のプロパティを持つ新しいケースクラスを作成し、ListBuffer[Foo]
それをバッファクラスとして使用Encoders.product
して、それを使用することを考えましたが、それが必要かどうか、または他に何か足りないものがあるかどうかはわかりません。ヒントをありがとう。
Spark SQLにその作業を任せExpressionEncoder
、次のように使用して適切なエンコーダーを見つける必要があります。
scala> spark.version
res0: String = 2.3.0
case class Mod(id: Long)
import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加