Kafka Connect JDBC 커넥터 쿼리 + 초기 폴링에서 대규모 데이터 세트로 증가 모드 초크

Lloiacono

JDBC 커넥터를 사용하여 MySQL에서 Kafka로 데이터를 이동하고 있습니다. 내가 관심있는 데이터는 3 개의 테이블을 조인하는 선택에서 가져온 것이므로 mode:incrementingquery다음을 사용 하여 커넥터를 구성했습니다 .

{
    "name": "stats",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
        "mode": "incrementing",
        "validate.non.null": "false",
        "topic.prefix": "t",
        "incrementing.column.name": "s.id",
        "transforms": "createKey,extractString",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "uuid",
        "transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractString.field": "uuid",
        "quote.sql.identifiers":"never",
        "query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "batch.max.rows": "100",
        "poll.interval.ms": "60000"
    }
}

커넥터 상태를 확인할 때 실행 중입니다.

curl http://conncet:8083/connectors/stats/status

{
    "name": "stats",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect-3:38083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect-1:18083"
        }
    ],
    "type": "source"
}

그러나 한 시간 후에도 여전히 생성 된 주제를 볼 수 없습니다. 어떤 쿼리가 실행 중인지 MySQL에서 확인했으며 show full processlist;다음과 같은 두 가지 쿼리가 표시됩니다.

select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC

따라서 기본적으로 쿼리는 query커넥터 구성에 제공 한 쿼리와 동일 WHERE s.id > -1 ORDER BY s.id ASC합니다.이 조인의 쿼리는 MySQL이 오랫동안 데이터를 보내는 2 천 1 백만 행의 결과 집합을 생성하기 때문입니다. 다시 확인하면 show full processlist;다음과 같은 4 개의 쿼리가 표시되고 8 개, 16 개 등이 표시됩니다.

질문은 다음과 같습니다.

  1. Kafka connect 가 다음을 추가 할 때 한 번에 모든을 가져 오려고하는 이유 : s.id > -1 ORDER BY s.id ASC.
  2. 이 작업을 수행하지 않고 대신 더 적은 양을 가져 오도록 커넥터를 구성 할 수 있습니까?
  3. "batch.max.rows": "100"초기 폴링 후에 배치 크기 만 제어하고 있습니까 ??

최신 정보:

문제에 대한 공개 주제가 있습니다. 이 질문은 끝날 수 있다고 생각합니다.

Bartosz Wardziński

를 사용 incrementing mode하고 전달 된 JDBC 소스 커넥터는 query다음 where 절을 사용하여 해당 쿼리를 실행합니다 WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC.. (증분 모드와 쿼리를 사용하는 경우 where거기에 절을 전달할 수 없습니다 ).

첫 번째 폴 lastIncrementedValue은 -1이므로 모든 레코드를 쿼리합니다. 각 레코드를 추출한 후 lastIncrementedValue가 증가하므로 다음 쿼리는 새 데이터 만 폴링합니다. Kafka Connect 프레임 워크로 반환 할 batch.max.rows레코드 수를 나타냅니다 SourceTask::poll(...). Kafka에 한 번에 전송되는 배치의 최대 크기입니다.

단일 테이블에서 데이터를 가져올 때 쿼리가 더 빠르게 실행되기 때문에 더 빠르게 작동한다고 생각합니다 (덜 복잡함). 다른 SQL 도구를 사용하여 이러한 쿼리를 실행하면 유사한 작업이 수행됩니다.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

Kafka Connect에서 JDBC 싱크 커넥터를 사용하여 레코드를 삭제할 수없는 이유

분류에서Dev

데이터 기반 테스트로 대규모 데이터 세트 처리

분류에서Dev

Kafka 소스 JDBC 커넥터-마이크로 배치

분류에서Dev

TensorFlow에서 대규모 학습 / 검증 / 테스트 데이터 세트 읽기

분류에서Dev

Kafka Connect AWS S3 싱크 커넥터가 주제에서 읽지 않음

분류에서Dev

Kafka Mongodb 싱크 커넥터-업데이트 문서

분류에서Dev

대규모 데이터 세트를 위해 MySQL .net 커넥터로 결과 세트를 스트리밍하는 방법

분류에서Dev

WorkerSinkTask 오류를 발생시키는 Kafka Connect JDBC 싱크 커넥터

분류에서Dev

Kafka Connect MQTT 커넥터가있는 ClassNotFoundException

분류에서Dev

Ruby / Rails에서 대규모 데이터 세트 가져 오기 작업

분류에서Dev

대규모 데이터 세트 관리

분류에서Dev

서버 (노드 및 리 액트)에서 리디렉션 후 초기 페이지로드시 세션 쿠키 데이터 가져 오기

분류에서Dev

초기 모델에로드하지 않고 Laravel 관계에서 데이터를 가져 오는 방법

분류에서Dev

루프백 인터페이스가 초기화되기 전에 모든 방화벽 규칙을로드하는 데 단점이나 위험이 있습니까?

분류에서Dev

