SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Airflow - 9 (이벤트 기반 파이프라인)

dev-lee 2026. 4. 15. 16:32

Airflow S3 Producer-Consumer DAG 코드 분석 — 이벤트 기반 파이프라인

Producer가 S3에 데이터를 업로드하면 Consumer가 감지하여 처리 → 삭제하는 이벤트 드리븐 파이프라인 구조


전체 아키텍처

[Producer DAG]                          [Consumer DAG]
CSV 생성 → S3 업로드  ──→  S3 감시(Sensor) → 읽기/처리 → 삭제
                        ↑
                  S3: income/sensor_data.csv

두 DAG가 S3의 income/sensor_data.csv 경로를 매개로 연결됨. Producer가 파일을 올리면 Consumer의 센서가 감지하여 후속 처리를 수행하는 구조.


Part 1. Producer DAG — 09_aws_s3_producer


1. 모듈 임포트

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
import logging
  • BashOperator — 쉘 명령어로 CSV 파일을 생성하는 용도
  • LocalFilesystemToS3Operator — 로컬 파일시스템의 파일을 S3로 업로드하는 전용 오퍼레이터. Hook보다 선언적이고 간결함
  • S3Hook — 현재 코드에서는 미사용. Consumer DAG에서 활용됨
  • PythonOperator — 현재 코드에서는 미사용. import만 되어있는 상태

2. 환경변수

BUCKET_NAME = "de-ai-04-827913617635-ap-northeast-2-an"
FILE_NAME   = 'sensor_data.csv'
S3_KEY      = f'income/{FILE_NAME}'
LOCAL_PATH  = f'/opt/airflow/dags/data/{FILE_NAME}'
  • BUCKET_NAME — 대상 S3 버킷. 네이밍: {팀}-{계정ID}-{리전}-an
  • S3_KEY — 버킷 내 업로드 경로. income/sensor_data.csv로 지정됨. Consumer DAG의 센서가 이 경로를 감시함
  • LOCAL_PATH — Airflow 컨테이너 내부의 파일 경로. /opt/airflow/dags/data/ 하위에 CSV를 생성함

3. DAG 정의

with DAG(
    dag_id='09_aws_s3_producer',
    schedule_interval=None,
    start_date=datetime(2026, 2, 25),
    catchup=False,
    tags=['aws', 's3', 'producer']
) as dag:
  • schedule_interval=None — 자동 스케줄 없음. 수동 트리거(UI에서 직접 실행) 또는 외부 트리거로만 작동함
  • Producer는 데이터가 필요한 시점에만 실행하면 되므로 스케줄이 불필요함
  • Consumer와의 차이점: Consumer는 @daily로 센서를 활성화해야 하지만, Producer는 None으로 필요할 때만 실행

4. Task 정의

4-1. BashOperator — CSV 더미 데이터 생성

task_create_dummy_data_csv = BashOperator(
    task_id='task_create_dummy_data_csv',
    bash_command=f'echo -e "id,timestamp,value\\n1,$(date),100\\n2,$(date),500" > {LOCAL_PATH}'
)
  • echo -e\n을 개행 문자로 해석하도록 하는 옵션
  • $(date) — 쉘에서 현재 시간을 삽입함. 실행 시점의 타임스탬프가 기록됨
  • \\n — f-string 내에서 \n이 파이썬 단에서 먼저 해석되지 않도록 이스케이프 처리함
  • > {LOCAL_PATH} — 결과를 파일로 리다이렉트하여 CSV 파일 생성

생성되는 CSV 구조:

id,timestamp,value
1,Tue Apr 15 10:18:24 KST 2026,100
2,Tue Apr 15 10:18:24 KST 2026,500

4-2. LocalFilesystemToS3Operator — S3 업로드

task_upload_to_s3 = LocalFilesystemToS3Operator(
    task_id='task_upload_to_s3',
    filename=LOCAL_PATH,
    dest_key=S3_KEY,
    dest_bucket=BUCKET_NAME,
    aws_conn_id='aws_default',
    replace=True
)
  • filename — 업로드할 로컬 파일 경로
  • dest_key — S3 버킷 내 저장 위치. income/sensor_data.csv
  • dest_bucket — 대상 버킷명
  • aws_conn_id='aws_default' — Airflow Connection에 등록된 AWS 인증 정보 사용
  • replace=True — 동일 키에 파일이 이미 존재하면 덮어씀. 파일은 항상 1개만 유지됨

5. 의존성

task_create_dummy_data_csv >> task_upload_to_s3
[CSV 생성] → [S3 업로드]

CSV가 정상 생성된 후에만 업로드가 실행되는 순차 구조.


Part 2. Consumer DAG — 09_aws_s3_consumer


1. 모듈 임포트

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
import logging
  • S3KeySensor — S3 버킷 내 특정 키(파일)의 존재 여부를 주기적으로 확인하는 센서. 파일이 감지될 때까지 대기함
  • S3Hook — S3에 직접 연결하여 파일을 읽는 유틸리티. Operator보다 가벼운 작업에 적합함
  • S3DeleteObjectsOperator — 처리 완료 후 S3 객체를 삭제하여 감시 경로를 초기화하는 용도

