Source & Sink Connect 문서 추가

This commit is contained in:
roy-zz
2022-04-27 22:07:34 +09:00
parent cb9d3affc3
commit 432149b6d2
8 changed files with 114 additions and 0 deletions

View File

@@ -12,6 +12,8 @@
이전에 Producer와 Consumer를 사용하여 Kafka Cluster를 통해서 데이터를 제공하고 소비하는 작업을 진행했었다.
이번에는 Connect Source와 Connect Sink를 사용하여 데이터를 제공하고 소비하는 작업을 진행해본다.
---
### Source Connect
우리는 Postman을 사용하여 `connect`의 connectors API를 호출하여 새로운 정보를 등록할 것이다.
@@ -115,10 +117,122 @@ Consumer가 정상적으로 데이터를 수신하고 있다.
---
### Sink Connect
이번에는 `Source Connect`를 추가한 것과 유사하게 `Sink Connect`를 추가해본다.
`Sink Connect`의 역할을 알아보면 어떠한 `Source Connect`가 데이터를 Kafka의 토픽에 쌓으면 쌓인 데이터를 가져와서 처리하는 역할을 한다.
우리는 `connect`의 connectors API를 호출할 때 HTTP Body에 아래와 같은 정보를 전달할 것이다.
```json
{
"name": "roy-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "root",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "roy_topic_users"
}
}
```
JSON 데이터에 MariaDB의 접속 정보와 접속 클라이언트를 지정하면서 `Sink Connect`가 DB와 연동되는 것을 알리는 옵션이다.
`topics``Sink Connect`가 어떠한 토픽에 데이터가 들어오기를 대기하는지를 지정하는 옵션이며 `auto.create`라는 옵션에 의해서 토픽과 이름이 동일한 테이블이 없으면 토픽과 이름이 동일한 테이블을 생성하여 데이터를 저장한다.
쉽게 정리하면 `Source Connect`는 자신이 관찰하는 데이터 소스에 새로운 데이터가 추가되면 토픽으로 데이터를 전달하고 `Sink Connect`는 자신이 관찰하는 토픽에 새로운 데이터가 추가되면 데이터를 전달해야하는 곳에 데이터를 전달한다.
`Source Connect`를 사용하였을 때 DB에 새로운 데이터가 들어가면 새로운 데이터 뿐만 아니라 아래와 같이 DB 스키마 정보까지 같이 토픽으로 전달되었다.
```json
{
"schema": {
"type": "struct",
"fields": [
{ "type": "int32", "optional": false, "field": "id" },
{ "type": "string", "optional": true, "field": "user_id" },
{ "type": "string", "optional": true, "field": "password" },
{ "type": "string", "optional": true, "field": "name" },
{ "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp","version": 1,"field": "created_at"}
],
"optional": false,
"name": "users" },
"payload": {
"id": 7,
"user_id": "roy",
"password": "password",
"name": "roy choi",
"created_at":1651022587000
}
}
```
1. 토픽 확인
아래의 커맨드를 입력하여 현재 존재하는 토픽을 확인한다. 정상작동을 단순히 확인 과정이므로 큰 의미를 두는 단계는 아니다.
```bash
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```
![](sink_source_image/topic-list.png)
2. 테이블 데이터 확인
아래의 SQL을 실행하여 users 테이블에 들어있는 데이터를 확인한다. 정상작동을 위한 단순 확인 과정이므로 큰 의미를 두는 단계는 아니다.
```sql
SELECT * FROM users;
```
![](sink_source_image/select-users-table.png)
3. `Sink Connect` 등록
POST /connectors API를 호출하여 `Sink Connect`를 등록한다.
![](sink_source_image/register-sync-connect.png)
4. `Connect` 정상 등록 확인
GET /connectors API를 호출하여 정상적으로 `Sink Connect`가 등록되었는지 확인한다.
![](sink_source_image/sink-connect-register-check.png)
5. 테이블 확인
정상적으로 `Sink Connect`가 등록되었다는 것은 DB에 토픽이름과 동일한 테이블이 생성되었다는 것을 의미한다.
아래의 SQL을 사용하여 정상적으로 테이블이 생성되었는지 확인한다.
```sql
SHOW TABLES;
```
![](sink_source_image/register-new-table.png)
6. 데이터 정상 전송 확인.
우리는 아래와 같은 구조로 데이터가 전달되도록 전체적인 구조를 잡았다.
`users Table` -> `Source Connect` -> `Kafka Topic` -> `Sink Connect` -> `roy_topic_users Table`
users 테이블에 아래의 쿼리를 사용하여 `Perry`라는 사용자를 Insert하고 `roy_topic_users` 테이블에 정상적으로 데이터가 들어가는지 확인해본다.
```sql
INSERT INTO users (user_id, password, name) VALUES ('perry', 'password', 'perry god');
```
데이터를 Insert하는 것과 동시에 `Consumer`를 실행 중인 터미널에 데이터가 출력되는 것을 확인할 수 있다.
![](sink_source_image/perry-data-to-consumer.png)
`roy_topic_users`에 데이터가 정상적으로 Insert되었는지 확인해본다.
![](sink_source_image/insert-perry-by-sink-connect.png)
정상적으로 데이터가 Insert된 것을 확인할 수 있다.
물론 `Source Connect`가 토픽에 데이터를 집어넣은 것과 동일한 형식으로 직접 토픽에 데이터를 집어넣어도 `Sink Connect`는 동일하게 반응한다.
---

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 125 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB