SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Athena - 2 (Athena 기반 일일 리포트 생성 DAG)

dev-lee 2026. 4. 20. 16:14

S3 CSV를 Athena 외부 테이블로 매핑하고, result(Pass/Fail) 기준 집계 리포트를 CTAS로 Parquet 테이블로 생성하는 DAG


전체 아키텍처

Task1 (CREATE EXTERNAL TABLE)
    → Task1_2 (S3 Clean)
        → Task2 (DROP TABLE)
            → Task3 (CTAS 집계)

S3에 있는 시험 결과 CSV를 Athena 테이블로 매핑하고, result(Pass/Fail) 기준으로 집계한 리포트 테이블을 Parquet 포맷으로 생성하는 구조.


1. 모듈 가져오기

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
  • AthenaOperator — Athena에 SQL 쿼리를 실행하는 Operator. DDL(테이블 생성/삭제)과 CTAS 모두 이걸로 수행
  • S3DeleteObjectsOperator — S3 특정 경로의 객체를 삭제. CTAS 재실행 시 HIVE_PATH_ALREADY_EXISTS 방지용

2. 환경 변수 정의

BUCKET_NAME      = 'de-ai-04-827913617635-ap-northeast-2-an'
ATHENA_DB_NAME   = 'de-ai-04-an2-glue-db'
SRC_TABLE        = 's3_exam_csv_tbl'
TARGET_TABLE     = 'daily_report_tbl'

S3_CSV_LOC       = f's3://{BUCKET_NAME}/csvs/'
S3_TARGET_LOC    = f's3://{BUCKET_NAME}/athena/tbl/{TARGET_TABLE}/'
S3_QUERY_LOG_LOC = f's3://{BUCKET_NAME}/athena/query_logs/'
  • SRC_TABLE — CSV 원본 데이터를 매핑할 외부 테이블명
  • TARGET_TABLE — CTAS로 생성할 집계 결과 테이블명
  • S3_CSV_LOC — 원본 CSV가 저장된 S3 경로. CREATE EXTERNAL TABLE의 LOCATION으로 사용
  • S3_TARGET_LOC — CTAS 결과 Parquet 파일이 저장될 S3 경로
  • S3_QUERY_LOG_LOC — Athena 쿼리 실행 로그가 저장되는 경로

3. DAG 정의

with DAG(
    dag_id            = "10_aws_athena_query",
    description       = "Athena 기반 일일 리포트 생성",
    default_args      = {
        'owner'       : 'de_2team_manager',
        'retries'     : 1,
        'retry_delay' : timedelta(minutes=1)
    },
    schedule_interval = '@daily',
    start_date        = datetime(2026, 4, 17),
    catchup           = False,
    tags              = ['aws', 's3', 'athena', 'report']
) as dag:
  • schedule_interval = '@daily' — 하루 한 번 실행. 테스트 시에는 수동 트리거
  • catchup = False — 과거 미실행분 소급 실행 안 함

4. Task 정의

Task1 — S3 CSV 기반 외부 테이블 생성

create_src_query = f'''
    CREATE EXTERNAL TABLE IF NOT EXISTS {SRC_TABLE} (
        id         int,
        name       string,
        score      int,
        created_at string,
        result     string
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION '{S3_CSV_LOC}'
    TBLPROPERTIES ('skip.header.line.count'='1')
'''
Task1 = AthenaOperator(
    task_id         = 'create_src_table',
    query           = create_src_query,
    database        = ATHENA_DB_NAME,
    output_location = S3_QUERY_LOG_LOC,
    aws_conn_id     = 'aws_default'
)
  • CREATE EXTERNAL TABLE — S3 파일을 직접 복사하지 않고 해당 경로를 참조하는 테이블 생성. 데이터는 S3에 그대로 있고 Athena가 읽기만 함
  • IF NOT EXISTS — 이미 존재하면 스킵하여 멱등성 보장
  • FIELDS TERMINATED BY ',' — CSV 구분자 지정
  • skip.header.line.count = 1 — CSV 첫 번째 행(헤더)을 건너뜀
  • Hive DDL 문법 사용 (CREATE EXTERNAL TABLE은 Hive 파서 기반)

Task1_2 — S3 Clean

Task1_2 = S3DeleteObjectsOperator(
    task_id     = 'clean_s3_target',
    bucket      = BUCKET_NAME,
    prefix      = f'athena/tbl/{TARGET_TABLE}/',
    aws_conn_id = 'aws_default'
)
  • 이전 CTAS 실행으로 남아있는 Parquet 파일 삭제
  • 이걸 안 하면 CTAS가 HIVE_PATH_ALREADY_EXISTS 에러로 실패

Task2 — 기존 분석 테이블 DROP

Task2 = AthenaOperator(
    task_id         = 'drop_report_table',
    query           = f'DROP TABLE IF EXISTS `{ATHENA_DB_NAME}`.{TARGET_TABLE}',
    database        = ATHENA_DB_NAME,
    output_location = S3_QUERY_LOG_LOC,
    aws_conn_id     = 'aws_default'
)
  • Glue 카탈로그에서 테이블 메타데이터 삭제
  • DROP TABLE은 S3 파일을 삭제하지 않으므로 Task1_2와 함께 사용해야 완전한 초기화
  • DB명에 하이픈이 포함되어 있어 백틱(`)으로 감쌈 (Hive DDL 파서 규칙)

Task3 — CTAS로 집계 리포트 생성

report_query = f'''
    CREATE TABLE {TARGET_TABLE}
    WITH (
        format            = 'PARQUET',
        external_location = '{S3_TARGET_LOC}'
    )
    AS
    SELECT result,
           count(*)      AS cnt,
           avg(score)    AS avg_score,
           min(score)    AS min_score,
           max(score)    AS max_score
    FROM {SRC_TABLE}
    GROUP BY result
'''
Task3 = AthenaOperator(
    task_id         = 'create_report_table',
    query           = report_query,
    database        = ATHENA_DB_NAME,
    output_location = S3_QUERY_LOG_LOC,
    aws_conn_id     = 'aws_default'
)
  • CTAS (Create Table As Select) — SELECT 결과를 새 테이블로 생성
  • format = 'PARQUET' — 열 기반 압축 포맷으로 저장. CSV 대비 쿼리 성능 및 용량 우수
  • GROUP BY result — Pass/Fail 기준으로 그룹화하여 건수, 평균, 최소, 최대 점수 집계
  • CTAS는 Presto(Trino) 엔진 사용. Hive DDL과 달리 하이픈 포함 식별자는 큰따옴표(")로 감싸야 함 (여기서는 DB명을 직접 쓰지 않으므로 해당 없음)

5. 의존성

Task1 >> Task1_2 >> Task2 >> Task3

 

순서 Task 역할 멱등성 보장 방식
1_2 clean_s3_target S3 Parquet 파일 삭제 매번 삭제
2 drop_report_table Glue 메타데이터 삭제 IF EXISTS
3 create_report_table 집계 → Parquet 테이블 생성 Clean 후 재생성
1 create_src_table CSV → 외부 테이블 매핑 IF NOT EXISTS

S3 파일 삭제(Task1_2) + 테이블 메타 삭제(Task2) + CTAS 재생성(Task3)으로 매 실행마다 동일한 결과를 보장하는 멱등성 구조