상호 연결된 엔티티 스키마를 RDMS 및 그래프와 같은 데이터베이스에서 이벤트 형태로 사용할 수 있도록 Kafka 토픽을 구성하는 방법

Michail Michailidis

Information개체를 포함하는 Element개체 가있는 경우 있습니다. Information객체를 저장하면 Element고유 한 값 필드를 기반으로 기존 객체 를 찾으려고 하지 않으면 삽입합니다. 지금은 Information개체와 Element개체를 삭제할 수 없습니다. 부모를 추가하려면 두 개의 기존 Element개체가 필요 합니다. 나는 세 가지 주제를 사용할 계획했다 : CreateElement, CreateInformation, AddParentOfElement이벤트에 대한 Created Element Event, Created Information Event그리고 Added Parent Event. 나는 그림에 표시된 것과 같은 이벤트가 다른 순서로 소비 될 수 있다는 것을 주제와 주제 파티션 사이에 순서 보장이 없기 때문에 스키마가 예를 들어 RDBMS에 지속될 수 없다는 것을 깨달았습니다. 평소와 같이 주제의 파티션 할당에 ID가 사용된다고 가정합니다.

내 다이어그램은 다음과 같습니다.

여기에 이미지 설명 입력

시나리오는

  1. Element (id = 1)이 (가) 사용자에 의해 생성되었습니다.
  2. InformationElements사용자가 (1,2,3)을 포함하는 (id = 1) 포함
  3. Element (id = 5)가 사용자에 의해 생성되었습니다.
  4. Elementwith (id = 5)의 부모 Element는 사용자에 의해 (id = 3)으로 설정되었습니다.
  5. InformationElements(1,3 및 5)를 포함하는 (id = 2)가 사용자에 의해 생성되었습니다.

내 주제 선택이 의미가 있는지 궁금하고 소비자 데이터베이스 서비스에서 처리 할 때 멱 등성을 갖는 이벤트를 갖는 방법에 대한 제안을 감사하겠습니다. 시스템을 잘못된 상태에 두지 마십시오.

감사!

Michail Michailidis

이 솔루션을 고려한 후 : Spring Cloud Stream Kafka 및 서비스 별 데이터베이스를 사용하여 마이크로 서비스 이벤트 기반 아키텍처를 구현하는 방법은 있지만 제안에 만족하지 않습니다. Confluent Bottled Water ( https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/ )를 조사 했고 나중에 더 활동적이면서도 비슷한 Debezium ( http : // debezium.io/ )

나는 Debezium 방식을 따르기로 결정했습니다. Debezium은 Mysql / Postgres binlog에서 직접 읽고 이러한 변경 사항 (스키마 및 데이터)을 Kafka에 게시하는 플러그인입니다.

내가 사용하는 예제 설정에는 docker가 포함되며 여기에서 Docker Toolbox (Windows) 및 Docker (Linux)를 설정하는 방법이 있습니다.

1a) Linux (Docker)

sudo docker stop $(sudo docker ps -a -q) \
sudo docker rm -f $(sudo docker ps -a -q) \
sudo docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5 \
sudo docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper \
sudo docker run -d --name kafka -e ADVERTISED_HOST_NAME=<YOUR_IP> -e ZOOKEEPER_CONNECT=<YOUR_IP> --link zookeeper:zookeeper -p 9092:9092 debezium/kafka \
sudo docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=<YOUR_IP> --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect \
sudo docker run -d --net=host -e "PROXY=true" -e ADV_HOST=<YOUR_IP> -e "KAFKA_REST_PROXY_URL=http://<YOUR_IP>:8082" -e "SCHEMAREGISTRY_UI_URL=http://<YOUR_IP>:8081" landoop/kafka-topics-ui \
sudo docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=<YOUR_IP>:2181 frontporch/kafka-rest:latest

1b) Windows (Docker Toolbox)

docker stop $(docker ps -a -q) ;
docker rm -f $(docker ps -a -q) ;
docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5 ;
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper ;
docker run -d --name kafka -e ADVERTISED_HOST_NAME=192.168.99.100 -e ZOOKEEPER_CONNECT=192.168.99.100 --link zookeeper:zookeeper -p 9092:9092 debezium/kafka ;
docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=192.168.99.100 --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect ;
docker run -d --net=host -e "PROXY=true" -e ADV_HOST=192.168.99.100 -e "KAFKA_REST_PROXY_URL=http://192.168.99.100:8082" -e "SCHEMAREGISTRY_UI_URL=http://192.168.99.100:8081" landoop/kafka-topics-ui ;
docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=192.168.99.100:2181 frontporch/kafka-rest:latest ;

2) connect the databse to the debezium connect

send a POST application/json to <YOUR_IP>/connectors (for Linux) or 192.168.99.100:8083/connectors (for Windows Docker Toolbox) with body
{
    "name": "inventory-connector",
    "config": {
        "name": "inventory-connector",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

Debezium은 각 테이블에 대해 하나씩 kafka 토픽을 생성합니다. 포트 8000의 landoop / kafka-topics-ui 서버로 이동하면 메시지 페이로드의 스키마가 아래와 같이 어떻게 보이는지 확인할 수 있습니다. 중요한 부분은 payload beforeafter해당 데이터베이스 행의 이전 값과 새 값을 보냅니다. 또한 op업데이트 등을 위해 'u'를 만들 때 'c'입니다.

여기에 이미지 설명 입력

각 소비 마이크로 서비스는 해당 maven 종속성을 사용하여 spring-cloud kafka 바인더를 사용합니다.

<dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Brixton.SR7</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>1.5.2.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
                <version>1.2.0.RELEASE</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies> 
    [...]
       <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
       </dependency>
    [...]
    </dependencies>

그런 다음 소비하는 Spring Cloud Microservices 각각에 관심있는 모든 주제를 한 번에 수신하고 각 주제 이벤트를 전용 이벤트 핸들러에 위임하는 리스너가 있습니다.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
@Component
public class Listener {

    public final CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(id = "listener", topics = {
            "dbserver1.inventory.entity",
            "dbserver1.inventory.attribute",
            "dbserver1.inventory.entity_types"
    } , group = "group1")
    public void listen(ConsumerRecord<?, ?> record) {
        String topic = record.topic();
        if (topic.equals("dbserver1.inventory.entity") {
            // delegate to appropriate handler
            // EntityEventHandler.handle(record);
        }
        else if (...) {}
    }
}

In my case I wanted to be updating a graph based on the changes that happen on the RDBMS side. Of course the graph database will be eventually consistent with RDBMS. My concern was that since the topics include changes e.g in join_tables as well as the joined table sides, I wouldn't be able to create the corresponding edges and vertices without knowing that each of the vertices of the edges exist. So I decided to ask debezium gitter (https://gitter.im/debezium/dev):

From the discussion below two ways exist..Either create edges and vertices using placeholders for topics that haven't been consumed yet or use Kafka Streams to seam topics back to their original structures something that seems more painful to me than the first way. So I decided to go with the first way :)

Michail Michailidis @zifnab87 Apr 17 11:23 Hi I was able to integrate Mysql with Debezium Connect and using landoop/topics-ui I am able to see that the topics are picked up properly and messages are sent the way they have to. I saw that for each of the tables there is a topic. e.g join tables are separate topics too.. If lets say I have three tables order, product and order_product and I have a service consuming all three topics.. I might get first the insertion on order_product and then the insertion of order.. That may cause a problem if I am trying to push this information to a graph database.. I will try to create an edge on vertex that is not there yet.. how can I make consumers that consume events lets say based on a transactionId or at least are aware of the boundary context.. is there an easy way to listen to those events and then deserialize them to a real java object so I can push that to a graph database or search index? If not how would you approach this problem? Thanks!

Randall Hauch @rhauch Apr 17 19:19 @zifnab87 Debezium CDC is purely a row-based approach, so by default all consumers see the row-level change events in an eventually consistent manner. Of course, the challenge to eventual consistency of downstream systems is that they might potentially leak data states that never really existed in the upstream source. But with that come lots of other really huge benefits: downstream consumers are much simpler, more resilient to failure, have lower latency (since there’s no need to wait for the appearance of the upstream transaction’s completion before processing), and are less decoupled to the upstream system. You gave the example of an order and product tables with an order_product intersect table. I agree that when thinking transactionally it does not make sense for an order_product relationship to be added before both the order and product instances exist. But are you required to live with that constraint? Can the order_product consumer create placeholder nodes in the graph database for any missing order and/or product values referenced by the relationship? In this case when the order_product consumer is a bit ahead of the order consumer, it might create an empty order node with the proper key or identifier, and when the order consumer finally processes the new order it would find the existing placeholder node and fill in the details. Of course, when the order arrives before the order_product relationships, then everything works as one might expect. This kind of approach might not be allowed by the downstream graph database system or the business-level constraints defined in the graph database. But if it is allowed and the downstream applications and services are designed to handle such states, then you’ll benefit from the significant simplicity that this approach affords, as the consumers become almost trivial. You’ll be managing less intermediate state and your system will be more likely to continue operating when things go wrong (e.g., consumers crash or are taken down for maintenance). If your downstream consumers do have to stick with ahering to the transaction boundaries in the source database, then you might consider using Kafka Streams to join the order and order_product topics and produce a single aggregate order object with all the relationships to the referenced products. If you can’t assume the product already exists, then you could also join with the product topic to add more product detail to the aggregate order object. Of course, there still are lots of challenges, since the only way for a stream processor consuming these streams to know it’s seen all of the row-level change events for a given transaction is when a subsequent transaction is seen on each of the streams. As you might expect, this is not ideal, since the last transaction prior to any quiet period will not complete immediately.

Michail Michailidis @zifnab87 Apr 17 23:49 Thanks @rhauch really well explained! I was investigating Kafka Streams while waiting for your answers! now I am thinking I will try to code the placeholder variation e.g when a vertex is not there etc

Randall Hauch @rhauch Apr 17 23:58 @zifnab87 glad it helped, at least a bit! Be sure you also consider the fact that the consumer might see a sequence of messages that it already has consumed. That will only happen when something goes wrong (e.g., with the connector or the process(es) where the connector is running, or the broker, network partition, etc.); when everything is operating normally, the consumer should see no duplicate messages.

Michail Michailidis @zifnab87 Apr 18 01:15 @rhauch Sure it helped! Yeap I have that in mind - consumer processes need to be idempotent. I am curious if for example sinks for lets say elastic search, mongodb and graph databases can be implemented to consolidate events produced from debezium-mysql no matter what the order by using placeholders for missing things.. e.g the mountaineer sinks are doing that alreadu if you know by any chance? I am trying to avoid reimplementing things that already exist.. Also my solutions might be very fragile if mysql schema changes and I dont consume the new events.. I feel so many things are missing around the microservices world

Randall Hauch @rhauch Apr 18 03:30 그 싱크가 어떻게 작동하는지 잘 모르겠습니다. 이상적으로는 이벤트 생성, 업데이트 및 삭제를 올바르게 처리해야합니다. 그러나 Debezium 이벤트는 모든 이벤트의 최상위 수준에 엔벨로프가 있으므로 SMT를 사용하여 after 필드의 내용을 가져와야 (또는 before 필드 제외) "의미있는"부분이 싱크 시스템에 배치됩니다. . 더 많은 SMT가 KC에 추가되면 더 쉬워 질 것입니다. 너무 많은 SMT가 필요하고 Debezium이이를 수행 한 SMT를 추가하려는 경우 JIRA에 기능 요청을 기록하십시오.

이 답변 / 가이드가 다른 사람들이 Kafka와 같은 메시지 브로커를 중심으로 이벤트 소싱을 시작하는 데 도움이되기를 바랍니다.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

Related 관련 기사

뜨겁다태그

보관