Postgres Debezium CDC는 Kafka에 변경 사항을 게시하지 않습니다.

야 코비안

내 현재 테스트 구성은 다음과 같습니다.

version: '3.7'
services:
  postgres:
    image: debezium/postgres
    restart: always
    ports:
      - "5432:5432"
  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    image: debezium/kafka
    restart: always
    ports:
      - "9092:9092"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=250
  connect:
    image: debezium/connect
    restart: always
    ports:
      - "8083:8083"
    links:
      - zookeeper
      - postgres
      - kafka
    depends_on:
      - zookeeper
      - postgres
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses

나는 다음과 같이 docker-compose로 실행합니다.

$ docker-compose up

그리고 오류 메시지가 표시되지 않습니다. 모든 것이 정상적으로 작동하는 것 같습니다. 그렇게 docker ps하면 모든 서비스가 실행되고 있음을 알 수 있습니다.

Kafka가 실행 중인지 확인하기 위해 Kafka 생산자와 Kafka 소비자를 Python으로 만들었습니다.

# producer. I run it in one console window
from kafka import KafkaProducer
from json import dumps
from time import sleep

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(1000):
    data = {'number' : e}
    producer.send('numtest', value=data)
    sleep(5)

# consumer. I run it in other colsole window

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'numtest',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')))

for message in consumer:
     print(message)

그리고 그것은 절대적으로 훌륭하게 작동합니다. 생산자가 메시지를 게시하는 방법과 메시지가 소비자 창에서 어떻게 소비되는지 확인합니다.

이제 CDC를 작동시키고 싶습니다. 우선 Postgres 컨테이너 내부에서 postgres역할 암호를 postgres다음과 같이 설정 했습니다 .

$ su postgres
$ psql
psql> \password postgres
Enter new password: postgres

그런 다음 새 데이터베이스를 만들었습니다 test.

psql> CREATE DATABASE test;

테이블을 만들었습니다.

psql> \c test;
test=# create table mytable (id serial, name varchar(128), primary key(id));

마지막으로 Debezium CDC 스택을 위해 커넥터를 만들었습니다.

$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
    "name": "test-connector",
    "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "test",
    "database.server.name": "postgres",
    "database.whitelist": "public.mytable",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "public.some_topic"
    }
}'

{"name":"test-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","plugin.name":"pgoutput","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"test","database.server.name":"postgres","database.whitelist":"public.mytable","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"public.some_topic","name":"test-connector"},"tasks":[],"type":"source"}

보시다시피 내 커넥터는 오류없이 생성되었습니다. 이제 Debezium CDC가 Kafka 주제에 대한 모든 변경 사항을 게시 할 것으로 예상 public.some_topic합니다. 이를 확인하기 위해 새 Kafka comsumer를 만듭니다.

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'public.some_topic',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')))

for message in consumer:
     print(message)

첫 번째 예와의 유일한 차이점은 내가보고 있다는 것 public.some_topic입니다. 그런 다음 데이터베이스 콘솔로 이동하여 삽입합니다.

test=# insert into mytable (name) values ('Tom Cat');    
INSERT 0 1
test=#

따라서 새 값이 삽입되지만 소비자 창에서 아무 일도 일어나지 않습니다. 즉, Debezium은 Kafka에 이벤트를 게시하지 않습니다 public.some_topic. 그게 무엇이 문제이며 어떻게 해결할 수 있습니까?

로빈 모팻

Docker Compose를 사용하면 커넥터가 생성 될 때 Kafka Connect 작업자 로그에 다음 오류가 표시됩니다.

Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "pgoutput": No such file or directory
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
        at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)
        at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)
        ... 9 more

Kafka Connect REST API를 사용하여 쿼리하는 경우 작업 상태에도 미러링됩니다.

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '."test-connector".status'
{
  "name": "test-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.16.5:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "192.168.16.5:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:129)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)\n\tat org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)\n\tat org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)\n\tat org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)\n\tat org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)\n\t... 9 more\n"
    }
  ],
  "type": "source"

실행중인 Postgres 버전은 다음과 같습니다.

postgres=# SHOW server_version;
 server_version