독립형 및 분산 모드에서 단일 kafka s3 싱크 커넥터 실행

분류에서Dev

java.sql.SQLException의 : 커넥터 8.0.11의 서버로부터 수신 알 초기 문자 세트 인덱스 '255'

분류에서Dev

히스토리에서 초기 데이터로드 방지

분류에서Dev

Kafka 싱크 커넥터-> postgres, avro JSON 데이터로 실패

분류에서Dev

프로그램 메모리의 초기화되지 않은 데이터 세그먼트

분류에서Dev

USB 포트가있는 CF 카드 리더에 대한 플로피 드라이브 커넥터

분류에서Dev

Kafka Connect : 데이터를 데이터베이스 테이블에 싱크 할 때 avro 스키마의 모든 필드가 무시할 수 있습니까?

분류에서Dev

Ajax가 X 초 모두 다시로드-데이터 테이블

분류에서Dev

JPA의 기존 커넥터 테이블에서 데이터로드

분류에서Dev

Elixir / Phoenix-테스트 실행 초기에 코드 실행, 모든 테스트에 대한 공유 데이터

분류에서Dev

Spring 데이터로드 부모에서 모든 게으른 초기화 자식 테이블

분류에서Dev

대규모 데이터 세트 및 IOPS 제한에 대한 Mongo 디스크 읽기 병목 현상

분류에서Dev

대규모 데이터 세트를 처리 할 때 빠른 NOSQL 쿼리

분류에서Dev

대규모 데이터를 가져 오는 Xcode Firebase 쿼리

분류에서Dev

Debezium 및 Kafka Connect JDBC 싱크 커넥터를 사용하여 데이터베이스를 동기화 할 때 기본 키의 이름을 바꾸는 방법은 무엇입니까?

Related 관련 기사

  1. 1

    Kafka Connect에서 JDBC 싱크 커넥터를 사용하여 레코드를 삭제할 수없는 이유

  2. 2

    데이터 기반 테스트로 대규모 데이터 세트 처리

  3. 3

    Kafka 소스 JDBC 커넥터-마이크로 배치

  4. 4

    TensorFlow에서 대규모 학습 / 검증 / 테스트 데이터 세트 읽기

  5. 5

    Kafka Connect AWS S3 싱크 커넥터가 주제에서 읽지 않음

  6. 6

    Kafka Mongodb 싱크 커넥터-업데이트 문서

  7. 7

    대규모 데이터 세트를 위해 MySQL .net 커넥터로 결과 세트를 스트리밍하는 방법

  8. 8

    WorkerSinkTask 오류를 발생시키는 Kafka Connect JDBC 싱크 커넥터

  9. 9

    Kafka Connect MQTT 커넥터가있는 ClassNotFoundException

  10. 10

    Ruby / Rails에서 대규모 데이터 세트 가져 오기 작업

  11. 11

    대규모 데이터 세트 관리

  12. 12

    서버 (노드 및 리 액트)에서 리디렉션 후 초기 페이지로드시 세션 쿠키 데이터 가져 오기

  13. 13

    초기 모델에로드하지 않고 Laravel 관계에서 데이터를 가져 오는 방법

  14. 14

    루프백 인터페이스가 초기화되기 전에 모든 방화벽 규칙을로드하는 데 단점이나 위험이 있습니까?

  15. 15

    독립형 및 분산 모드에서 단일 kafka s3 싱크 커넥터 실행

  16. 16

    java.sql.SQLException의 : 커넥터 8.0.11의 서버로부터 수신 알 초기 문자 세트 인덱스 '255'

  17. 17

    히스토리에서 초기 데이터로드 방지

  18. 18

    Kafka 싱크 커넥터-> postgres, avro JSON 데이터로 실패

  19. 19

    프로그램 메모리의 초기화되지 않은 데이터 세그먼트

  20. 20

    USB 포트가있는 CF 카드 리더에 대한 플로피 드라이브 커넥터

  21. 21

    Kafka Connect : 데이터를 데이터베이스 테이블에 싱크 할 때 avro 스키마의 모든 필드가 무시할 수 있습니까?

  22. 22

    Ajax가 X 초 모두 다시로드-데이터 테이블

  23. 23

    JPA의 기존 커넥터 테이블에서 데이터로드

  24. 24

    Elixir / Phoenix-테스트 실행 초기에 코드 실행, 모든 테스트에 대한 공유 데이터

  25. 25

    Spring 데이터로드 부모에서 모든 게으른 초기화 자식 테이블

  26. 26

    대규모 데이터 세트 및 IOPS 제한에 대한 Mongo 디스크 읽기 병목 현상

  27. 27

    대규모 데이터 세트를 처리 할 때 빠른 NOSQL 쿼리

  28. 28

    대규모 데이터를 가져 오는 Xcode Firebase 쿼리

  29. 29

    Debezium 및 Kafka Connect JDBC 싱크 커넥터를 사용하여 데이터베이스를 동기화 할 때 기본 키의 이름을 바꾸는 방법은 무엇입니까?

뜨겁다태그

보관