SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Medallion Architecture - 4 — Silver DAG 코드 구현

dev-lee 2026. 4. 22. 16:13

Airflow DAG + Athena SQL로 Bronze의 중첩 데이터를 Flatten/정제하여 Silver 테이블에 적재하는 두 가지 방식 — CTAS(전체 교체)와 증분(누적 INSERT)


CTAS 기반 DAG — 11_aws_ma_silver.py

매 실행마다 Silver 테이블을 삭제 후 CTAS로 재생성하여, 항상 최신 스냅샷 상태를 유지하는 방식.

전체 구조

DAG: 11_medallion_bronze_to_silver_ctas
  스케줄: 10 * * * * (매시 10분)

  ├─ Task 1: drop_silver_tbl        → Silver 테이블 DROP
  └─ Task 2: ctas_silver            → CTAS로 Bronze SELECT → Silver 테이블 생성
                                          ↓
                                    Silver S3 적재 (Parquet/SNAPPY)

Bronze 수집이 매시 정각에 시작되고 Firehose 버퍼(최대 180초) 포함 약 3분 소요 → 보수적으로 매시 10분에 Silver DAG 가동.


1. 모듈 가져오기

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
  • AthenaOperator — Airflow에서 Athena SQL을 직접 실행하는 오퍼레이터. query 파라미터에 SQL을 넘기면 Athena API를 호출하여 쿼리 실행 후 완료까지 대기

2. 환경 변수 정의

DATABASE_BRONZE = 'de_ai_04_ma_bronze_db'
DATABASE_SILVER = 'de_ai_04_ma_sliver_db'
SILVER_S3_PATH  = 's3://de-ai-04-827913617635-ap-northeast-2-an/medallion/silver/'
ATHENA_RESULTS  = 's3://de-ai-04-827913617635-ap-northeast-2-an/athena-results/'
SILVER_TBL_NAME = 'sales_silver_tbl'
  • DATABASE_BRONZE / DATABASE_SILVER — Glue Data Catalog의 데이터베이스명. Bronze에서 읽고 Silver에 씀
  • SILVER_S3_PATH — CTAS 결과 데이터가 Parquet로 저장되는 S3 경로
  • ATHENA_RESULTS — Athena 쿼리 실행 히스토리/메타데이터 저장 경로. 쿼리 결과 CSV도 여기에 저장됨

3. DAG 정의

with DAG(
    dag_id      = "11_medallion_bronze_to_silver_ctas", 
    description = "Athena ctas 작업",
    default_args= {
        'owner'             : 'de_2team_manager',        
        'retries'           : 1,
        'retry_delay'       : timedelta(minutes=1)
    },
    schedule_interval = '10 * * * *',
    start_date  = datetime(2026,4,17),     
    catchup     = False,
    tags     = ['aws', 'medallion', 'silver', 'athena', 'ctas']
) as dag:
  • *schedule_interval = '10 * * * ' — cron 표기(분 시 일 월 요일). 매시 10분에 실행
  • catchup = False — start_date 이후 밀린 과거 스케줄을 소급 실행하지 않음. CTAS 방식은 매번 전체 교체이므로 과거 보정 불필요
  • retries = 1 — 실패 시 1회 재시도, 1분 간격

4. Task 정의

Task 1 — Silver 테이블 DROP

drop_silver_task = AthenaOperator(
    task_id = 'drop_silver_tbl',
    query='drop table if exists {{params.database_silver}}.{{params.tbl_name}};',
    database=DATABASE_SILVER,
    output_location=ATHENA_RESULTS,
    params={'database_silver':DATABASE_SILVER, 'tbl_name':SILVER_TBL_NAME}
)
  • DROP TABLE IF EXISTS — 기존 Silver 테이블 삭제. CTAS는 이미 존재하는 테이블에 실행 불가하므로 선행 삭제 필수
  • Jinja 템플릿 — {{params.database_silver}} 형태로 params dict의 값을 SQL에 동적 주입

Task 2 — CTAS로 Silver 테이블 생성

