JDBC 커넥터를 사용하여 MySQL에서 Kafka로 데이터를 이동하고 있습니다. 내가 관심있는 데이터는 3 개의 테이블을 조인하는 선택에서 가져온 것이므로 mode:incrementing
및 query
다음을 사용 하여 커넥터를 구성했습니다 .
{
"name": "stats",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
"mode": "incrementing",
"validate.non.null": "false",
"topic.prefix": "t",
"incrementing.column.name": "s.id",
"transforms": "createKey,extractString",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "uuid",
"transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractString.field": "uuid",
"quote.sql.identifiers":"never",
"query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"batch.max.rows": "100",
"poll.interval.ms": "60000"
}
}
커넥터 상태를 확인할 때 실행 중입니다.
curl http://conncet:8083/connectors/stats/status
{
"name": "stats",
"connector": {
"state": "RUNNING",
"worker_id": "connect-3:38083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-1:18083"
}
],
"type": "source"
}
그러나 한 시간 후에도 여전히 생성 된 주제를 볼 수 없습니다. 어떤 쿼리가 실행 중인지 MySQL에서 확인했으며 show full processlist;
다음과 같은 두 가지 쿼리가 표시됩니다.
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC
따라서 기본적으로 쿼리는 query
커넥터 구성에 제공 한 쿼리와 동일 WHERE s.id > -1 ORDER BY s.id ASC
합니다.이 조인의 쿼리는 MySQL이 오랫동안 데이터를 보내는 2 천 1 백만 행의 결과 집합을 생성하기 때문입니다. 다시 확인하면 show full processlist;
다음과 같은 4 개의 쿼리가 표시되고 8 개, 16 개 등이 표시됩니다.
질문은 다음과 같습니다.
s.id > -1 ORDER BY s.id ASC
."batch.max.rows": "100"
초기 폴링 후에 배치 크기 만 제어하고 있습니까 ??최신 정보:
이 문제에 대한 공개 주제가 있습니다. 이 질문은 끝날 수 있다고 생각합니다.
를 사용 incrementing
mode
하고 전달 된 JDBC 소스 커넥터는 query
다음 where 절을 사용하여 해당 쿼리를 실행합니다 WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC
.. (증분 모드와 쿼리를 사용하는 경우 where
거기에 절을 전달할 수 없습니다 ).
첫 번째 폴 lastIncrementedValue
은 -1이므로 모든 레코드를 쿼리합니다. 각 레코드를 추출한 후 lastIncrementedValue가 증가하므로 다음 쿼리는 새 데이터 만 폴링합니다. Kafka Connect 프레임 워크로 반환 할 batch.max.rows
레코드 수를 나타냅니다 SourceTask::poll(...)
. Kafka에 한 번에 전송되는 배치의 최대 크기입니다.
단일 테이블에서 데이터를 가져올 때 쿼리가 더 빠르게 실행되기 때문에 더 빠르게 작동한다고 생각합니다 (덜 복잡함). 다른 SQL 도구를 사용하여 이러한 쿼리를 실행하면 유사한 작업이 수행됩니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다