S3 CSV를 Athena 외부 테이블로 매핑하고, result(Pass/Fail) 기준 집계 리포트를 CTAS로 Parquet 테이블로 생성하는 DAG
전체 아키텍처
Task1 (CREATE EXTERNAL TABLE)
→ Task1_2 (S3 Clean)
→ Task2 (DROP TABLE)
→ Task3 (CTAS 집계)
S3에 있는 시험 결과 CSV를 Athena 테이블로 매핑하고, result(Pass/Fail) 기준으로 집계한 리포트 테이블을 Parquet 포맷으로 생성하는 구조.
1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
- AthenaOperator — Athena에 SQL 쿼리를 실행하는 Operator. DDL(테이블 생성/삭제)과 CTAS 모두 이걸로 수행
- S3DeleteObjectsOperator — S3 특정 경로의 객체를 삭제. CTAS 재실행 시 HIVE_PATH_ALREADY_EXISTS 방지용
2. 환경 변수 정의
BUCKET_NAME = 'de-ai-04-827913617635-ap-northeast-2-an'
ATHENA_DB_NAME = 'de-ai-04-an2-glue-db'
SRC_TABLE = 's3_exam_csv_tbl'
TARGET_TABLE = 'daily_report_tbl'
S3_CSV_LOC = f's3://{BUCKET_NAME}/csvs/'
S3_TARGET_LOC = f's3://{BUCKET_NAME}/athena/tbl/{TARGET_TABLE}/'
S3_QUERY_LOG_LOC = f's3://{BUCKET_NAME}/athena/query_logs/'
- SRC_TABLE — CSV 원본 데이터를 매핑할 외부 테이블명
- TARGET_TABLE — CTAS로 생성할 집계 결과 테이블명
- S3_CSV_LOC — 원본 CSV가 저장된 S3 경로. CREATE EXTERNAL TABLE의 LOCATION으로 사용
- S3_TARGET_LOC — CTAS 결과 Parquet 파일이 저장될 S3 경로
- S3_QUERY_LOG_LOC — Athena 쿼리 실행 로그가 저장되는 경로
3. DAG 정의
with DAG(
dag_id = "10_aws_athena_query",
description = "Athena 기반 일일 리포트 생성",
default_args = {
'owner' : 'de_2team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = '@daily',
start_date = datetime(2026, 4, 17),
catchup = False,
tags = ['aws', 's3', 'athena', 'report']
) as dag:
- schedule_interval = '@daily' — 하루 한 번 실행. 테스트 시에는 수동 트리거
- catchup = False — 과거 미실행분 소급 실행 안 함
4. Task 정의
Task1 — S3 CSV 기반 외부 테이블 생성
create_src_query = f'''
CREATE EXTERNAL TABLE IF NOT EXISTS {SRC_TABLE} (
id int,
name string,
score int,
created_at string,
result string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '{S3_CSV_LOC}'
TBLPROPERTIES ('skip.header.line.count'='1')
'''
Task1 = AthenaOperator(
task_id = 'create_src_table',
query = create_src_query,
database = ATHENA_DB_NAME,
output_location = S3_QUERY_LOG_LOC,
aws_conn_id = 'aws_default'
)
- CREATE EXTERNAL TABLE — S3 파일을 직접 복사하지 않고 해당 경로를 참조하는 테이블 생성. 데이터는 S3에 그대로 있고 Athena가 읽기만 함
- IF NOT EXISTS — 이미 존재하면 스킵하여 멱등성 보장
- FIELDS TERMINATED BY ',' — CSV 구분자 지정
- skip.header.line.count = 1 — CSV 첫 번째 행(헤더)을 건너뜀
- Hive DDL 문법 사용 (CREATE EXTERNAL TABLE은 Hive 파서 기반)
Task1_2 — S3 Clean
Task1_2 = S3DeleteObjectsOperator(
task_id = 'clean_s3_target',
bucket = BUCKET_NAME,
prefix = f'athena/tbl/{TARGET_TABLE}/',
aws_conn_id = 'aws_default'
)
- 이전 CTAS 실행으로 남아있는 Parquet 파일 삭제
- 이걸 안 하면 CTAS가 HIVE_PATH_ALREADY_EXISTS 에러로 실패
Task2 — 기존 분석 테이블 DROP
Task2 = AthenaOperator(
task_id = 'drop_report_table',
query = f'DROP TABLE IF EXISTS `{ATHENA_DB_NAME}`.{TARGET_TABLE}',
database = ATHENA_DB_NAME,
output_location = S3_QUERY_LOG_LOC,
aws_conn_id = 'aws_default'
)
- Glue 카탈로그에서 테이블 메타데이터 삭제
- DROP TABLE은 S3 파일을 삭제하지 않으므로 Task1_2와 함께 사용해야 완전한 초기화
- DB명에 하이픈이 포함되어 있어 백틱(`)으로 감쌈 (Hive DDL 파서 규칙)
Task3 — CTAS로 집계 리포트 생성
report_query = f'''
CREATE TABLE {TARGET_TABLE}
WITH (
format = 'PARQUET',
external_location = '{S3_TARGET_LOC}'
)
AS
SELECT result,
count(*) AS cnt,
avg(score) AS avg_score,
min(score) AS min_score,
max(score) AS max_score
FROM {SRC_TABLE}
GROUP BY result
'''
Task3 = AthenaOperator(
task_id = 'create_report_table',
query = report_query,
database = ATHENA_DB_NAME,
output_location = S3_QUERY_LOG_LOC,
aws_conn_id = 'aws_default'
)
- CTAS (Create Table As Select) — SELECT 결과를 새 테이블로 생성
- format = 'PARQUET' — 열 기반 압축 포맷으로 저장. CSV 대비 쿼리 성능 및 용량 우수
- GROUP BY result — Pass/Fail 기준으로 그룹화하여 건수, 평균, 최소, 최대 점수 집계
- CTAS는 Presto(Trino) 엔진 사용. Hive DDL과 달리 하이픈 포함 식별자는 큰따옴표(")로 감싸야 함 (여기서는 DB명을 직접 쓰지 않으므로 해당 없음)
5. 의존성
Task1 >> Task1_2 >> Task2 >> Task3
| 순서 | Task | 역할 | 멱등성 보장 방식 |
| 1_2 | clean_s3_target | S3 Parquet 파일 삭제 | 매번 삭제 |
| 2 | drop_report_table | Glue 메타데이터 삭제 | IF EXISTS |
| 3 | create_report_table | 집계 → Parquet 테이블 생성 | Clean 후 재생성 |
| 1 | create_src_table | CSV → 외부 테이블 매핑 | IF NOT EXISTS |
S3 파일 삭제(Task1_2) + 테이블 메타 삭제(Task2) + CTAS 재생성(Task3)으로 매 실행마다 동일한 결과를 보장하는 멱등성 구조
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Medallion Architecture - 1 — Bronze (0) | 2026.04.21 |
|---|---|
| Athena 정리 — 개념, SQL 실습, Airflow 연동 (0) | 2026.04.20 |
| Athena - 1 (Ahtena + Airflow) (0) | 2026.04.17 |
| Kinesis - 2(KDS+Flink+Firehose+S3) (0) | 2026.04.16 |
| Kinesis - 1 (KDS+Firehose+S3) (0) | 2026.04.16 |