ctas_silver_task = AthenaOperator(
    task_id = 'ctas_silver',
    query = '''
        CREATE TABLE IF NOT EXISTS {{ params.database_silver }}.{{ params.tbl_name }} 
        WITH (
            format = 'PARQUET',
            parquet_compression = 'SNAPPY',
            external_location = '{{ params.silver_path }}',
            partitioned_by = ARRAY['dt','hr']
        ) AS 
        SELECT
            event_id,
            event_time as event_timestamp,
            data.user_id,
            data.item_id,
            data.price,
            data.qty,
            data.store_id,
            source_ip,
            user_agent,
            (data.price * data.qty) as total_price,
            CAST(year||'-' ||month||'-' ||day as VARCHAR) as dt,
            hour as hr
        FROM {{ params.database_bronze }}.raw_bronze_tbl
        WHERE year = '{{ execution_date.strftime("%Y") }}'
            and month = '{{ execution_date.strftime("%m") }}'
            and day = '{{ execution_date.strftime("%d") }}'
            and hour = '10'
    ''',
    database=DATABASE_SILVER,
    output_location=ATHENA_RESULTS,
    params= {
        'database_bronze': DATABASE_BRONZE,
        'database_silver': DATABASE_SILVER,
        'tbl_name': SILVER_TBL_NAME,
        'silver_path': SILVER_S3_PATH
    }
)
  • CTAS(Create Table As Select) — SELECT 결과를 그대로 새 테이블로 생성. 별도 INSERT 없이 테이블 구조 + 데이터가 한 번에 세팅됨
  • format = 'PARQUET', parquet_compression = 'SNAPPY' — 결과를 Parquet 포맷 + SNAPPY 압축으로 저장. 컬럼 기반 포맷이라 분석 쿼리 성능에 유리
  • external_location — CTAS 결과 데이터가 저장되는 S3 경로 지정
  • partitioned_by = ARRAY['dt','hr'] — SELECT의 마지막 컬럼인 dt, hr을 파티션 컬럼으로 지정. CTAS에서 파티션 컬럼은 반드시 SELECT 마지막에 위치해야 함
  • data.user_id, data.item_id ... — Bronze의 중첩 struct에서 dot notation으로 필드 추출 (Flatten)
  • (data.price * data.qty) as total_price — 파생 변수 생성. Bronze에는 없던 매출 총액 컬럼
  • CAST(year||'-'||month||'-'||day as VARCHAR) as dt — year/month/day 파티션 컬럼 3개를 하나의 날짜 문자열(2026-04-17)로 결합
  • hour as hr — 컬럼명 변경
  • execution_date.strftime("%Y") — Airflow 실행 시점의 날짜를 Jinja 템플릿으로 주입. 해당 시간대의 Bronze 데이터만 필터링

Silver에서 처리되는 변환 정리:

변환 유형 내용
Flatten data.user_id, data.item_id, data.price, data.qty, data.store_id 추출
파생 변수 (price * qty) → total_price, year-month-day 결합 → dt
컬럼명 변경 event_time → event_timestamp, hour → hr

5. 의존성 정의

drop_silver_task >> ctas_silver_task
  • DROP 완료 후 CTAS 실행. 순서가 보장되어야 CTAS 실패 방지


증분 적재 DAG — 11_aws_ma_silver_increment.py

테이블은 최초 1회만 생성하고, 매시 10분마다 해당 시간대 데이터만 INSERT하여 누적 적재하는 방식.

전체 구조

DAG: 11_medallion_bronze_to_silver_increment
  스케줄: 10 * * * * (매시 10분)

  ├─ Task 1: create_silver_table_if_not_exists  → 테이블 없으면 생성 (스키마만)
  └─ Task 2: insert_bronze_to_silver            → 해당 시간대 Bronze 데이터 INSERT
                                                       ↓
                                                 Silver S3 누적 적재 (Parquet/SNAPPY)

1. 모듈 가져오기

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator

CTAS DAG와 동일 구성.


2. 환경 변수 정의

DATABASE_BRONZE = 'de_ai_04_ma_bronze_db'
DATABASE_SILVER = 'de_ai_04_ma_sliver_db'
SILVER_S3_PATH  = 's3://de-ai-04-827913617635-ap-northeast-2-an/medallion/silver/'
ATHENA_RESULTS  = 's3://de-ai-04-827913617635-ap-northeast-2-an/athena-results/'
SILVER_TBL_NAME = 'sales_silver_increment_tbl'
  • SILVER_TBL_NAME — CTAS DAG와 다른 테이블명(sales_silver_increment_tbl). 두 DAG가 동시에 운영될 경우 충돌 방지

3. DAG 정의

with DAG(
    dag_id      = "11_medallion_bronze_to_silver_increment", 
    description = "Athena 증분 작업",
    default_args= {
        'owner'             : 'de_2team_manager',        
        'retries'           : 1,
        'retry_delay'       : timedelta(minutes=1)
    },
    schedule_interval = '10 * * * *',
    start_date  = datetime(2026,4,17),     
    catchup     = False,
    tags     = ['aws', 'medallion', 'silver', 'athena', 'increment']
) as dag:
  • CTAS DAG와 동일한 스케줄(10 * * * *), 동일 start_date
  • tags — 'increment' 태그로 CTAS DAG와 구분

4. Task 정의

Task 1 — Silver 테이블 생성 (스키마만)

