회사에서 작년 연말 성과 작성이 일찍 끝나서, 그간 못해보거나 해보고 싶은 일들을 해볼 시간적 여유가 생겼다.
그래서 FlinkCDC로 S3에 Delta 테이블을 다이렉트 Sink 하는 토이 프로젝트를 진행해 보았다.ㅋㅎㅎㅋ
1. 프로젝트 배경: "표준을 넘어 효율을 탐구하다"
Why Flink & CDC?
현재 우리 팀은 Databricks 기반의 레이크하우스 환경을 운영 중이며, NoSQL(MongoDB) 파이프라인은 Source → Kafka → Spark Streaming → Delta Lake라는 표준화된 아키텍처를 따르고 있습니다. 이 구조는 안정성과 확장성 측면에서 검증된 훌륭한 아키텍처입니다.
하지만 CDC 파이프라인 과제를 기획하면서 한 가지 의문이 들었습니다. "단순히 데이터를 적재(Ingestion)하는 목적을 위해, 항상 이 무거운 미들웨어들을 모두 거쳐야 할까?"
기존 방식은 각 계층이 서로 다른 책임(버퍼, 가공, 적재)을 나누어 가지지만, 그만큼 파이프라인의 복잡도가 높고 운영 이슈 발생 시 대응 범위가 넓다는 트레이드오프가 존재했습니다.
이에 군살을 뺀 경량화된 파이프라인(Lean Pipeline)을 실험해보기로 했습니다. Kafka와 Spark Streaming을 거치지 않고, Apache Flink CDC를 통해 RDB 로그를 실시간으로 캡처하여 Delta Lake로 직결하는 구조입니다. 이는 수집과 가공을 하나의 스트리밍 레벨로 압축할 수 있는지 검증하는 도전적인 과제였습니다.
목표
- Pipeline Simplification: 복잡한 단계를 줄여 운영 포인트를 최소화한다.
- Direct Ingestion: MySQL(RDS) → Flink → Delta Lake(S3)로 직결 경량화 구조 구현.
- Scalability: 단순 적재를 넘어, 향후 가공 로직(ETL)까지 Flink SQL 하나로 확장 가능한지 가능성 타진.
환경 구성
- Infra: AWS EKS (Kubernetes), S3, RDS (MySQL 8.x)
- Platform: Databricks, Apache Flink 1.20
- 제약사항 : 실제 운영 DB가 아닌 PoC용 RDS를 대상으로 테스트 진행.
2. 초기 아키텍처 설계
가장 이상적으로 생각했던 단순한 구조는 다음과 같습니다.
- Source: MySQL CDC Connector (Debezium 기반)
- Process: Apache Flink (SQL Gateway)
- Sink: Delta Lake Connector (Direct Upsert)
[초기 구상] MySQL (Insert/Update/Delete) ➡ Flink ➡ S3 Delta Table (Upsert 반영)
하지만 이 단순한 구조를 구현하는 과정에서 현실적인 오픈소스 생태계의 한계와 기술적 난관을 마주하게 됩니다.
3. Trouble Shooting: 만났던 이슈들과 해결 과정
Issue 1. 의존성 지옥
Flink를 Docker/EKS(K8s) 환경에 직접 구성하면서 ClassNotFoundException과 403 Access Denied 에러가 끊임없이 발생했다.
- 원인: Flink는 기본적으로 '가벼운(Lightweight)' 상태로 배포됩니다. S3, Hadoop, Delta Lake, Parquet 등 필요한 라이브러리는 사용자가 직접 버전 호환성(Matrix)을 맞춰 주입해야 합니다. 특히 parquet-hadoop-bundle이나 scala-library 같은 히든 의존성을 찾는 것이 가장 큰 난관이었습니다.
- 해결:
- 공식 문서를 넘어 에러 로그를 역추적하여 필요한 JAR 리스트 도출.
- Delta Lake S3 적재를 위한 flink-s3-fs-hadoop과 Delta Standalone 라이브러리 조합 최적화.
- 최종적으로 모든 의존성이 포함된 Custom Dockerfile을 완성하여 인프라를 코드화(IaC) 함.
Issue 2. "Sink doesn't support update/delete changes"
가장 치명적이었던 논리적 에러로, CDC 소스는 UPDATE/DELETE 이벤트를 보내는데, Flink SQL용 Delta Connector가 이를 받아주지 못하고 에러를 뱉어냈습니다.
- 원인: Flink SQL Delta Connector(v3.0 이하)는 현재 Append-Only(INSERT)만 지원하며, MERGE나 UPSERT를 SQL 레벨에서 지원하지 않음.
- 해결:
- 전략 변경 (Pivot):
- 도구의 한계를 인정하고 아키텍처를 수정했습니다.
- Flink는 무리하게 데이터를 합치지(Merge) 않고, "배달부" 역할에 집중하기로 했습니다.
- 모든 변경사항(+I, -U, +U, -D)을 처리하지 않고, Append Only 로그 형태로 그대로 적재합니다.
- Technical Fix:
- Flink 1.20 + MySQL CDC 3.4로 버전을 업그레이드하고 아래 핵심 옵션을 적용했습니다.
- 'scan.read-changelog-as-append-only.enabled' = 'true'
- 이 옵션을 통해 Update/Delete 이벤트도 Delta Sink가 이해할 수 있는 INSERT 형태의 로그로 변환되어 안정적으로 적재되기 시작했습니다.
- 전략 변경 (Pivot):
4. 아키텍처 재설계: Hybrid ST Layer
Flink에서 무리하게 Upsert를 수행하려는 시도를 멈추고, Medallion Architecture(Bronze/Silver/Gold)의 정석을 따르기로 했습니다. 전통적인 DW의 ST(Staging) 개념을 현대적인 Lakehouse 아키텍처에 맞춰 재정의했습니다.
전체적인 아키텍처는 같으나 물리적인 Merge(Upsert) 대신, Append Only 로그 적재 후 논리적인 뷰(View)로 최신 상태를 제공하는 전략을 택했습니다.
1단계: Flink 적재 (Bronze Layer, Append-Only)
- 역할 : 변경 로그의 무결성 보장 및 파티셔닝(optional) 적재
- 데이터브릭스 테이블 생성 (properties 생략)
-- flinkSQL -> s3 delta insert
INSERT INTO c_delta.sink.users
SELECT
op_ts,
row_kind,
user_key,
name,
address,
age,
updated_at,
DATE_FORMAT(
(CASE
WHEN op_ts <= TIMESTAMP '1971-01-01 00:00:00' THEN updated_at
ELSE op_ts
END) + INTERVAL '9' HOUR,
'yyyy-MM-dd'
) as op_date
FROM default_catalog.source.users_cdc;
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.<table>
USING DELTA
LOCATION 's3a://<bucket-name>/<prefix>
2단계: Databricks 서빙 (Silver Layer)
- 역할 : 최신 상태 데이터(MST) 제공
- 구현 : 물리적인 테이블을 만들지 않고 Logical View 테이블 생성 -> 비용 절감 효과
- Logical View: RANK() 윈도우 함수를 사용하여 물리적 적재 없이 실시간으로 최신 상태(MST)를 보여주는 View 생성.
-- Databricks Master View
%sql
CREATE OR REPLACE VIEW <catalog>.<schema>.<view_table_mst> AS
WITH ranked_users AS (
SELECT
*,
-- [핵심 로직]
-- 1. 유저 키(PK)별로 그룹핑
-- 2. 시간(op_ts) 내림차순 정렬 (최신이 위로)
-- 3. [중요] 만약 시간이 0.001초까지 똑같다면?
-- => '+U'(수정 후), '+I'(생성)가 '-U', '-D'보다 우선순위를 갖도록 정렬
ROW_NUMBER() OVER (
PARTITION BY user_key
ORDER BY op_ts DESC,
CASE
WHEN row_kind IN ('+I', '+U') THEN 1
ELSE 2
END ASC
) as rn
FROM <catalog>.<schema>.<table> -- bronze table
)
SELECT
user_key,
name,
address,
age,
updated_at,
op_ts as last_synced_at,
op_date -- partitions column
FROM ranked_users
-- [필터링] 최신 레코드(rn=1)가 '삭제(-D)'거나 '수정 전 데이터(-U)'라면 결과에서 제외
WHERE rn = 1
AND row_kind IN ('+I', '+U');
5. 마치며: 얻은 것과 남은 과제
성과
- 파이프라인 구조 단순화:
- Kafka → Spark로 분리돼 있던 스트리밍 단계를 Flink 단일 파이프라인으로 통합했습니다.
- 이를 통해 데이터의 이동 경로(Hop)를 줄이고 컴포넌트 간 의존성을 최소화했습니다.
- 비용 효율적인 구조:
- 단순 적재를 위해 고비용의 Spark Streaming Job을 24시간 띄우는 대신, 가벼운 Flink(on K8s)로 적재하고 조회 시점에 View를 활용하는 Cost-Effective한 구조를 완성했습니다.
- 운영 포인트의 전환 :
- 로그 레이어 → 잡 중심 관측
- 단일 적재(Delta) 목적에 맞춰, 운영 관측의 중심을 Kafka의 로그 레이어 지표(lag/리밸런싱 등)에서 Flink 단일 잡의 실행 품질 지표(체크포인트·백프레셔·리소스)로 전환했습니다. 그 결과 장애 발생 시, 확인 범위가 좁아져 원인 추적과 대응 루틴을 잡 중심으로 표준화할 수 있게 됩니다.
과제
- 추후 남은 과제는 팀 블로그에 작성해두었고 여기선 생략합니다.
Kafka vs Flink 운영 포인트 차이
Kafka는 “중앙 로그(데이터 보관/전달)” 운영이 핵심
- 운영의 중심 대상이 '데이터 로그'입니다. Kafka는 분산 commit log로 설계되어 토픽/파티션에 데이터를 쌓고, 컨슈머가 읽을 위치(오프셋)를 제어합니다.
- 그래서 운영 포인트도 자연스럽게
- Retention(보관 기간/용량), 디스크/브로커 스케일, 파티션 설계
- 컨슈머 그룹 리밸런싱/lag 관리 같은 “읽기 쪽 변동성”
쪽으로 모입니다. (리밸런싱 자체가 Kafka 운영에서 중요한 주제라는 건 업체 문서에서도 반복적으로 다룸)
정리: Kafka를 둔다는 건 “파이프라인 중간에 재처리 가능한 로그 레이어를 운영한다”는 선택에 가깝습니다.
Flink는 “잡(파이프라인) 운영”이 핵심
- Flink는 운영의 중심 대상이 '실행 중인 스트리밍 잡' 입니다. 잡이 소스→변환→싱크를 책임지고, 장애 복구는 체크포인트/상태 복구로 달성합니다.
- 그래서 운영 포인트는
- 체크포인트 주기/성공률/지연, 백프레셔(backpressure)
- 잡의 parallelism/리소스(슬롯/태스크매니저) 튜닝
- 업그레이드/배포 시 세이브포인트 전략
같은 “파이프라인 실행 품질(SLO)”로 모입니다.
정리: FlinkCDC는 “중간 로그 레이어 대신, 실행 잡의 안정성에 운영 역량을 집중한다”는 선택입니다.
'Development > Data Engineering' 카테고리의 다른 글
| ETL이란? (0) | 2022.07.08 |
|---|---|
| tweepy 로 트위터 API V2 스트리밍 하기 (1) | 2022.05.25 |
| [Apache Flink] 플링크 v1.15 TableEnviroment execute() 메서드 삭제 (0) | 2022.05.12 |
| [Docker] M1 맥 도커 confluentinc/cp-kafka 대체 이미지 (0) | 2022.04.28 |
| [Apache Spark] 아파치 스파크 RDD란? (0) | 2022.04.19 |
댓글