내에서 이전 질문 , 내가 분산 카프카와 데이터베이스의 실시간 동기화에 대한 소비자 배포에 대한 자세한 동의로 결정했다. 같은 경우 PostgreSQL에서 SQL Server로 가져 오려는 테이블이 수백 개가 넘습니다. PostgreSQL에서 Kafka까지 저는 wal2json 플러그인과 함께 Debezium 커넥터를 사용했습니다. 그리고 Kafka에서 SQL Server까지 JDBC 커넥터를 사용합니다. 3 개의 동일한 설정 브로커 (다른 주소)가 있습니다.
broker.id=0
broker.rack=1
port=9093
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://localhost:9093
log.dir=/home/admin/kafka/tmp/kafka_log1
offsets.topic.num.partition=1
offsets.topic.replication.factor=3
min.isnyc.replicas=2
default.replication.factor=3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=7200000
delete.topic.enable=true
message.max.bytes=50497182
replica.fetch.max.bytes=50497182
group.max.session.timeout.ms=7200000
다음과 같은 몇 가지 가능한 해결책을 시도했습니다.
_
에 이름 이 있기 때문에 경고를받습니다.kafka-topics.sh -create --bootstrap-server localhost:9093,localhost:9094,localhost:9095 --replication-factor 3 --partitions 1 --topic $topic_name --config retention.ms=5400000
bootstrap.servers=localhost:9093,localhost:9094,localhost:9095
group.id=debezium-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets-debezium
offset.storage.replication.factor=3
config.storage.topic=connect-configs-debezium
status.storage.topic=connect-status-debezium
producer.buffer.memory=29999999
producer.max.buffered.records=19999999
producer.max.request.size=51497182
producer.retries=100
producer.max.in.flight.requests.per.connection=1
producer.request.timeout.ms=20000
producer.enable.idempotence=true
producer.retry.backoff.ms=500
producer.send.buffer.bytes=50497182
producer.receive.buffer.bytes=50497182
producer.ack=1
offset.flush.timeout.ms=300000
producer.buffer.memory=51497182
consumer.enable.auto.commit=true
consumer.retries=100
consumer.auto.commit.interval.ms=100000
consumer.max.partition.fetch.bytes=50497182
consumer.max.poll.records=10000
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=50000
consumer.session.timeout.ms=50000
consumer.auto.offset.reset=latest
consumer.isolation.level=read_committed
consumer.max.poll.interval.ms=5400000
fetch_max_bytes=50497182
rest.port=8085
plugin.path=/home/admin/kafka/connectors
#!/bin/bash
CSV_LIST="/home/admin/kafka/main/config/tables/table_lists.csv"
DATA=${CSV_LIST}
while IFS=',' read table pk mode; do
topic_name=${table}
curl -X POST http://localhost:8084/connectors -H 'Content-Type:application/json' -d '{"name" :"sqlservercon_'$topic_name'",
"config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"topics":"'$table'",
"connection.url":"jdbc:sqlserver://-:1433",
"connection.user":"-",
"connection.password":"-",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"auto.create":"true",
"insert.mode":"'$mode'",
"pk.fields":" '$pk'",
"pk.mode":"record_value",
"destination.table.format":"db.dbo.'$table'"
}}' | jq
done < ${DATA}
배포 방법은 다음과 같습니다.
안타깝게도 몇 가지 교착 상태와 소비자의 인식이 없기 때문에 모든 데이터를 새 SQL Server 데이터베이스로 이동하는 것이 여전히 만족스럽지 않습니다. 최적의 소비자 배포를위한 좋은 제안이 있는지 알고 싶습니다. 각 커넥터에 대해 한 명의 작업자를 추가해야합니까? 아니면 각 주제간에 전환하는 것과 같은 작업을 수행해야합니까?
Kafka connect jdbc가 batch.record를 SQL 서버로 보내야 할 레코드 수에 사용한다고 생각하는지 확인했습니다. 큰 크기의 레코드로 upsert를 사용할 때 문제가있는 것 같습니다. 소스와 싱크 모두에서 배치를 1로 줄여야한다고 가정합니다. 이것은 여전히 예비 답변입니다. 또한 누군가가 Kafka connect JDBC에 삽입하는 데 사용되는 SQL 쿼리를 표시하는 방법을 알고 있다면 JDBC 동작에 대한 메커니즘과 교착 상태를 해결하는 방법을 배우는 것이 도움이 될 것입니다.
그리고 내 경험과는 거리가 먼 모범 사례는 대상 DB가 있지만 내부에 테이블이없는 경우 먼저 삽입해야하는 테이블의 우선 순위를 정하고 완료 될 때까지 기다렸다가 삽입을 사용하지 않는 것입니다. 테이블의 경우 100000 개 미만의 행을 하나의 그룹으로 그룹화 할 수 있지만 큰 차원 테이블은 별도로 가져와야합니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다