Airflow 멀티 DAG 구성 - Extract 단계 코드 분석
이번 글에서는 Airflow에서 하나의 DAG가 다른 DAG를 트리거하는 멀티 DAG 구조를 살펴봅니다. ETL 파이프라인 중 Extract(추출) 단계를 담당하는 DAG 코드를 하나하나 뜯어보겠습니다.
전체 구조
이 코드의 핵심은 데이터를 추출한 뒤, 다음 단계인 Transform DAG를 자동으로 실행시키는 것입니다. ETL을 하나의 DAG에 넣을 수도 있지만, 규모가 커지면 단계별로 DAG를 분리하는 것이 관리와 재사용 측면에서 유리합니다. DAG 간 연결에는 TriggerDagRunOperator를 사용합니다.
1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import logging, json, random, os
- datetime, timedelta → DAG 시작일과 재시도 간격 설정
- PythonOperator → 파이썬 함수를 Task로 실행
- TriggerDagRunOperator → 핵심! 다른 DAG의 실행을 트리거하는 오퍼레이터
- json, random, os → 더미 데이터 생성 및 파일 저장용
2. 데이터 저장 경로 설정
DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)
Docker 컨테이너 내부에서 Airflow worker가 접근 가능한 경로입니다. exist_ok=True로 폴더가 이미 있어도 에러 없이 넘어갑니다. 실무에서는 이 로컬 경로 대신 S3 같은 데이터 레이크로 대체됩니다.
3. 콜백 함수 (_extract)
def _extract(**kwargs):
data = [
{
"sensor_id": f"SENSOR_{i+1}",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"temperature": round(random.uniform(20.0, 150.0), 2),
"status": "on",
} for i in range(10)
]
file_path = f"{DATA_PATH}/sensor_data_{kwargs['ds_nodash']}.json"
with open(file_path, "w") as f:
json.dump(data, f)
logging.info(f'extract 완료한 로그 데이터 {file_path}')
return file_path
스마트 팩토리 온도 센서 데이터를 추출하는 시나리오입니다. 10개 센서의 더미 데이터를 리스트 컴프리헨션으로 생성합니다.
- kwargs['ds_nodash'] → Airflow가 제공하는 매크로 변수로, DAG 실행 날짜를 YYYYMMDD 형태로 반환. 파일명이 sensor_data_20260414.json 같은 형식이 됨
- json.dump(data, f) → 파이썬 객체를 JSON 파일로 직렬화하여 저장
- return file_path → 반환값은 자동으로 xCom에 저장되어 다음 Task에서 꺼내 쓸 수 있음
4. DAG 정의
with DAG(
dag_id='06_multiDAG_extract',
description='ETL 중 Extract 단계 - 온도 센서 데이터 추출',
default_args={
'owner': 'de_2team_manager',
'retries': 1,
'retry_delay': timedelta(minutes=1),
},
schedule_interval='@daily',
start_date=datetime(2026, 2, 25),
catchup=False,
tags=['mysql', 'etl', 'extract']
) as dag:
- dag_id → Airflow 웹 UI에 표시되는 DAG 이름
- retries=1, retry_delay=1분 → 실패 시 1분 후 1번 재시도
- schedule_interval='@daily' → 매일 1회 실행
- catchup=False → 과거 미실행 분을 소급 실행하지 않음
5. Task 정의
5-1. Extract Task
T1_extract = PythonOperator(
task_id='extract',
python_callable=_extract
)
위에서 정의한 _extract 함수를 실행하는 Task입니다. 실행 결과(파일 경로)는 xCom을 통해 자동 저장됩니다.
5-2. Transform DAG 트리거 Task (핵심)
task_trigger_transform_dag_run = TriggerDagRunOperator(
task_id="trigger_transform",
trigger_dag_id="06_multiDAG_transform",
conf={
"json_path": "{{task_instance.xcom_pull(task_ids='extract')}}"
},
reset_dag_run=True,
wait_for_completion=False
)
이 Task가 멀티 DAG의 핵심입니다. 각 파라미터를 살펴보면:
- trigger_dag_id → 실행시킬 대상 DAG의 ID. 여기서는 Transform 단계 DAG를 지정
- conf → 대상 DAG에 전달할 데이터. Jinja 템플릿 {{ }}을 사용하여 이전 Task(extract)의 xCom 값(파일 경로)을 꺼내서 전달
- reset_dag_run=True → 같은 날짜에 이미 실행된 DAG Run이 있어도 초기화 후 재실행 허용
- wait_for_completion=False → 트리거만 하고 대기 없이 바로 종료 (비동기 처리). True로 하면 Transform DAG가 끝날 때까지 이 Task가 대기함
6. Task 의존성 설정
T1_extract >> task_trigger_transform_dag_run
Extract Task가 완료된 후 Transform DAG 트리거가 실행됩니다. 실행 흐름을 정리하면:
- _extract 함수 실행 → 센서 더미 데이터 생성 및 JSON 파일 저장
- 파일 경로가 xCom에 저장됨
- TriggerDagRunOperator가 xCom에서 파일 경로를 꺼내 Transform DAG에 전달하며 트리거
- Transform DAG가 해당 파일 경로를 받아 데이터 변환 작업 수행
정리
구성 요소 역할
| PythonOperator | 파이썬 함수를 Task로 실행 |
| TriggerDagRunOperator | 다른 DAG를 트리거 |
| xCom | Task 간, DAG 간 데이터 전달 |
| conf + Jinja 템플릿 | 트리거 시 데이터를 전달하는 방법 |
멀티 DAG 구조의 장점은 각 ETL 단계를 독립적으로 관리, 모니터링, 재실행할 수 있다는 점입니다. Extract가 실패하면 Extract만 재실행하면 되고, Transform에 문제가 있으면 Transform DAG만 수정하면 됩니다.
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Airflow - 6(멀티 DAG 구성 - Load) (0) | 2026.04.14 |
|---|---|
| Airflow - 5 (멀티 DAG - Transform) (0) | 2026.04.14 |
| Airflow - 3 (1) | 2026.04.09 |
| Airflow - 2 (0) | 2026.04.09 |
| Airflow - 1 (0) | 2026.04.08 |