내 현재 테스트 구성은 다음과 같습니다.
version: '3.7'
services:
postgres:
image: debezium/postgres
restart: always
ports:
- "5432:5432"
zookeeper:
image: debezium/zookeeper
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafka:
image: debezium/kafka
restart: always
ports:
- "9092:9092"
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=250
connect:
image: debezium/connect
restart: always
ports:
- "8083:8083"
links:
- zookeeper
- postgres
- kafka
depends_on:
- zookeeper
- postgres
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_source_connect_statuses
나는 다음과 같이 docker-compose로 실행합니다.
$ docker-compose up
그리고 오류 메시지가 표시되지 않습니다. 모든 것이 정상적으로 작동하는 것 같습니다. 그렇게 docker ps
하면 모든 서비스가 실행되고 있음을 알 수 있습니다.
Kafka가 실행 중인지 확인하기 위해 Kafka 생산자와 Kafka 소비자를 Python으로 만들었습니다.
# producer. I run it in one console window
from kafka import KafkaProducer
from json import dumps
from time import sleep
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
for e in range(1000):
data = {'number' : e}
producer.send('numtest', value=data)
sleep(5)
# consumer. I run it in other colsole window
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'numtest',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
print(message)
그리고 그것은 절대적으로 훌륭하게 작동합니다. 생산자가 메시지를 게시하는 방법과 메시지가 소비자 창에서 어떻게 소비되는지 확인합니다.
이제 CDC를 작동시키고 싶습니다. 우선 Postgres 컨테이너 내부에서 postgres
역할 암호를 postgres
다음과 같이 설정 했습니다 .
$ su postgres
$ psql
psql> \password postgres
Enter new password: postgres
그런 다음 새 데이터베이스를 만들었습니다 test
.
psql> CREATE DATABASE test;
테이블을 만들었습니다.
psql> \c test;
test=# create table mytable (id serial, name varchar(128), primary key(id));
마지막으로 Debezium CDC 스택을 위해 커넥터를 만들었습니다.
$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.server.name": "postgres",
"database.whitelist": "public.mytable",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "public.some_topic"
}
}'
{"name":"test-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","plugin.name":"pgoutput","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"test","database.server.name":"postgres","database.whitelist":"public.mytable","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"public.some_topic","name":"test-connector"},"tasks":[],"type":"source"}
보시다시피 내 커넥터는 오류없이 생성되었습니다. 이제 Debezium CDC가 Kafka 주제에 대한 모든 변경 사항을 게시 할 것으로 예상 public.some_topic
합니다. 이를 확인하기 위해 새 Kafka comsumer를 만듭니다.
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'public.some_topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
print(message)
첫 번째 예와의 유일한 차이점은 내가보고 있다는 것 public.some_topic
입니다. 그런 다음 데이터베이스 콘솔로 이동하여 삽입합니다.
test=# insert into mytable (name) values ('Tom Cat');
INSERT 0 1
test=#
따라서 새 값이 삽입되지만 소비자 창에서 아무 일도 일어나지 않습니다. 즉, Debezium은 Kafka에 이벤트를 게시하지 않습니다 public.some_topic
. 그게 무엇이 문제이며 어떻게 해결할 수 있습니까?
Docker Compose를 사용하면 커넥터가 생성 될 때 Kafka Connect 작업자 로그에 다음 오류가 표시됩니다.
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "pgoutput": No such file or directory
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)
... 9 more
Kafka Connect REST API를 사용하여 쿼리하는 경우 작업 상태에도 미러링됩니다.
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '."test-connector".status'
{
"name": "test-connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.16.5:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "192.168.16.5:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:129)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)\n\tat org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)\n\tat org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)\n\tat org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)\n\tat org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)\n\t... 9 more\n"
}
],
"type": "source"
실행중인 Postgres 버전은 다음과 같습니다.
postgres=# SHOW server_version;
server_version
----------------
9.6.16
(가) pgoutput
에서만 사용할 수 있습니다> = 버전 10.
Docker Compose를 버전 10을 사용하도록 변경했습니다.
image: debezium/postgres:10
깔끔한 시작을 위해 스택을 바운스하고 지침을 따르면 실행중인 커넥터를 얻습니다.
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
source | test-connector | RUNNING | RUNNING | io.debezium.connector.postgresql.PostgresConnector
및 Kafka 주제의 데이터 :
$ docker exec kafkacat kafkacat -b kafka:9092 -t postgres.public.mytable -C
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"postgres.public.mytable.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"Tom Cat"},"source":{"version":"1.0.0.Final","connector":"postgresql","name":"postgres","ts_ms":1579172192292,"snapshot":"false","db":"test","schema":"public","table":"mytable","txId":561,"lsn":24485520,"xmin":null},"op":"c","ts_ms":1579172192347}}% Reached end of topic postgres.public.mytable [0] at offset 1
Docker Compose에 kafkacat을 추가했습니다.
kafkacat:
image: edenhill/kafkacat:1.5.0
container_name: kafkacat
entrypoint:
- /bin/sh
- -c
- |
while [ 1 -eq 1 ];do sleep 60;done
편집 : 여전히 유용하고 관련성이 있으므로 이전 답변 유지 :
Debezium은 테이블 이름에 따라 주제에 메시지를 작성 합니다 . 귀하의 예에서 이것은 postgres.test.mytable
.
이것이 kafkacat
유용한 이유 입니다.
kafkacat -b broker:9092 -L
모든 주제 및 파티션 목록을 확인합니다. 주제가 있으면
kafkacat -b broker:9092 -t postgres.test.mytable -C
그것을 읽을 수 있습니다.
Docker 로 실행 하는 방법을 포함하여 kafkacat에 대한 세부 정보를 확인하십시오.
여기에 Docker Compose와 함께 작동 하는 데모도 있습니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다