2. 환경변수

BUCKET_NAME = "de-ai-04-827913617635-ap-northeast-2-an"
FILE_NAME   = 'sensor_data.csv'
S3_KEY      = f'income/{FILE_NAME}'
  • Producer와 동일한 버킷, 동일한 키를 참조함. 이 경로가 두 DAG를 연결하는 매개점
  • Consumer는 로컬 경로(LOCAL_PATH)가 불필요함. S3에서 직접 읽기 때문

3. 콜백 함수 — _reading_data

def _reading_data(**kwargs):
    hook = S3Hook(aws_conn_id='aws_default')
    data = hook.read_key(key=S3_KEY, bucket_name=BUCKET_NAME)
    logging.info('-- 로그 출력 시작 --')
    logging.info(data)
    logging.info('-- 로그 출력 종료 --')
  • S3Hook(aws_conn_id='aws_default') — Airflow Connection의 AWS 인증 정보로 S3에 접속
  • hook.read_key() — 파일을 다운로드하지 않고 메모리에서 문자열로 직접 읽음
  • **kwargs — Airflow context 수신용. 현재는 미사용이나, ti(TaskInstance) 등을 통한 xCom 연계 시 활용 가능
  • 현재는 로그 출력만 하지만, 실제 운영에서는 ETL, AI 추론, DB 적재 등의 비즈니스 로직이 들어가는 위치

4. DAG 정의

with DAG(
    dag_id='09_aws_s3_consumer',
    schedule_interval='@daily',
    start_date=datetime(2026, 2, 25),
    catchup=False,
    tags=['aws', 's3', 'consumer']
) as dag:
  • schedule_interval='@daily' — 센서가 작동하려면 DAG 자체가 스케줄에 의해 활성화되어야 함. None으로 하면 센서가 작동하지 않음
  • Producer(None)와의 차이: Consumer는 항상 대기 상태여야 하므로 스케줄이 필수
  • 실질적 트리거는 스케줄이 아닌 센서의 파일 감지에 의해 결정됨

5. Task 정의

5-1. S3KeySensor — 감시자

task_waiting_trigger = S3KeySensor(
    task_id='waiting_trigger',
    bucket_key=S3_KEY,
    bucket_name=BUCKET_NAME,
    aws_conn_id='aws_default',
    mode='reschedule',
    poke_interval=10,
    timeout=60*10
)
  • bucket_key / bucket_name — Producer가 업로드하는 income/sensor_data.csv 경로를 감시
  • mode='reschedule' — 대기 중 워커 슬롯을 반납하여 리소스를 효율적으로 사용함
  • poke_interval=10 — 10초 간격으로 파일 존재 여부를 체크
  • timeout=600 — 10분 동안 파일이 감지되지 않으면 Task 실패 처리

mode 비교:

항목  poke  reschedule
리소스 효율 낮음 높음
적합 상황 짧은 대기 (수초) 긴 대기 (수분~수시간)
대기 방식 워커 슬롯 점유하며 sleep 슬롯 반납 후 재스케줄

5-2. PythonOperator — 비즈니스 처리

task_reading_data = PythonOperator(
    task_id='reading_data',
    python_callable=_reading_data
)
  • 센서가 파일을 감지한 후 실행됨
  • _reading_data 함수를 호출하여 S3에서 파일 내용을 읽고 로그로 출력

5-3. S3DeleteObjectsOperator — 정리

task_delete_data_or_backup = S3DeleteObjectsOperator(
    task_id='task_delete_data_or_backup',
    bucket=BUCKET_NAME,
    keys=[S3_KEY],
    aws_conn_id='aws_default'
)
  • 처리 완료된 파일을 삭제하여 감시 경로를 초기 상태로 되돌림
  • 삭제하지 않으면 다음 DAG 실행 시 센서가 즉시 통과해버림
  • 실제 운영에서는 삭제 전 archive/ 경로로 백업 이동하는 로직을 추가하는 것이 일반적

6. 의존성

task_waiting_trigger >> task_reading_data >> task_delete_data_or_backup
[감시/대기] → [읽기/처리] → [삭제/정리]

Producer vs Consumer 비교

 

항목  Producer  Consumer
DAG ID 09_aws_s3_producer 09_aws_s3_consumer
역할 데이터 생성 + S3 업로드 S3 감시 + 읽기 + 삭제
schedule_interval None (수동 트리거) @daily (센서 활성화 필요)
핵심 오퍼레이터 BashOperator, LocalFilesystemToS3Operator S3KeySensor, PythonOperator, S3DeleteObjectsOperator
S3 경로 업로드 대상 감시 대상
연결점 income/sensor_data.csv에 파일 생성 동일 경로를 센서로 감시
Task 수 2개 3개
AWS 컴포넌트 transfers (업로드) hooks (읽기) + sensors (감시) + operators (삭제)

Producer가 S3에 파일을 업로드하면 Consumer의 S3KeySensor가 감지 → Hook으로 읽기 → 처리 후 삭제하여 경로를 초기화하는 이벤트 드리븐 Producer-Consumer 파이프라인 구조