----------------
 9.6.16

(가) pgoutput에서만 사용할 수 있습니다> = 버전 10.

Docker Compose를 버전 10을 사용하도록 변경했습니다.

image: debezium/postgres:10

깔끔한 시작을 위해 스택을 바운스하고 지침을 따르면 실행중인 커넥터를 얻습니다.

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort
source  |  test-connector  |  RUNNING  |  RUNNING  |  io.debezium.connector.postgresql.PostgresConnector

및 Kafka 주제의 데이터 :

$ docker exec kafkacat kafkacat -b kafka:9092 -t postgres.public.mytable -C
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"postgres.public.mytable.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"Tom Cat"},"source":{"version":"1.0.0.Final","connector":"postgresql","name":"postgres","ts_ms":1579172192292,"snapshot":"false","db":"test","schema":"public","table":"mytable","txId":561,"lsn":24485520,"xmin":null},"op":"c","ts_ms":1579172192347}}% Reached end of topic postgres.public.mytable [0] at offset 1

Docker Compose에 kafkacat을 추가했습니다.

  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        while [ 1 -eq 1 ];do sleep 60;done

편집 : 여전히 유용하고 관련성이 있으므로 이전 답변 유지 :

Debezium은 테이블 이름에 따라 주제에 메시지를 작성 합니다 . 귀하의 예에서 이것은 postgres.test.mytable.

이것이 kafkacat유용한 이유 입니다.

kafkacat -b broker:9092 -L 

모든 주제 및 파티션 목록을 확인합니다. 주제가 있으면

kafkacat -b broker:9092 -t postgres.test.mytable -C

그것을 읽을 수 있습니다.

Docker실행 하는 방법을 포함하여 kafkacat에 대한 세부 정보를 확인하십시오.

여기에 Docker Compose와 함께 작동 하는 데모도 있습니다.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

Debezium postgres 오류 : "include-unchanged-toast"매개 변수는 더 이상 사용되지 않습니다.

분류에서Dev

Reactive Redis는 Flux에 변경 사항을 지속적으로 게시하지 않습니다.

분류에서Dev

Python Django는 makemigrations에서 변경 사항을 선택하지 않습니다.

분류에서Dev

ts-node-dev는 자동 다시로드에 변경 사항을 적용하지 않습니다.

분류에서Dev

devtools는 적은 파일 수정에 대한 변경 사항을 표시하지 않습니다.

분류에서Dev

Group By는 변경 사항을 고려하지 않습니다.

분류에서Dev

Sencha Architect는 변경 사항을 반영하지 않습니다.

분류에서Dev

fsck는 변경 사항을 기록하지 않습니다.

분류에서Dev

Postgres Debezium은 레코드의 이전 상태를 게시하지 않습니다.

분류에서Dev

변경 사항에 대한 jquery가 게시 후 작동하지 않습니다.

분류에서Dev

Skaffold Kubernetes는 React 변경 사항을 표시하지 않습니다.

분류에서Dev

Flask __init__.py는 변경 사항을 표시하지 않습니다.

분류에서Dev

React.js는 변경 사항을 알리지 않습니다.

분류에서Dev

변경 사항을 GitHub에 푸시하지 못했습니다.

분류에서Dev

Magical Record는 기본 컨텍스트에서 변경 사항을 감지하지 않습니다.

분류에서Dev

효소는 비동기 setState 콜백 내에서 setState 변경 사항을 반영하지 않습니다.

분류에서Dev

Systemd 서비스는 변경 사항을 디스크에 저장하지 않습니다.

분류에서Dev

listView는 notifyDataSetChanged 후 포커스가 변경 될 때까지 변경 사항을 표시하지 않습니다.

분류에서Dev

도커에서 실행되는 debezium 및 confluent-sink-connector를 사용하여 소스에서 대상 DB로 모든 변경 사항을 복제하는 방법

분류에서Dev

변경 사항이없는 경우 WordPress에서 "사이트를 나가시겠습니까? 변경 사항이 저장되지 않을 수 있습니다"라는 경고를 시작했습니다.

분류에서Dev

App.config는 앱을 다시 시작한 후 일부 변경 사항을 저장하지 않습니다.

분류에서Dev

Git은 아직 푸시되지 않은 커밋 (마지막 커밋 아님)에 파일 변경 사항을 추가하는 것을 잊었습니다.

분류에서Dev

Nest.Js는 변경 사항을 수락하지 않습니다.

분류에서Dev

Rancher는 카탈로그의 변경 사항을 선택하지 않습니다.

분류에서Dev

DataTable RejectChanges는 모든 변경 사항을 재설정하지 않습니다.

분류에서Dev

ExecuteNonQuery 업데이트는 변경 사항을 저장하지 않습니다.

분류에서Dev

핵심 데이터는 변경 사항을 저장하지 않습니다

분류에서Dev

Wordpress는 변경 사항을 업데이트하지 않습니다.

분류에서Dev

udev는 변경 사항을 재귀 적으로 적용하지 않습니다.

Related 관련 기사

  1. 1

    Debezium postgres 오류 : "include-unchanged-toast"매개 변수는 더 이상 사용되지 않습니다.

  2. 2

    Reactive Redis는 Flux에 변경 사항을 지속적으로 게시하지 않습니다.

  3. 3

    Python Django는 makemigrations에서 변경 사항을 선택하지 않습니다.

  4. 4

    ts-node-dev는 자동 다시로드에 변경 사항을 적용하지 않습니다.

  5. 5

    devtools는 적은 파일 수정에 대한 변경 사항을 표시하지 않습니다.

  6. 6

    Group By는 변경 사항을 고려하지 않습니다.

  7. 7

    Sencha Architect는 변경 사항을 반영하지 않습니다.

  8. 8

    fsck는 변경 사항을 기록하지 않습니다.

  9. 9

    Postgres Debezium은 레코드의 이전 상태를 게시하지 않습니다.

  10. 10

    변경 사항에 대한 jquery가 게시 후 작동하지 않습니다.

  11. 11

    Skaffold Kubernetes는 React 변경 사항을 표시하지 않습니다.

  12. 12

    Flask __init__.py는 변경 사항을 표시하지 않습니다.

  13. 13

    React.js는 변경 사항을 알리지 않습니다.

  14. 14

    변경 사항을 GitHub에 푸시하지 못했습니다.

  15. 15

    Magical Record는 기본 컨텍스트에서 변경 사항을 감지하지 않습니다.

  16. 16

    효소는 비동기 setState 콜백 내에서 setState 변경 사항을 반영하지 않습니다.

  17. 17

    Systemd 서비스는 변경 사항을 디스크에 저장하지 않습니다.

  18. 18

    listView는 notifyDataSetChanged 후 포커스가 변경 될 때까지 변경 사항을 표시하지 않습니다.

  19. 19

    도커에서 실행되는 debezium 및 confluent-sink-connector를 사용하여 소스에서 대상 DB로 모든 변경 사항을 복제하는 방법

  20. 20

    변경 사항이없는 경우 WordPress에서 "사이트를 나가시겠습니까? 변경 사항이 저장되지 않을 수 있습니다"라는 경고를 시작했습니다.

  21. 21

    App.config는 앱을 다시 시작한 후 일부 변경 사항을 저장하지 않습니다.

  22. 22

    Git은 아직 푸시되지 않은 커밋 (마지막 커밋 아님)에 파일 변경 사항을 추가하는 것을 잊었습니다.

  23. 23

    Nest.Js는 변경 사항을 수락하지 않습니다.

  24. 24

    Rancher는 카탈로그의 변경 사항을 선택하지 않습니다.

  25. 25

    DataTable RejectChanges는 모든 변경 사항을 재설정하지 않습니다.

  26. 26

    ExecuteNonQuery 업데이트는 변경 사항을 저장하지 않습니다.

  27. 27

    핵심 데이터는 변경 사항을 저장하지 않습니다

  28. 28

    Wordpress는 변경 사항을 업데이트하지 않습니다.

  29. 29

    udev는 변경 사항을 재귀 적으로 적용하지 않습니다.

뜨겁다태그

보관