목차 LIST
Debezium이란?
Change Data Capture(CDC)를 구현하는 오픈소스 플랫폼입니다.
CDC란 데이터베이스에서 발생하는 변경 사항(INSERT, UPDATE, DELETE)을 실시간으로 캡쳐하여 이벤트 형태로 다른 시스템으로 전송하는 기술입니다. Debezium은 이러한 변경 이벤트를 Apache Kafka와 같은 스트리밍 시스템으로 전달하여 다른 저장소로 실시간 데이터베이스 변경 사항을 처리할 수 있도록 합니다.
Debezium 사용 방법
아래는 Debezium을 사용하는 세 가지 방법에 대해 소개합니다.
1. Debezium Kafka Connect
대부분의 경우, Debezium은 Apache Kafka Connect를 통해 배포됩니다.
Kafka Connect는 아래 사항을 구현하고 운영하기 위한 프레임워크입니다.
1) Debezium과 같은 레코드를 Kafka로 보내는 소스 커넥터
2) Kafka 토픽에서 다른 시스템으로 레코드를 전파하는 싱크 커넥터
다음 이미지는 Debezium을 기반으로 한 변경 데이터 캡쳐 파이프라인 아키텍처입니다.
MySQL 및 PostgreSQL용 Debezium 커넥터는 각각의 데이터베이스에서 변경 사항을 캡쳐합니다.
각 Debezium 커넥터는 자신의 소스 데이터베이스에 연결을 설정합니다.
* MySQL 커텍터는 binlog에 접근하기 위하 클라이언트 라이브러리 사용
* PostgreSQL 커넥터는 logical replication stream에서 읽어옴
기본적으로, 하나의 데이터베이스 테이블에서 발생한 변경 사항은 하나의 kafka 토픽에 기록됩니다. (default:테이블명과 동일)
필요에 따라, Debezium의 토픽 라우팅 변환을 구성하여 대상 토픽 이름을 변경할 수 있습니다.
구성을 통해 아래와 같은 작업이 가능합니다.
* 테이블 이름과 다른 이름을 가진 토픽으로 레코드 라우팅
* 여러 테이블의 변경 이벤트 레코드를 하나의 토픽으로 스트리밍
Apache Kafka 토픽에 변경 이벤트 레코드가 저장되면, Kafka connet를 사용하여 해당 레코드를 Elasticsearch, 데이터 웨어하우스 및 분석시스템 과 같은 여러 데이터베이스로 스트리밍 할 수 있습니다.
선택한 싱크 커넥터에 따라, 모든 변경정보가 필요하지 않을 때는 변경 후의 정보만을 추출하여 다른 시스템으로 보낼 수 있습니다.
2. Debezium 서버
Debezium을 배포하는 또 다른 방법은 Debezium 서버를 사용하는 것입니다. Debezium 서버는 소스 데이터베이스에서 다양한 메시징 인프라로 변경 이벤트를 스트리밍하는 애플리케이션입니다.
다음 이미지는 Debezium 서버를 사용하는 변경 데이터 캡쳐 파이프라인 아키텍처입니다.
Debezium 서버는 소스 데이터베이스에서 변경 사항을 캡쳐하기 위해 Debezium 소스 커넥터 중 하나를 사용하도록 구성됩니다. 변경 이벤트는 JSON, Apache Avro와 같은 다양한 형식으로 직렬화될 수 있으며, 그 다음 다양한 메시징 인프라 중 하나로 전송됩니다.
3. Debezium Engine
Debezium 커넥터를 사용하는 또 다른 방법은 Debezium 엔진을 사용하는 것입니다. 이 경우, Debezium은 Kafka Connect를 통해 실행되지 않고, 사용자 정의 Java 애플리케이션에 임베디드된 라이브러리로 실행됩니다.
Apache Flink를 사용하여 스트리밍 하는 경우, 마지막 경우인 Debezium Engine을 사용한다고 볼 수 있습니다.
Debezium MySQL 연동
MySQL에는 데이터베이스에 커밋된 순서대로 모든 작업을 기록하는 바이너리 로그(binlog)가 있습니다. 여기에는 테이블 스키마 변경 사항뿐만 아니라 테이블의 데이터 변경 사항도 포함됩니다. MySQL은 이 바이너리 로그를 복제와 복구에 사용합니다.
Debezium MySQL 커넥터는 binlog를 읽고, 행 단위의 INSERT, UPDATE, DELETE 작업에 대한 변경 이벤트를 생성하고 Kafka 토픽으로 전송합니다.
MySQL은 일반적으로 일정 기간이 지나면 binlog를 자동으로 삭제하도록 설정되어 있기 때문에, MySQL 커넥터는 각 데이터베이스의 초기 스냅샷을 수행해야 합니다. 그리고 MySQL 커넥터는 스냅샷이 생성된 시점부터 binlog를 읽습니다.
Schema History Topic
스키마 히스토리 관리 : Debezium MySQL 커넥터는 데이터베이스의 변경 사항(INSERT, UPDATE, DELETE)를 Kafka로 전송하기 전에, 각 변경이 발생한 당시의 테이블 스키마를 추적해야 합니다. 이를 위해, 커넥터는 데이터베이스의 스키마 변경 사항(DDL 문)을 읽고, 해당 정보를 메모리에 저장합니다.
MySQL의 binlog에는 데이터 변경 사항 뿐 아니라 스키마 변경에 대한 DDL 문도 포함됩니다. 커넥터는 해당 DDL 문을 파싱하여 스키마 변경 내용을 기록하고, 이를 Kafka schema history topic에 저장합니다. 이 토픽은 커넥터 내부에서만 사용됩니다.
재시작 시 복구 : 커넥터가 다시 시작될 때, 이전에 기록된 스키마 히스토리 토픽을 읽고, 재시작 시점의 스키마를 복원합니다. 이를 통해 데이터 변경 사항이 기록된 당시의 스키마를 정확히 반영할 수 있습니다.
따라서, 초기 스냅샷 단계에서 현재 테이블 스키마를 캡쳐하여 메모리에 저장합니다. 그리고 커넥터는 각 테이블의 스키마 정보를 스키마 히스토리 토픽에 기록합니다. 이후 binlog에서 CDC 이벤트를 읽을 때 이 스키마 히스토리 토픽을 참고하여 이벤트가 발생한 당시의 스키마를 정확히 파악합니다.
Schema Change Topic
Debezium MySQL 커텍터를 구성하여 데이터베이스의 테이블에 적용된 스키마 변경 이벤트를 생성할 수 있습니다. 커넥터는 스키마 변경 이벤트를 <topicPrefix> 라는 이름의 kafka 토픽에 기록합니다. 여기서 topicPrefix는 topic.prefix 커넥터 구성 속성에 지정된 네임스페이스입니다. 커넥터가 스키마 변경 토픽에 보내는 메시지에는 페이로드가 포함되어 있으며, 선택적으로 변경 이벤트 메시지의 싘마도 포함될 수 있습니다.
예) 스키마 변경 이벤트를 Kafka 토픽에 기록할 때 사용하는 데이터 구조
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "databaseName"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.SchemaChangeKey",
"version": 1
},
"payload": {
"databaseName": "inventory"
}
}
정리하면, 스키마 변경 이벤트는 ADD COLUMN, ALTER TABLE 등을 기록하는 이벤트로 변경 사항만을 포함하고
스키마 히스토리는 SHOW CREATE TABLE 명령을 실행한 결과처럼, 테이블의 전체 구조를 포함합니다.
(IMPORTANT)
CDC에 연동된 테이블에서 커넥터는 스키마 변경의 기록을 스키마 변경 토픽에 저장할 뿐만 아니라, 내부 데이터베이스 스키마 히스토리 토픽에도 저장합니다. 내부 데이터베이스 스키마 히스토리 토픽은 커넥터 내부 용도로만 사용되며, 이를 소비하는 애플리케이션에서 직접 사용하는 것은 의도되지 않았으므로 스키마 변경에 대한 알림이 필요한 애플리케이션은 반드시 스키마 변경 토픽에서만 해당 정보를 사용해야 합니다.
(IMPORTANT)
데이터베이스 스키마 히스토리 토픽을 절대로 파티셔닝하면 안됩니다. 파티션 수를 1로 지정해주세요. 커넥터가 해당 토픽으로 내보내는 이벤트 기록들이 일관된 순서를 유지해야 스키마 히스토리 토픽이 올바르게 작동합니다.
참고로 카프카 토픽은 파티셔닝 하게 되면 데이터 순서 보장이 되지 않습니다.
예) MySQL 커넥터가 스키마 변경 토픽으로 보내는 메시지 샘플
참고로, 스키마 변경 토픽이지만 일관성과 신뢰성 보장을 위해 해당 시점 테이블의 모든 컬럼 정보가 포함되어 전송됩니다.
{
"schema": { },
"payload": {
"source": {
"version": "2.7.1.Final",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1651535750218,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"file": "mysql-bin.000003",
"pos": 570
},
"databaseName": "inventory",
"ddl": "ALTER TABLE customers ADD middle_name varchar(255) AFTER first_name",
"tableChanges": [
{
"type": "ALTER",
"id": "\"inventory\".\"customers\"",
"table": {
"defaultCharsetName": "utf8mb4",
"primaryKeyColumnNames": [
"id"
],
"columns": [
{
"name": "id",
"jdbcType": 4,
"typeName": "INT",
"position": 1,
"optional": false,
"autoIncremented": true
},
{
"name": "first_name",
"jdbcType": 12,
"typeName": "VARCHAR",
"position": 2,
"optional": false
},
{
"name": "middle_name",
"jdbcType": 12,
"typeName": "VARCHAR",
"position": 3,
"optional": true
},
{
"name": "last_name",
"jdbcType": 12,
"typeName": "VARCHAR",
"position": 4,
"optional": false
},
{
"name": "email",
"jdbcType": 12,
"typeName": "VARCHAR",
"position": 5,
"optional": false
}
]
}
}
]
}
}
Snapshot
Debezium MySQL 커넥터가 처음 시작되면 테이블의 초기 스냅샷을 수행합니다. 이 스냅샷은 현재 상태에 대한 기준이 됩니다. Debezium은 스냅샷을 실행할 때 여러 가지 모드를 사용할 수 있습니다. 스냅샷 모드는 snapshot.mode 구성 속성에 의해 결정됩니다. 이 속성의 기본값은 initial입니다. 커넥터가 스냅샷을 생성하는 방식을 변경하려면 snapshot.mode 속성의 값을 변경하여 사용자 지정할 수 있습니다.
Debezium MySQL 커넥터는 스냅샷을 수행할 때 데이터의 일관성을 보장하기 위해 잠금 메커니즘을 사용할 수 있으며, 이 잠금의 정도를 설정할 수 있습니다. 이를 통해 데이터베이스의 읽기/쓰기 작업에 영향을 최소화하면서 일관된 스냅샷을 생성할 수 있습니다.
예로는 글로벌 읽기 잠금(Global Read Lock), 테이블 수준 잠금(Table-Level Locks)가 있습니다.
References
https://debezium.io/documentation/reference/stable/architecture.html
'데이터 엔지니어링' 카테고리의 다른 글
Iceberg 에 대해 알아보자 | Architecture 및 도입 이유 (0) | 2024.08.23 |
---|---|
Flink - checkpoint 옵션 및 재시작 전략 (0) | 2024.08.19 |
Flink - State, Checkpointing and Fault Tolerance 상세 내용 (0) | 2024.08.19 |
Flink Memory에 대해 알아보자 (0) | 2024.05.21 |
Flink DataSources 정의 및 구성 요소 (0) | 2024.05.20 |
댓글