diff --git a/document/kafka/sink_source.md b/document/kafka/sink_source.md index b8b410a..b5c754b 100644 --- a/document/kafka/sink_source.md +++ b/document/kafka/sink_source.md @@ -12,6 +12,8 @@ 이전에 Producer와 Consumer를 사용하여 Kafka Cluster를 통해서 데이터를 제공하고 소비하는 작업을 진행했었다. 이번에는 Connect Source와 Connect Sink를 사용하여 데이터를 제공하고 소비하는 작업을 진행해본다. +--- + ### Source Connect 우리는 Postman을 사용하여 `connect`의 connectors API를 호출하여 새로운 정보를 등록할 것이다. @@ -115,10 +117,122 @@ Consumer가 정상적으로 데이터를 수신하고 있다. --- +### Sink Connect +이번에는 `Source Connect`를 추가한 것과 유사하게 `Sink Connect`를 추가해본다. +`Sink Connect`의 역할을 알아보면 어떠한 `Source Connect`가 데이터를 Kafka의 토픽에 쌓으면 쌓인 데이터를 가져와서 처리하는 역할을 한다. +우리는 `connect`의 connectors API를 호출할 때 HTTP Body에 아래와 같은 정보를 전달할 것이다. +```json +{ + "name": "roy-sink-connect", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", + "connection.url": "jdbc:mysql://localhost:3306/mydb", + "connection.user": "root", + "connection.password": "root", + "auto.create": "true", + "auto.evolve": "true", + "delete.enabled": "false", + "tasks.max": "1", + "topics": "roy_topic_users" + } +} +``` +JSON 데이터에 MariaDB의 접속 정보와 접속 클라이언트를 지정하면서 `Sink Connect`가 DB와 연동되는 것을 알리는 옵션이다. +`topics`는 `Sink Connect`가 어떠한 토픽에 데이터가 들어오기를 대기하는지를 지정하는 옵션이며 `auto.create`라는 옵션에 의해서 토픽과 이름이 동일한 테이블이 없으면 토픽과 이름이 동일한 테이블을 생성하여 데이터를 저장한다. +쉽게 정리하면 `Source Connect`는 자신이 관찰하는 데이터 소스에 새로운 데이터가 추가되면 토픽으로 데이터를 전달하고 `Sink Connect`는 자신이 관찰하는 토픽에 새로운 데이터가 추가되면 데이터를 전달해야하는 곳에 데이터를 전달한다. + +`Source Connect`를 사용하였을 때 DB에 새로운 데이터가 들어가면 새로운 데이터 뿐만 아니라 아래와 같이 DB 스키마 정보까지 같이 토픽으로 전달되었다. + +```json +{ + "schema": { + "type": "struct", + "fields": [ + { "type": "int32", "optional": false, "field": "id" }, + { "type": "string", "optional": true, "field": "user_id" }, + { "type": "string", "optional": true, "field": "password" }, + { "type": "string", "optional": true, "field": "name" }, + { "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp","version": 1,"field": "created_at"} + ], + "optional": false, + "name": "users" }, + "payload": { + "id": 7, + "user_id": "roy", + "password": "password", + "name": "roy choi", + "created_at":1651022587000 + } +} +``` + +1. 토픽 확인 + +아래의 커맨드를 입력하여 현재 존재하는 토픽을 확인한다. 정상작동을 단순히 확인 과정이므로 큰 의미를 두는 단계는 아니다. + +```bash +$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list +``` + +![](sink_source_image/topic-list.png) + +2. 테이블 데이터 확인 + +아래의 SQL을 실행하여 users 테이블에 들어있는 데이터를 확인한다. 정상작동을 위한 단순 확인 과정이므로 큰 의미를 두는 단계는 아니다. + +```sql +SELECT * FROM users; +``` + +![](sink_source_image/select-users-table.png) + +3. `Sink Connect` 등록 + +POST /connectors API를 호출하여 `Sink Connect`를 등록한다. + +![](sink_source_image/register-sync-connect.png) + +4. `Connect` 정상 등록 확인 + +GET /connectors API를 호출하여 정상적으로 `Sink Connect`가 등록되었는지 확인한다. + +![](sink_source_image/sink-connect-register-check.png) + +5. 테이블 확인 + +정상적으로 `Sink Connect`가 등록되었다는 것은 DB에 토픽이름과 동일한 테이블이 생성되었다는 것을 의미한다. +아래의 SQL을 사용하여 정상적으로 테이블이 생성되었는지 확인한다. + +```sql +SHOW TABLES; +``` +![](sink_source_image/register-new-table.png) + +6. 데이터 정상 전송 확인. + +우리는 아래와 같은 구조로 데이터가 전달되도록 전체적인 구조를 잡았다. +`users Table` -> `Source Connect` -> `Kafka Topic` -> `Sink Connect` -> `roy_topic_users Table` +users 테이블에 아래의 쿼리를 사용하여 `Perry`라는 사용자를 Insert하고 `roy_topic_users` 테이블에 정상적으로 데이터가 들어가는지 확인해본다. + +```sql +INSERT INTO users (user_id, password, name) VALUES ('perry', 'password', 'perry god'); +``` + +데이터를 Insert하는 것과 동시에 `Consumer`를 실행 중인 터미널에 데이터가 출력되는 것을 확인할 수 있다. + +![](sink_source_image/perry-data-to-consumer.png) + +`roy_topic_users`에 데이터가 정상적으로 Insert되었는지 확인해본다. + +![](sink_source_image/insert-perry-by-sink-connect.png) + +정상적으로 데이터가 Insert된 것을 확인할 수 있다. + +물론 `Source Connect`가 토픽에 데이터를 집어넣은 것과 동일한 형식으로 직접 토픽에 데이터를 집어넣어도 `Sink Connect`는 동일하게 반응한다. --- diff --git a/document/kafka/sink_source_image/insert-perry-by-sink-connect.png b/document/kafka/sink_source_image/insert-perry-by-sink-connect.png new file mode 100644 index 0000000..9b74696 Binary files /dev/null and b/document/kafka/sink_source_image/insert-perry-by-sink-connect.png differ diff --git a/document/kafka/sink_source_image/perry-data-to-consumer.png b/document/kafka/sink_source_image/perry-data-to-consumer.png new file mode 100644 index 0000000..b340c90 Binary files /dev/null and b/document/kafka/sink_source_image/perry-data-to-consumer.png differ diff --git a/document/kafka/sink_source_image/register-new-table.png b/document/kafka/sink_source_image/register-new-table.png new file mode 100644 index 0000000..43583c4 Binary files /dev/null and b/document/kafka/sink_source_image/register-new-table.png differ diff --git a/document/kafka/sink_source_image/register-sync-connect.png b/document/kafka/sink_source_image/register-sync-connect.png new file mode 100644 index 0000000..4eb0efd Binary files /dev/null and b/document/kafka/sink_source_image/register-sync-connect.png differ diff --git a/document/kafka/sink_source_image/select-users-table.png b/document/kafka/sink_source_image/select-users-table.png new file mode 100644 index 0000000..a75c66c Binary files /dev/null and b/document/kafka/sink_source_image/select-users-table.png differ diff --git a/document/kafka/sink_source_image/sink-connect-register-check.png b/document/kafka/sink_source_image/sink-connect-register-check.png new file mode 100644 index 0000000..d82c494 Binary files /dev/null and b/document/kafka/sink_source_image/sink-connect-register-check.png differ diff --git a/document/kafka/sink_source_image/topic-list.png b/document/kafka/sink_source_image/topic-list.png new file mode 100644 index 0000000..1edf7ff Binary files /dev/null and b/document/kafka/sink_source_image/topic-list.png differ