여러 주제 및 테이블 대상으로 kafka 싱크 연결을 배포하는 방법

요사 파트 빈센트 사라 기

내에서 이전 질문 , 내가 분산 카프카와 데이터베이스의 실시간 동기화에 대한 소비자 배포에 대한 자세한 동의로 결정했다. 같은 경우 PostgreSQL에서 SQL Server로 가져 오려는 테이블이 수백 개가 넘습니다. PostgreSQL에서 Kafka까지 저는 wal2json 플러그인과 함께 Debezium 커넥터를 사용했습니다. 그리고 Kafka에서 SQL Server까지 JDBC 커넥터를 사용합니다. 3 개의 동일한 설정 브로커 (다른 주소)가 있습니다.

broker.id=0
broker.rack=1
port=9093
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://localhost:9093
log.dir=/home/admin/kafka/tmp/kafka_log1
offsets.topic.num.partition=1
offsets.topic.replication.factor=3
min.isnyc.replicas=2
default.replication.factor=3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=7200000
delete.topic.enable=true
message.max.bytes=50497182 
replica.fetch.max.bytes=50497182
group.max.session.timeout.ms=7200000

다음과 같은 몇 가지 가능한 해결책을 시도했습니다.

  1. 1 개의 파티션과 3 개의 복제본을 사용하도록 주제를 설정합니다. 내 테이블 _에 이름 이 있기 때문에 경고를받습니다.
kafka-topics.sh -create --bootstrap-server localhost:9093,localhost:9094,localhost:9095  --replication-factor 3 --partitions 1 --topic $topic_name --config retention.ms=5400000
  1. 나는 서로 다른 작업자로 debezium과 jdbc 커넥터를 분리합니다. 다음과 같이 동일한 구성 (호스트 포트 제외, debezium의 경우 8085, 싱크의 경우 8084)을 가진 두 명의 작업자가 있습니다.
bootstrap.servers=localhost:9093,localhost:9094,localhost:9095
group.id=debezium-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets-debezium
offset.storage.replication.factor=3
config.storage.topic=connect-configs-debezium
status.storage.topic=connect-status-debezium
producer.buffer.memory=29999999
producer.max.buffered.records=19999999
producer.max.request.size=51497182 
producer.retries=100
producer.max.in.flight.requests.per.connection=1
producer.request.timeout.ms=20000
producer.enable.idempotence=true
producer.retry.backoff.ms=500
producer.send.buffer.bytes=50497182
producer.receive.buffer.bytes=50497182
producer.ack=1
offset.flush.timeout.ms=300000
producer.buffer.memory=51497182
consumer.enable.auto.commit=true
consumer.retries=100
consumer.auto.commit.interval.ms=100000
consumer.max.partition.fetch.bytes=50497182
consumer.max.poll.records=10000
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=50000
consumer.session.timeout.ms=50000
consumer.auto.offset.reset=latest
consumer.isolation.level=read_committed
consumer.max.poll.interval.ms=5400000
fetch_max_bytes=50497182
rest.port=8085
plugin.path=/home/admin/kafka/connectors
  1. 다음없이 하나씩 루프 싱크 커넥터 :
#!/bin/bash
CSV_LIST="/home/admin/kafka/main/config/tables/table_lists.csv"
DATA=${CSV_LIST}

while IFS=',' read table pk mode; do
topic_name=${table} 
curl -X POST http://localhost:8084/connectors -H 'Content-Type:application/json' -d '{"name" :"sqlservercon_'$topic_name'",
    "config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics":"'$table'",
            "connection.url":"jdbc:sqlserver://-:1433",
            "connection.user":"-",
            "connection.password":"-",
            "transforms":"unwrap",
            "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones":"false",
            "auto.create":"true",
            "insert.mode":"'$mode'",
            "pk.fields":" '$pk'",
            "pk.mode":"record_value",
            "destination.table.format":"db.dbo.'$table'"
}}' | jq
done < ${DATA}

배포 방법은 다음과 같습니다.

  1. zookeeper 및 kafka 서버 시작
  2. 주제 만들기
  3. Debezium 소스에 대한 kafka 작업자 시작
  4. debezium 커넥터 추가 (1db에는 하나의 커넥터 만 필요하므로)
  5. 싱크대를 위해 kafka 작업자 시작
  6. 루프로 JDBC 커넥터 추가

