회사에서 작년 연말 성과 작성이 일찍 끝나서, 그간 못해보거나 해보고 싶은 일들을 해볼 시간적 여유가 생겼다.
그래서 FlinkCDC로 S3에 Delta 테이블을 다이렉트 Sink 하는 토이 프로젝트를 진행해 보았다.ㅋㅎㅎㅋ
[Githbub 링크] : 조금 더 자세한 코드 및 세부 기록
1. 프로젝트 배경: "표준을 넘어 효율을 탐구하다"
Why Flink & CDC?
현재 우리 팀은 Databricks 기반의 레이크하우스 환경을 운영 중이며, NoSQL(MongoDB) 파이프라인은 Source → Kafka → Spark Streaming → Delta Lake라는 표준화된 아키텍처를 따르고 있습니다. 이 구조는 안정성과 확장성 측면에서 검증된 훌륭한 아키텍처입니다.
하지만 CDC 파이프라인 과제를 기획하면서 한 가지 의문이 들었습니다. "단순히 데이터를 적재(Ingestion)하는 목적을 위해, 항상 이 무거운 미들웨어들을 모두 거쳐야 할까?"
기존 방식은 각 계층이 서로 다른 책임(버퍼, 가공, 적재)을 나누어 가지지만, 그만큼 파이프라인의 복잡도가 높고 운영 이슈 발생 시 대응 범위가 넓다는 트레이드오프가 존재했습니다.
이에 군살을 뺀 경량화된 파이프라인(Lean Pipeline)을 실험해보기로 했습니다. Kafka와 Spark Streaming을 거치지 않고, Apache Flink CDC를 통해 RDB 로그를 실시간으로 캡처하여 Delta Lake로 직결하는 구조입니다. 이는 수집과 가공을 하나의 스트리밍 레벨로 압축할 수 있는지 검증하는 도전적인 과제였습니다.
목표
- Pipeline Simplification: 복잡한 단계를 줄여 운영 포인트를 최소화한다.
- Direct Ingestion: MySQL(RDS) → Flink → Delta Lake(S3)로 직결 경량화 구조 구현.
- Scalability: 단순 적재를 넘어, 향후 가공 로직(ETL)까지 Flink SQL 하나로 확장 가능한지 가능성 타진.
환경 구성
- Infra: AWS EKS (Kubernetes), S3, RDS (MySQL 8.x)
- Platform: Databricks, Apache Flink 1.20
- 제약사항 : 실제 운영 DB가 아닌 PoC용 RDS를 대상으로 테스트 진행.
2. 초기 아키텍처 설계
가장 이상적으로 생각했던 단순한 구조는 다음과 같습니다.
- Source: MySQL CDC Connector (Debezium 기반)
- Process: Apache Flink (SQL Gateway)
- Sink: Delta Lake Connector (Direct Upsert)
[초기 구상] MySQL (Insert/Update/Delete) ➡ Flink ➡ S3 Delta Table (Upsert 반영)
하지만 이 단순한 구조를 구현하는 과정에서 현실적인 오픈소스 생태계의 한계와 기술적 난관을 마주하게 됩니다.
3. Trouble Shooting: 만났던 이슈들과 해결 과정
Issue 1. 의존성 지옥
Flink를 Docker/EKS(K8s) 환경에 직접 구성하면서 ClassNotFoundException과 403 Access Denied 에러가 끊임없이 발생했다.
- 원인: Flink는 기본적으로 '가벼운(Lightweight)' 상태로 배포됩니다. S3, Hadoop, Delta Lake, Parquet 등 필요한 라이브러리는 사용자가 직접 버전 호환성(Matrix)을 맞춰 주입해야 합니다. 특히 parquet-hadoop-bundle이나 scala-library 같은 히든 의존성을 찾는 것이 가장 큰 난관이었습니다.
- 해결:
- 공식 문서를 넘어 에러 로그를 역추적하여 필요한 JAR 리스트 도출.
- Delta Lake S3 적재를 위한 flink-s3-fs-hadoop과 Delta Standalone 라이브러리 조합 최적화.
- 최종적으로 모든 의존성이 포함된 Custom Dockerfile을 완성하여 인프라를 코드화(IaC) 함.
Issue 2. "Sink doesn't support update/delete changes"
가장 치명적이었던 논리적 에러로, CDC 소스는 UPDATE/DELETE 이벤트를 보내는데, Flink SQL용 Delta Connector가 이를 받아주지 못하고 에러를 뱉어냈습니다.
- 원인: Flink SQL Delta Connector(v3.0 이하)는 현재 Append-Only(INSERT)만 지원하며, MERGE나 UPSERT를 SQL 레벨에서 지원하지 않음.
- 해결:
- 전략 변경 (Pivot):
- 도구의 한계를 인정하고 아키텍처를 수정했습니다.
- Flink는 무리하게 데이터를 합치지(Merge) 않고, "배달부" 역할에 집중하기로 했습니다.
- 모든 변경사항(+I, -U, +U, -D)을 처리하지 않고, Append Only 로그 형태로 그대로 적재합니다.
- Technical Fix:
- Flink 1.20 + MySQL CDC 3.4로 버전을 업그레이드하고 아래 핵심 옵션을 적용했습니다.
- 'scan.read-changelog-as-append-only.enabled' = 'true'
- 이 옵션을 통해 Update/Delete 이벤트도 Delta Sink가 이해할 수 있는 INSERT 형태의 로그로 변환되어 안정적으로 적재되기 시작했습니다.
- 전략 변경 (Pivot):
4. 아키텍처 재설계: Hybrid ST Layer
Flink에서 무리하게 Upsert를 수행하려는 시도를 멈추고, Medallion Architecture(Bronze/Silver/Gold)의 정석을 따르기로 했습니다. 전통적인 DW의 ST(Staging) 개념을 현대적인 Lakehouse 아키텍처에 맞춰 재정의했습니다.
전체적인 아키텍처는 같으나 물리적인 Merge(Upsert) 대신, Append Only 로그 적재 후 논리적인 뷰(View)로 최신 상태를 제공하는 전략을 택했습니다.
1단계: Flink 적재 (Bronze Layer, Append-Only)
- 역할 : 변경 로그의 무결성 보장 및 파티셔닝(optional) 적재
- 데이터브릭스 테이블 생성 (properties 생략)
-- flinkSQL -> s3 delta insert
INSERT INTO c_delta.sink.users
SELECT
op_ts,
row_kind,
user_key,
name,
address,
age,
updated_at,
DATE_FORMAT(
(CASE
WHEN op_ts <= TIMESTAMP '1971-01-01 00:00:00' THEN updated_at
ELSE op_ts
END) + INTERVAL '9' HOUR,
'yyyy-MM-dd'
) as op_date
FROM default_catalog.source.users_cdc;
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.<table>
USING DELTA
LOCATION 's3a://<bucket-name>/<prefix>
2단계: Databricks 서빙 (Silver Layer)
- 역할 : 최신 상태 데이터(MST) 제공
- 구현 : 물리적인 테이블을 만들지 않고 Logical View 테이블 생성 -> 비용 절감 효과
- Logical View: RANK() 윈도우 함수를 사용하여 물리적 적재 없이 실시간으로 최신 상태(MST)를 보여주는 View 생성.
-- Databricks Master View
%sql
CREATE OR REPLACE VIEW <catalog>.<schema>.<view_table_mst> AS
WITH ranked_users AS (
SELECT
*,
-- [핵심 로직]
-- 1. 유저 키(PK)별로 그룹핑
-- 2. 시간(op_ts) 내림차순 정렬 (최신이 위로)
-- 3. [중요] 만약 시간이 0.001초까지 똑같다면?
-- => '+U'(수정 후), '+I'(생성)가 '-U', '-D'보다 우선순위를 갖도록 정렬
ROW_NUMBER() OVER (
PARTITION BY user_key
ORDER BY op_ts DESC,
CASE
WHEN row_kind IN ('+I', '+U') THEN 1
ELSE 2
END ASC
) as rn
FROM <catalog>.<schema>.<table> -- bronze table
)
SELECT
user_key,
name,
address,
age,
updated_at,
op_ts as last_synced_at,
op_date -- partitions column
FROM ranked_users
-- [필터링] 최신 레코드(rn=1)가 '삭제(-D)'거나 '수정 전 데이터(-U)'라면 결과에서 제외
WHERE rn = 1
AND row_kind IN ('+I', '+U');
5. 마치며: 얻은 것과 남은 과제
성과
- 파이프라인 구조 단순화:
- Kafka → Spark로 분리돼 있던 스트리밍 단계를 Flink 단일 파이프라인으로 통합했습니다.
- 이를 통해 데이터의 이동 경로(Hop)를 줄이고 컴포넌트 간 의존성을 최소화했습니다.
- 비용 효율적인 구조:
- 단순 적재를 위해 고비용의 Spark Streaming Job을 24시간 띄우는 대신, 가벼운 Flink(on K8s)로 적재하고 조회 시점에 View를 활용하는 Cost-Effective한 구조를 완성했습니다.
- 운영 포인트의 전환 :
- 로그 레이어 → 잡 중심 관측
- 단일 적재(Delta) 목적에 맞춰, 운영 관측의 중심을 Kafka의 로그 레이어 지표(lag/리밸런싱 등)에서 Flink 단일 잡의 실행 품질 지표(체크포인트·백프레셔·리소스)로 전환했습니다. 그 결과 장애 발생 시, 확인 범위가 좁아져 원인 추적과 대응 루틴을 잡 중심으로 표준화할 수 있게 됩니다.
과제
- 추후 남은 과제는 팀 블로그에 작성해두었고 여기선 생략합니다.
Kafka vs Flink 운영 포인트 차이
Kafka는 “중앙 로그(데이터 보관/전달)” 운영이 핵심
- 운영의 중심 대상이 '데이터 로그'입니다. Kafka는 분산 commit log로 설계되어 토픽/파티션에 데이터를 쌓고, 컨슈머가 읽을 위치(오프셋)를 제어합니다.
- 그래서 운영 포인트도 자연스럽게
- Retention(보관 기간/용량), 디스크/브로커 스케일, 파티션 설계
- 컨슈머 그룹 리밸런싱/lag 관리 같은 “읽기 쪽 변동성”
쪽으로 모입니다. (리밸런싱 자체가 Kafka 운영에서 중요한 주제라는 건 업체 문서에서도 반복적으로 다룸)
정리: Kafka를 둔다는 건 “파이프라인 중간에 재처리 가능한 로그 레이어를 운영한다”는 선택에 가깝습니다.
Flink는 “잡(파이프라인) 운영”이 핵심
- Flink는 운영의 중심 대상이 '실행 중인 스트리밍 잡' 입니다. 잡이 소스→변환→싱크를 책임지고, 장애 복구는 체크포인트/상태 복구로 달성합니다.
- 그래서 운영 포인트는
- 체크포인트 주기/성공률/지연, 백프레셔(backpressure)
- 잡의 parallelism/리소스(슬롯/태스크매니저) 튜닝
- 업그레이드/배포 시 세이브포인트 전략
같은 “파이프라인 실행 품질(SLO)”로 모입니다.
정리: FlinkCDC는 “중간 로그 레이어 대신, 실행 잡의 안정성에 운영 역량을 집중한다”는 선택입니다.
'Development > Data Engineering' 카테고리의 다른 글
| ETL이란? (0) | 2022.07.08 |
|---|---|
| tweepy 로 트위터 API V2 스트리밍 하기 (1) | 2022.05.25 |
| [Apache Flink] 플링크 v1.15 TableEnviroment execute() 메서드 삭제 (0) | 2022.05.12 |
| [Docker] M1 맥 도커 confluentinc/cp-kafka 대체 이미지 (0) | 2022.04.28 |
| [Apache Spark] 아파치 스파크 RDD란? (0) | 2022.04.19 |
댓글