kafka stream / Ksqlは実際にjsonをネイティブにサポートしていますか?サポートされている他のフォーマットは何ですか?フラットなjsonをテーブルとして解釈することが可能であることがわかりました。その部分をもう少しよく理解したいと思います。SQLを介してクエリできるKsqlを介してkafka-streamsする他の形式は何ですか?それはどのように可能またはサポートされていますか?ネイティブサポートとは何ですか?
KSQL
値形式の場合、KSQLはAVRO、JSON、およびDELIMITED(CSVなど)をサポートします。
ここでドキュメントを見つけることができます:
カフカストリーム
Kafka Streamsには、org.apache.kafka.common.serialization
パッケージの下にいくつかのプリミティブ/基本的なSerDes(Serializers / Deserializers)が付属しています。
ここでドキュメントを見つけることができます:
Confluentは、汎用Avroおよび特定のAvro形式のデータ用にスキーマレジストリ互換のAvroSerDesも提供します。ここでドキュメントを見つけることができます:
例に付属しているJSONの基本的なSerDe実装を使用することもできます。
最後の手段として、いつでも独自のカスタムSerDesを作成できます。そのためには、次のことを行う必要があります。
T
実装して、データ型のシリアライザーを記述しますorg.apache.kafka.common.serialization.Serializer
。T
実装して、のデシリアライザーを記述しますorg.apache.kafka.common.serialization.Deserializer
。T
実装してorg.apache.kafka.common.serialization.Serde
、手動で(前のセクションの既存のSerDesを参照)、またはなどのSerdesのヘルパー関数を利用してSerdeを記述しますSerdes.serdeFrom(Serializer<T>, Deserializer<T>)
。KafkaStreamsに提供された構成でカスタムserdeを使用する場合は、独自のクラス(汎用タイプを持たない)を実装する必要があることに注意してください。Serdeクラスにジェネリック型がある場合Serdes.serdeFrom(Serializer<T>, Deserializer<T>)
、またはを使用する場合は、メソッド呼び出しを介してのみSerdeを渡すことができます(たとえばbuilder.stream("topicName", Consumed.with(...))
)。この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加