안타깝게도 몇 가지 교착 상태와 소비자의 인식이 없기 때문에 모든 데이터를 새 SQL Server 데이터베이스로 이동하는 것이 여전히 만족스럽지 않습니다. 최적의 소비자 배포를위한 좋은 제안이 있는지 알고 싶습니다. 각 커넥터에 대해 한 명의 작업자를 추가해야합니까? 아니면 각 주제간에 전환하는 것과 같은 작업을 수행해야합니까?

요사 파트 빈센트 사라 기

Kafka connect jdbc가 batch.record를 SQL 서버로 보내야 할 레코드 수에 사용한다고 생각하는지 확인했습니다. 큰 크기의 레코드로 upsert를 사용할 때 문제가있는 것 같습니다. 소스와 싱크 모두에서 배치를 1로 줄여야한다고 가정합니다. 이것은 여전히 ​​예비 답변입니다. 또한 누군가가 Kafka connect JDBC에 삽입하는 데 사용되는 SQL 쿼리를 표시하는 방법을 알고 있다면 JDBC 동작에 대한 메커니즘과 교착 상태를 해결하는 방법을 배우는 것이 도움이 될 것입니다.

그리고 내 경험과는 거리가 먼 모범 사례는 대상 DB가 있지만 내부에 테이블이없는 경우 먼저 삽입해야하는 테이블의 우선 순위를 정하고 완료 될 때까지 기다렸다가 삽입을 사용하지 않는 것입니다. 테이블의 경우 100000 개 미만의 행을 하나의 그룹으로 그룹화 할 수 있지만 큰 차원 테이블은 별도로 가져와야합니다.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

여러 테이블을 단일 쿼리로 연결하는 방법

분류에서Dev

SQL : 기본 테이블의 행 값을 기반으로 여러 테이블을 단일 테이블에 조건부로 연결하는 가장 좋은 방법

분류에서Dev

연결 및 피벗 테이블을 사용하여 Pandas 데이터 프레임의 데이터를 특정 형식으로 표현하는 방법

분류에서Dev

MySQL에서 여러 테이블을 가로로 단순히 연결하는 방법은 무엇입니까?

분류에서Dev

Kafka에 연결된 Spring Cloud Stream을 사용하여 동일한 주제에 대한 여러 StreamListeners

분류에서Dev

spree를 사용하여 주문 및 결제없이 상점을 생성하는 가장 간단한 방법

분류에서Dev

다른 테이블로 연결되는 테이블을 사용하여 앱용 MVC를 구현하는 방법

분류에서Dev

SQLAlchemy에서 여러 테이블을 삭제하는 방법

분류에서Dev

kafka-python을 사용하여 동적으로 Kafka에 존재하지 않는 주제를 만드는 방법

분류에서Dev

품목이 포함 된 주문을 삭제 한 후 수량 및 소비 필드를 품목 테이블의 원래 번호로 복원하는 방법

분류에서Dev

Google 시트-여러 값을 여러 링크로 연결하는 방법

분류에서Dev

입력 필드를 기반으로 kafka의 여러 주제에 json 데이터 스트림을 보내는 방법

분류에서Dev

테이블을 연결하는 여러 외래 키 경로

분류에서Dev

android Room, ID 목록으로 테이블의 여러 행을 삭제하는 방법

분류에서Dev

R의 URL 링크에서 각 주제에 대한 테이블을 추출하는 방법-Webscraping

분류에서Dev

블루투스 연결을 열고 A2DP 소스 및 싱크가되는 방법

분류에서Dev

여러 범위와의 연결을 통해 has_many에서 동일한 테이블의 여러 조인을 제거하는 방법은 무엇입니까?

분류에서Dev

JavaScript로 테이블을 대상으로하는 CSS 요소를 제거하는 방법

분류에서Dev

테이블 및 헤더의 여러 열에 걸쳐 DT 데이터 테이블 셀을 확장하는 방법

분류에서Dev

XSLT 1.0을 사용하여 테이블 만들기 및 여러 값 연결

분류에서Dev

Kafka-Streams 상태 저장소를 여러 디스크에 배포하는 방법

