私はConfluentプラットフォーム(Kafka、Ksqlなど)を学習中です。Kafka ConnectでDebeziumを使用して、Kafkaトピックにデータをストリーミングしています。データベーステーブル「log」のフィールドの1つは「register」と呼ばれ、レコードが追加されたときのタイムスタンプです。
参考までに、(ソースMySQLデータベースの)テーブルログの構造は次のとおりです。
CREATE TABLE `log` (
`code` varchar(9) NOT NULL,
`register` datetime NOT NULL,
`entry` mediumtext NOT NULL,
PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
意図したとおりに機能する次の構成を使用して、2つのデータベースの「ログ」テーブルから単一のKafkaトピックにデータをストリーミングしています。
"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.topicRoute.replacement": "merged.$3",
ソースデータベース(Debeziumによって生成されたメタデータから)とログテーブルのコードフィールド、およびテーブルの残りのフィールドを連結した新しいキーを作成するKSQLストリームを確立しようとしています。これの目的は、派生キーがシンクに送信されたときに完全に一意になるようにすることです(現在、2つのソースデータベースのログテーブルのマージされたコピーである必要がある単一のログテーブルを含む別のMySQLデータベースに接続しています)
私が実行しようとしているクエリは次のとおりです。
SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
ただし、次のエラーが発生します。
line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Statement: SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Caused by: line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE',
'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE',
'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW',
'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY',
'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException
「登録」がある種の予約用語であることを示唆するところはどこにも見当たりません。
誰か助けてもらえますか?代替案は、ソースデータベース名に到達する必要があるため、Debeziumによって生成されたメッセージをフラット化できないことを念頭に置いて、変換を使用して途中でフィールド名を変更する方法を提案できます。
はいREGISTER
は予約語です。DDLでは避ける必要があります。あなたはそれを引用することによってそれにアクセスすることができるかもしれません、試す価値があります。
フィールドを削除するための単一メッセージ変換がありますが、ネストされたデータでは機能しません。あなたが試すことができるのはUnwrapFromEnvelope
、フィールドの名前を変更するために1つと組み合わせたSMTです。私はこの設定を試していませんが、
"transforms": "unwrap,renameField",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "register:notareservedword",
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加