Airflow DAG + Athena SQL로 Bronze의 중첩 데이터를 Flatten/정제하여 Silver 테이블에 적재하는 두 가지 방식 — CTAS(전체 교체)와 증분(누적 INSERT)
CTAS 기반 DAG — 11_aws_ma_silver.py
매 실행마다 Silver 테이블을 삭제 후 CTAS로 재생성하여, 항상 최신 스냅샷 상태를 유지하는 방식.
전체 구조
DAG: 11_medallion_bronze_to_silver_ctas
스케줄: 10 * * * * (매시 10분)
├─ Task 1: drop_silver_tbl → Silver 테이블 DROP
└─ Task 2: ctas_silver → CTAS로 Bronze SELECT → Silver 테이블 생성
↓
Silver S3 적재 (Parquet/SNAPPY)
Bronze 수집이 매시 정각에 시작되고 Firehose 버퍼(최대 180초) 포함 약 3분 소요 → 보수적으로 매시 10분에 Silver DAG 가동.
1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
- AthenaOperator — Airflow에서 Athena SQL을 직접 실행하는 오퍼레이터. query 파라미터에 SQL을 넘기면 Athena API를 호출하여 쿼리 실행 후 완료까지 대기
2. 환경 변수 정의
DATABASE_BRONZE = 'de_ai_04_ma_bronze_db'
DATABASE_SILVER = 'de_ai_04_ma_sliver_db'
SILVER_S3_PATH = 's3://de-ai-04-827913617635-ap-northeast-2-an/medallion/silver/'
ATHENA_RESULTS = 's3://de-ai-04-827913617635-ap-northeast-2-an/athena-results/'
SILVER_TBL_NAME = 'sales_silver_tbl'
- DATABASE_BRONZE / DATABASE_SILVER — Glue Data Catalog의 데이터베이스명. Bronze에서 읽고 Silver에 씀
- SILVER_S3_PATH — CTAS 결과 데이터가 Parquet로 저장되는 S3 경로
- ATHENA_RESULTS — Athena 쿼리 실행 히스토리/메타데이터 저장 경로. 쿼리 결과 CSV도 여기에 저장됨
3. DAG 정의
with DAG(
dag_id = "11_medallion_bronze_to_silver_ctas",
description = "Athena ctas 작업",
default_args= {
'owner' : 'de_2team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = '10 * * * *',
start_date = datetime(2026,4,17),
catchup = False,
tags = ['aws', 'medallion', 'silver', 'athena', 'ctas']
) as dag:
- *schedule_interval = '10 * * * ' — cron 표기(분 시 일 월 요일). 매시 10분에 실행
- catchup = False — start_date 이후 밀린 과거 스케줄을 소급 실행하지 않음. CTAS 방식은 매번 전체 교체이므로 과거 보정 불필요
- retries = 1 — 실패 시 1회 재시도, 1분 간격
4. Task 정의
Task 1 — Silver 테이블 DROP
drop_silver_task = AthenaOperator(
task_id = 'drop_silver_tbl',
query='drop table if exists {{params.database_silver}}.{{params.tbl_name}};',
database=DATABASE_SILVER,
output_location=ATHENA_RESULTS,
params={'database_silver':DATABASE_SILVER, 'tbl_name':SILVER_TBL_NAME}
)
- DROP TABLE IF EXISTS — 기존 Silver 테이블 삭제. CTAS는 이미 존재하는 테이블에 실행 불가하므로 선행 삭제 필수
- Jinja 템플릿 — {{params.database_silver}} 형태로 params dict의 값을 SQL에 동적 주입
Task 2 — CTAS로 Silver 테이블 생성
ctas_silver_task = AthenaOperator(
task_id = 'ctas_silver',
query = '''
CREATE TABLE IF NOT EXISTS {{ params.database_silver }}.{{ params.tbl_name }}
WITH (
format = 'PARQUET',
parquet_compression = 'SNAPPY',
external_location = '{{ params.silver_path }}',
partitioned_by = ARRAY['dt','hr']
) AS
SELECT
event_id,
event_time as event_timestamp,
data.user_id,
data.item_id,
data.price,
data.qty,
data.store_id,
source_ip,
user_agent,
(data.price * data.qty) as total_price,
CAST(year||'-' ||month||'-' ||day as VARCHAR) as dt,
hour as hr
FROM {{ params.database_bronze }}.raw_bronze_tbl
WHERE year = '{{ execution_date.strftime("%Y") }}'
and month = '{{ execution_date.strftime("%m") }}'
and day = '{{ execution_date.strftime("%d") }}'
and hour = '10'
''',
database=DATABASE_SILVER,
output_location=ATHENA_RESULTS,
params= {
'database_bronze': DATABASE_BRONZE,
'database_silver': DATABASE_SILVER,
'tbl_name': SILVER_TBL_NAME,
'silver_path': SILVER_S3_PATH
}
)
- CTAS(Create Table As Select) — SELECT 결과를 그대로 새 테이블로 생성. 별도 INSERT 없이 테이블 구조 + 데이터가 한 번에 세팅됨
- format = 'PARQUET', parquet_compression = 'SNAPPY' — 결과를 Parquet 포맷 + SNAPPY 압축으로 저장. 컬럼 기반 포맷이라 분석 쿼리 성능에 유리
- external_location — CTAS 결과 데이터가 저장되는 S3 경로 지정
- partitioned_by = ARRAY['dt','hr'] — SELECT의 마지막 컬럼인 dt, hr을 파티션 컬럼으로 지정. CTAS에서 파티션 컬럼은 반드시 SELECT 마지막에 위치해야 함
- data.user_id, data.item_id ... — Bronze의 중첩 struct에서 dot notation으로 필드 추출 (Flatten)
- (data.price * data.qty) as total_price — 파생 변수 생성. Bronze에는 없던 매출 총액 컬럼
- CAST(year||'-'||month||'-'||day as VARCHAR) as dt — year/month/day 파티션 컬럼 3개를 하나의 날짜 문자열(2026-04-17)로 결합
- hour as hr — 컬럼명 변경
- execution_date.strftime("%Y") — Airflow 실행 시점의 날짜를 Jinja 템플릿으로 주입. 해당 시간대의 Bronze 데이터만 필터링
Silver에서 처리되는 변환 정리:
| 변환 유형 | 내용 |
| Flatten | data.user_id, data.item_id, data.price, data.qty, data.store_id 추출 |
| 파생 변수 | (price * qty) → total_price, year-month-day 결합 → dt |
| 컬럼명 변경 | event_time → event_timestamp, hour → hr |
5. 의존성 정의
drop_silver_task >> ctas_silver_task
- DROP 완료 후 CTAS 실행. 순서가 보장되어야 CTAS 실패 방지
증분 적재 DAG — 11_aws_ma_silver_increment.py
테이블은 최초 1회만 생성하고, 매시 10분마다 해당 시간대 데이터만 INSERT하여 누적 적재하는 방식.
전체 구조
DAG: 11_medallion_bronze_to_silver_increment
스케줄: 10 * * * * (매시 10분)
├─ Task 1: create_silver_table_if_not_exists → 테이블 없으면 생성 (스키마만)
└─ Task 2: insert_bronze_to_silver → 해당 시간대 Bronze 데이터 INSERT
↓
Silver S3 누적 적재 (Parquet/SNAPPY)
1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
CTAS DAG와 동일 구성.
2. 환경 변수 정의
DATABASE_BRONZE = 'de_ai_04_ma_bronze_db'
DATABASE_SILVER = 'de_ai_04_ma_sliver_db'
SILVER_S3_PATH = 's3://de-ai-04-827913617635-ap-northeast-2-an/medallion/silver/'
ATHENA_RESULTS = 's3://de-ai-04-827913617635-ap-northeast-2-an/athena-results/'
SILVER_TBL_NAME = 'sales_silver_increment_tbl'
- SILVER_TBL_NAME — CTAS DAG와 다른 테이블명(sales_silver_increment_tbl). 두 DAG가 동시에 운영될 경우 충돌 방지
3. DAG 정의
with DAG(
dag_id = "11_medallion_bronze_to_silver_increment",
description = "Athena 증분 작업",
default_args= {
'owner' : 'de_2team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = '10 * * * *',
start_date = datetime(2026,4,17),
catchup = False,
tags = ['aws', 'medallion', 'silver', 'athena', 'increment']
) as dag:
- CTAS DAG와 동일한 스케줄(10 * * * *), 동일 start_date
- tags — 'increment' 태그로 CTAS DAG와 구분
4. Task 정의
Task 1 — Silver 테이블 생성 (스키마만)
create_silver_table = AthenaOperator(
task_id='create_silver_table_if_not_exists',
query="""
CREATE EXTERNAL TABLE IF NOT EXISTS {{ params.database_silver }}.{{params.tbl_name}} (
event_id string,
event_timestamp timestamp,
user_id string,
item_id string,
price int,
qty int,
total_price int,
store_id string,
source_ip string,
user_agent string
)
PARTITIONED BY (dt string, hr string)
STORED AS PARQUET
LOCATION '{{ params.silver_path }}'
TBLPROPERTIES ('parquet.compress'='SNAPPY');
""",
params={
'database_silver': DATABASE_SILVER,
'silver_path': SILVER_S3_PATH,
'tbl_name': SILVER_TBL_NAME
},
database=DATABASE_SILVER,
output_location=ATHENA_RESULTS
)
- CREATE EXTERNAL TABLE IF NOT EXISTS — 테이블이 없을 때만 생성. 이미 존재하면 스킵 → 매 실행마다 안전하게 호출 가능
- CTAS와의 차이 — 여기서는 데이터 없이 스키마(컬럼 구조)만 정의. 데이터는 Task 2에서 INSERT
- PARTITIONED BY (dt, hr) — CTAS와 동일한 파티션 구조
- STORED AS PARQUET + SNAPPY — CTAS의 WITH 절과 동일한 저장 포맷이지만, 외부 테이블 DDL 문법으로 표현
Task 2 — 시간대별 Bronze 데이터 INSERT
insert_silver_data = AthenaOperator(
task_id='insert_bronze_to_silver',
query="""
INSERT INTO {{ params.database_silver }}.{{params.tbl_name}}
SELECT
event_id,
event_time as event_timestamp,
data.user_id,
data.item_id,
data.price,
data.qty,
(data.price * data.qty) as total_price,
data.store_id,
source_ip,
user_agent,
CAST(year || '-' || month || '-' || day AS VARCHAR) as dt,
hour as hr
FROM {{ params.database_bronze }}.raw_bronze_tbl
WHERE year = '{{ execution_date.format("YYYY") }}'
AND month = '{{ execution_date.format("MM") }}'
AND day = '{{ execution_date.format("DD") }}'
AND hour = '{{ execution_date.format("HH") }}';
""",
params={
'database_bronze': DATABASE_BRONZE,
'database_silver': DATABASE_SILVER,
'tbl_name': SILVER_TBL_NAME
},
database=DATABASE_SILVER,
output_location=ATHENA_RESULTS
)
- INSERT INTO ... SELECT — 기존 테이블에 데이터 추가. CTAS와 달리 테이블을 삭제하지 않고 누적
- Flatten / 파생 변수 / 컬럼명 변경 — CTAS DAG와 동일한 변환 로직 적용
- execution_date.format("YYYY") — CTAS에서는 strftime("%Y") 사용, 여기서는 Pendulum의 format("YYYY") 사용. 둘 다 Airflow 실행 시점의 날짜를 Jinja로 주입하는 동일 목적
- hour = '{{ execution_date.format("HH") }}' — CTAS DAG와의 핵심 차이. CTAS는 hour = '10'으로 고정했지만, 증분 DAG는 execution_date의 시간값을 동적으로 사용 → 매시간 해당 시간대 데이터만 정확히 추출
5. 의존성 정의
create_silver_table >> insert_silver_data
- 테이블 존재 보장 후 INSERT 실행
두 DAG 비교
| 구분 | CTAS DAG | 증분 DAG |
| dag_id | 11_medallion_bronze_to_silver_ctas | 11_medallion_bronze_to_silver_increment |
| 테이블명 | sales_silver_tbl | sales_silver_increment_tbl |
| Task 흐름 | DROP → CTAS | CREATE IF NOT EXISTS → INSERT |
| 테이블 생성 | 매번 삭제 후 재생성 | 최초 1회만 (IF NOT EXISTS) |
| 데이터 적재 | 전체 교체 (스냅샷) | 누적 INSERT |
| hour 필터 | hour = '10' 고정 | execution_date.format("HH") 동적 |
| 날짜 템플릿 | strftime("%Y") (Python) | format("YYYY") (Pendulum) |
| 적합한 경우 | 항상 최신 상태만 필요 | 이력 누적 / 시계열 분석 필요 |
CTAS DAG는 매 실행마다 테이블을 DROP 후 재생성하여 최신 스냅샷을 유지하고, 증분 DAG는 테이블을 1회 생성 후 시간대별 INSERT로 데이터를 누적 적재함. 두 방식 모두 Bronze의 중첩 struct를 Flatten하고 파생 변수(total_price, dt)를 생성하는 동일한 Silver 정제 로직을 수행.
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| OpenSearch 개념 정리 (1) | 2026.04.23 |
|---|---|
| Medallion Architecture - 5 — Gold Layer 구현 (0) | 2026.04.22 |
| Medallion Architecture - 3 — Silver Layer (0) | 2026.04.22 |
| Medallion Architecture - 2 — Bronze 코드 구현 (0) | 2026.04.21 |
| Medallion Architecture - 1 — Bronze (0) | 2026.04.21 |