create_silver_table = AthenaOperator(
    task_id='create_silver_table_if_not_exists',
    query="""
        CREATE EXTERNAL TABLE IF NOT EXISTS {{ params.database_silver }}.{{params.tbl_name}} (
            event_id string,
            event_timestamp timestamp,
            user_id string,
            item_id string,
            price int,
            qty int,
            total_price int,
            store_id string,
            source_ip string,
            user_agent string
        )
        PARTITIONED BY (dt string, hr string)
        STORED AS PARQUET
        LOCATION '{{ params.silver_path }}'
        TBLPROPERTIES ('parquet.compress'='SNAPPY');
    """,
    params={
        'database_silver': DATABASE_SILVER,
        'silver_path': SILVER_S3_PATH,
        'tbl_name': SILVER_TBL_NAME
    },
    database=DATABASE_SILVER,
    output_location=ATHENA_RESULTS
)
  • CREATE EXTERNAL TABLE IF NOT EXISTS — 테이블이 없을 때만 생성. 이미 존재하면 스킵 → 매 실행마다 안전하게 호출 가능
  • CTAS와의 차이 — 여기서는 데이터 없이 스키마(컬럼 구조)만 정의. 데이터는 Task 2에서 INSERT
  • PARTITIONED BY (dt, hr) — CTAS와 동일한 파티션 구조
  • STORED AS PARQUET + SNAPPY — CTAS의 WITH 절과 동일한 저장 포맷이지만, 외부 테이블 DDL 문법으로 표현

Task 2 — 시간대별 Bronze 데이터 INSERT

insert_silver_data = AthenaOperator(
    task_id='insert_bronze_to_silver',
    query="""
        INSERT INTO {{ params.database_silver }}.{{params.tbl_name}}
        SELECT
            event_id,
            event_time as event_timestamp,
            data.user_id,
            data.item_id,
            data.price,
            data.qty,
            (data.price * data.qty) as total_price,
            data.store_id,
            source_ip,
            user_agent,
            CAST(year || '-' || month || '-' || day AS VARCHAR) as dt,
            hour as hr
        FROM {{ params.database_bronze }}.raw_bronze_tbl
        WHERE year = '{{ execution_date.format("YYYY") }}'
        AND month = '{{ execution_date.format("MM") }}'
        AND day = '{{ execution_date.format("DD") }}'
        AND hour = '{{ execution_date.format("HH") }}';
    """,
    params={
        'database_bronze': DATABASE_BRONZE,
        'database_silver': DATABASE_SILVER,
        'tbl_name': SILVER_TBL_NAME
    },
    database=DATABASE_SILVER,
    output_location=ATHENA_RESULTS
)
  • INSERT INTO ... SELECT — 기존 테이블에 데이터 추가. CTAS와 달리 테이블을 삭제하지 않고 누적
  • Flatten / 파생 변수 / 컬럼명 변경 — CTAS DAG와 동일한 변환 로직 적용
  • execution_date.format("YYYY") — CTAS에서는 strftime("%Y") 사용, 여기서는 Pendulum의 format("YYYY") 사용. 둘 다 Airflow 실행 시점의 날짜를 Jinja로 주입하는 동일 목적
  • hour = '{{ execution_date.format("HH") }}' — CTAS DAG와의 핵심 차이. CTAS는 hour = '10'으로 고정했지만, 증분 DAG는 execution_date의 시간값을 동적으로 사용 → 매시간 해당 시간대 데이터만 정확히 추출

5. 의존성 정의

create_silver_table >> insert_silver_data
  • 테이블 존재 보장 후 INSERT 실행

두 DAG 비교

구분  CTAS DAG 증분 DAG
dag_id 11_medallion_bronze_to_silver_ctas 11_medallion_bronze_to_silver_increment
테이블명 sales_silver_tbl sales_silver_increment_tbl
Task 흐름 DROP → CTAS CREATE IF NOT EXISTS → INSERT
테이블 생성 매번 삭제 후 재생성 최초 1회만 (IF NOT EXISTS)
데이터 적재 전체 교체 (스냅샷) 누적 INSERT
hour 필터 hour = '10' 고정 execution_date.format("HH") 동적
날짜 템플릿 strftime("%Y") (Python) format("YYYY") (Pendulum)
적합한 경우 항상 최신 상태만 필요 이력 누적 / 시계열 분석 필요

CTAS DAG는 매 실행마다 테이블을 DROP 후 재생성하여 최신 스냅샷을 유지하고, 증분 DAG는 테이블을 1회 생성 후 시간대별 INSERT로 데이터를 누적 적재함. 두 방식 모두 Bronze의 중첩 struct를 Flatten하고 파생 변수(total_price, dt)를 생성하는 동일한 Silver 정제 로직을 수행.