SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Athena - 1 (Ahtena + Airflow)

dev-lee 2026. 4. 17. 17:41

Airflow Athena CTAS ETL DAG 

S3에 있는 CSV 데이터를 Athena로 조회하고, 90점 이상 학생만 필터링하여 Parquet 포맷으로 재저장하는 CTAS 기반 ETL 파이프라인


전체 파이프라인 흐름

[S3 Clean] → [Table Drop] → [CTAS 쿼리 실행] → [완료 감지]

매 실행 시 기존 리소스를 초기화(멱등성 보장) 한 뒤, Athena의 CTAS(Create Table As Select)로 데이터를 필터링하여 Parquet 포맷으로 저장하는 구조.


1. 모듈 임포트

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
import logging
  • AthenaOperator — Athena에 SQL 쿼리를 실행하는 오퍼레이터. DDL(CREATE/DROP)과 DML(SELECT/INSERT) 모두 지원
  • AthenaSensor — Athena 쿼리 실행이 완료되었는지 감지하는 센서. CTAS처럼 오래 걸리는 작업의 완료 여부 확인용
  • S3DeleteObjectsOperator — S3 경로 하위의 객체를 일괄 삭제. Clean 작업용

2. 환경변수

BUCKET_NAME      = 'de-ai-04-827913617635-ap-northeast-2-an'
ATHENA_DB_NAME   = 'de-ai-04-an2-glue-db'
SRC_TABLE        = 's3_exam_csv'
TARGET_TABLE     = 'pass_student'

S3_TARGET_LOC    = f's3://{BUCKET_NAME}/athena/tbl/{TARGET_TABLE}/'
S3_QUERY_LOG_LOC = f's3://{BUCKET_NAME}/athena/query_logs/'
  • ATHENA_DB_NAME — Glue Catalog의 데이터베이스 이름. Athena가 참조하는 메타스토어
  • SRC_TABLE — 원본 CSV 데이터가 등록된 테이블명
  • TARGET_TABLE — CTAS 결과가 저장될 테이블명 (pass_student)
  • S3_TARGET_LOC — CTAS 결과물(Parquet 파일)이 저장되는 실제 S3 경로
  • S3_QUERY_LOG_LOC — Athena 쿼리 실행 로그/결과 메타데이터 저장 위치

3. DAG 정의

with DAG(
    dag_id='10_aws_athena_ctas_etl',
    description='Athena ctas 작업',
    default_args={
        'owner': 'de_2team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1)
    },
    schedule_interval=None,
    start_date=datetime(2026, 4, 17),
    catchup=False,
    tags=['aws', 's3', 'athena', 'ctas']
) as dag:
  • schedule_interval=None — 수동 트리거 또는 외부 트리거로만 실행
  • catchup=False — 과거 미실행 DAG 소급 실행 안 함

4. Task 정의

Task 1 — S3 Clean (S3DeleteObjectsOperator)

Task1 = S3DeleteObjectsOperator(
    task_id='clean_s3_target',
    bucket=BUCKET_NAME,
    prefix=f'athena/tbl/{TARGET_TABLE}/',
    aws_conn_id='aws_default'
)
  • prefix — 해당 경로 하위의 모든 객체를 삭제. keys 대신 prefix를 쓰면 하위 전체 정리
  • CTAS 결과 Parquet 파일이 쌓여있는 경로를 비우는 작업
  • 멱등성 보장의 핵심 — 매 실행마다 깨끗한 상태에서 시작하여 데이터 중복을 방지

Task 2 — Table Drop (AthenaOperator)

Task2 = AthenaOperator(
    task_id='drop_table',
    query=f'drop table if exists {ATHENA_DB_NAME}.{TARGET_TABLE}',
    database=ATHENA_DB_NAME,
    output_location=S3_QUERY_LOG_LOC,
    aws_conn_id='aws_default'
)
  • DROP TABLE IF EXISTS — 기존 테이블이 있으면 제거, 없어도 에러 없이 통과
  • output_location — 쿼리 실행 결과/메타데이터가 저장될 S3 경로 (Athena 필수 옵션)
  • Task 1이 S3 파일을 지우고, Task 2가 Glue Catalog의 테이블 메타데이터를 지우는 2단 Clean 구조