분류에서Dev

"관련 제품"테이블을 연결하는 방법

분류에서Dev

MySQL-여러 연결이 주어지면 여러 레코드에 대한 최신 연결을 얻는 방법은 무엇입니까?

분류에서Dev

연결된 테이블의 값을 기반으로 여러 열을 업데이트하는 방법은 무엇입니까?

분류에서Dev

어떻게 두 범주 형 변수에 대한 비상 테이블 (크로스 탭)을 만드는 방법?

분류에서Dev

주어진 항목 ID로 여러 사진을 테이블에 저장하는 방법은 무엇입니까?

분류에서Dev

이번 분기에 주문하지 않은 분기 및 고객으로 테이블을 만드는 방법-MySQL?

분류에서Dev

다 대다 연결 테이블을 정의하는 방법

분류에서Dev

배열을 여러 숫자로 연결하는 방법

Related 관련 기사

  1. 1

    여러 테이블을 단일 쿼리로 연결하는 방법

  2. 2

    SQL : 기본 테이블의 행 값을 기반으로 여러 테이블을 단일 테이블에 조건부로 연결하는 가장 좋은 방법

  3. 3

    연결 및 피벗 테이블을 사용하여 Pandas 데이터 프레임의 데이터를 특정 형식으로 표현하는 방법

  4. 4

    MySQL에서 여러 테이블을 가로로 단순히 연결하는 방법은 무엇입니까?

  5. 5

    Kafka에 연결된 Spring Cloud Stream을 사용하여 동일한 주제에 대한 여러 StreamListeners

  6. 6

    spree를 사용하여 주문 및 결제없이 상점을 생성하는 가장 간단한 방법

  7. 7

    다른 테이블로 연결되는 테이블을 사용하여 앱용 MVC를 구현하는 방법

  8. 8

    SQLAlchemy에서 여러 테이블을 삭제하는 방법

  9. 9

    kafka-python을 사용하여 동적으로 Kafka에 존재하지 않는 주제를 만드는 방법

  10. 10

    품목이 포함 된 주문을 삭제 한 후 수량 및 소비 필드를 품목 테이블의 원래 번호로 복원하는 방법

  11. 11

    Google 시트-여러 값을 여러 링크로 연결하는 방법

  12. 12

    입력 필드를 기반으로 kafka의 여러 주제에 json 데이터 스트림을 보내는 방법

  13. 13

    테이블을 연결하는 여러 외래 키 경로

  14. 14

    android Room, ID 목록으로 테이블의 여러 행을 삭제하는 방법

  15. 15

    R의 URL 링크에서 각 주제에 대한 테이블을 추출하는 방법-Webscraping

  16. 16

    블루투스 연결을 열고 A2DP 소스 및 싱크가되는 방법

  17. 17

    여러 범위와의 연결을 통해 has_many에서 동일한 테이블의 여러 조인을 제거하는 방법은 무엇입니까?

  18. 18

    JavaScript로 테이블을 대상으로하는 CSS 요소를 제거하는 방법

  19. 19

    테이블 및 헤더의 여러 열에 걸쳐 DT 데이터 테이블 셀을 확장하는 방법

  20. 20

    XSLT 1.0을 사용하여 테이블 만들기 및 여러 값 연결

  21. 21

    Kafka-Streams 상태 저장소를 여러 디스크에 배포하는 방법

  22. 22

    "관련 제품"테이블을 연결하는 방법

  23. 23

    MySQL-여러 연결이 주어지면 여러 레코드에 대한 최신 연결을 얻는 방법은 무엇입니까?

  24. 24

    연결된 테이블의 값을 기반으로 여러 열을 업데이트하는 방법은 무엇입니까?

  25. 25

    어떻게 두 범주 형 변수에 대한 비상 테이블 (크로스 탭)을 만드는 방법?

  26. 26

    주어진 항목 ID로 여러 사진을 테이블에 저장하는 방법은 무엇입니까?

  27. 27

    이번 분기에 주문하지 않은 분기 및 고객으로 테이블을 만드는 방법-MySQL?

  28. 28

    다 대다 연결 테이블을 정의하는 방법

  29. 29

    배열을 여러 숫자로 연결하는 방법

뜨겁다태그

보관