Task 1과 Task 2가 모두 필요한 이유:

Task  대상  역할
Task 1 S3 파일(실물 데이터) Parquet 파일 삭제
Task 2 Glue Catalog(메타정보) 테이블 정의 삭제

Athena 테이블은 S3 파일 + Glue 메타데이터의 조합이므로 둘 다 정리해야 완전한 초기화가 됨.

Task 3 — CTAS 실행 (AthenaOperator)

query = f'''
    create table {ATHENA_DB_NAME}.{TARGET_TABLE}
    with (
        format = 'PARQUET',
        parquet_compression = 'GZIP',
        external_location = '{S3_TARGET_LOC}'
    )
    as
    select id, name, score, created_at
    from {ATHENA_DB_NAME}.{SRC_TABLE}
    where score >= 90
    order by score desc
'''

Task3 = AthenaOperator(
    task_id='create_table_format_parquet',
    query=query,
    database=ATHENA_DB_NAME,
    output_location=S3_QUERY_LOG_LOC,
    aws_conn_id='aws_default',
    do_xcom_push=True
)

CTAS(Create Table As Select) 구조:

  • 쿼리 결과를 그대로 새 테이블로 저장하는 Athena 기능
  • WITH 절에서 저장 포맷과 위치를 지정

쿼리 옵션:

  • format = 'PARQUET' — 열 기반 컬럼 포맷. CSV 대비 압축률과 쿼리 속도 우수
  • parquet_compression = 'GZIP' — Parquet 내부 압축 방식
  • external_location — 결과 Parquet 파일이 저장될 S3 경로. 문자열이므로 따옴표 필수

필터링 로직:

  • 90점 이상 학생만 추출 (where score >= 90)
  • 점수 내림차순 정렬 (order by score desc)

do_xcom_push=True:

  • 쿼리 실행 ID(query_execution_id)를 xCom에 저장
  • Task 4(Sensor)에서 이 ID를 받아 완료 여부를 추적

Task 4 — 완료 감지 (AthenaSensor)

Task4 = AthenaSensor(
    task_id='sensor',
    query_execution_id="{{ task_instance.xcom_pull(task_ids='create_table_format_parquet') }}",
    poke_interval=10,
    timeout=600,
    aws_conn_id='aws_default'
)
  • query_execution_id — Task 3에서 xCom으로 전달된 쿼리 ID. Jinja 템플릿으로 런타임에 추출
  • poke_interval=10 — 10초마다 쿼리 상태 확인
  • timeout=600 — 최대 10분까지 대기. 이후에도 완료 안 되면 실패 처리
  • CTAS는 데이터 양에 따라 수 초~수 분 소요될 수 있어 Sensor로 완료를 기다림

AthenaOperator만으로 완료를 기다리지 않고 왜 Sensor를 추가하는가?

  • AthenaOperator는 쿼리를 제출하고 완료를 기다리지만, 비동기 패턴으로 Operator가 바로 끝나고 Sensor가 별도로 완료를 체크하는 방식이 워커 리소스를 덜 점유함
  • 실무에서는 Operator에 wait_for_completion=False로 설정하여 Sensor와 조합하는 패턴이 많이 사용됨

5. 의존성

Task1 >> Task2 >> Task3 >> Task4
[S3 파일 삭제] → [테이블 삭제] → [CTAS 실행] → [완료 대기]

요약 정리

항목  내용
DAG 유형 Athena CTAS 기반 ETL
처리 흐름 S3/테이블 Clean → CTAS 실행 → 완료 감지
출력 포맷 Parquet + GZIP 압축
필터 조건 시험 점수 90점 이상
멱등성 S3 파일 삭제 + 테이블 DROP의 2단 Clean으로 보장
비동기 처리 Operator + Sensor 조합으로 워커 리소스 효율화

 

Athena의 CTAS를 활용하여 CSV 원본 데이터를 필터링·Parquet 변환까지 한 번에 처리하는 ETL 파이프라인으로, S3와 Glue Catalog 양쪽을 초기화하여 멱등성을 보장하고 Sensor로 CTAS 완료를 추적하는 구조