SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Airflow - 4 (멀티 DAG 구성 - Extract)

dev-lee 2026. 4. 14. 17:01

Airflow 멀티 DAG 구성 - Extract 단계 코드 분석

이번 글에서는 Airflow에서 하나의 DAG가 다른 DAG를 트리거하는 멀티 DAG 구조를 살펴봅니다. ETL 파이프라인 중 Extract(추출) 단계를 담당하는 DAG 코드를 하나하나 뜯어보겠습니다.


전체 구조

이 코드의 핵심은 데이터를 추출한 뒤, 다음 단계인 Transform DAG를 자동으로 실행시키는 것입니다. ETL을 하나의 DAG에 넣을 수도 있지만, 규모가 커지면 단계별로 DAG를 분리하는 것이 관리와 재사용 측면에서 유리합니다. DAG 간 연결에는 TriggerDagRunOperator를 사용합니다.


1. 모듈 가져오기

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import logging, json, random, os
  • datetime, timedelta → DAG 시작일과 재시도 간격 설정
  • PythonOperator → 파이썬 함수를 Task로 실행
  • TriggerDagRunOperator → 핵심! 다른 DAG의 실행을 트리거하는 오퍼레이터
  • json, random, os → 더미 데이터 생성 및 파일 저장용

2. 데이터 저장 경로 설정

DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)

Docker 컨테이너 내부에서 Airflow worker가 접근 가능한 경로입니다. exist_ok=True로 폴더가 이미 있어도 에러 없이 넘어갑니다. 실무에서는 이 로컬 경로 대신 S3 같은 데이터 레이크로 대체됩니다.


3. 콜백 함수 (_extract)

def _extract(**kwargs):
    data = [
        {
            "sensor_id": f"SENSOR_{i+1}",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "temperature": round(random.uniform(20.0, 150.0), 2),
            "status": "on",
        } for i in range(10)
    ]
    file_path = f"{DATA_PATH}/sensor_data_{kwargs['ds_nodash']}.json"
    with open(file_path, "w") as f:
        json.dump(data, f)
    logging.info(f'extract 완료한 로그 데이터 {file_path}')
    return file_path

스마트 팩토리 온도 센서 데이터를 추출하는 시나리오입니다. 10개 센서의 더미 데이터를 리스트 컴프리헨션으로 생성합니다.

  • kwargs['ds_nodash'] → Airflow가 제공하는 매크로 변수로, DAG 실행 날짜를 YYYYMMDD 형태로 반환. 파일명이 sensor_data_20260414.json 같은 형식이 됨
  • json.dump(data, f) → 파이썬 객체를 JSON 파일로 직렬화하여 저장
  • return file_path → 반환값은 자동으로 xCom에 저장되어 다음 Task에서 꺼내 쓸 수 있음

4. DAG 정의

with DAG(
    dag_id='06_multiDAG_extract',
    description='ETL 중 Extract 단계 - 온도 센서 데이터 추출',
    default_args={
        'owner': 'de_2team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    },
    schedule_interval='@daily',
    start_date=datetime(2026, 2, 25),
    catchup=False,
    tags=['mysql', 'etl', 'extract']
) as dag:
  • dag_id → Airflow 웹 UI에 표시되는 DAG 이름
  • retries=1, retry_delay=1분 → 실패 시 1분 후 1번 재시도
  • schedule_interval='@daily' → 매일 1회 실행
  • catchup=False → 과거 미실행 분을 소급 실행하지 않음

5. Task 정의

5-1. Extract Task

T1_extract = PythonOperator(
    task_id='extract',
    python_callable=_extract
)

위에서 정의한 _extract 함수를 실행하는 Task입니다. 실행 결과(파일 경로)는 xCom을 통해 자동 저장됩니다.

5-2. Transform DAG 트리거 Task (핵심)

task_trigger_transform_dag_run = TriggerDagRunOperator(
    task_id="trigger_transform",
    trigger_dag_id="06_multiDAG_transform",
    conf={
        "json_path": "{{task_instance.xcom_pull(task_ids='extract')}}"
    },
    reset_dag_run=True,
    wait_for_completion=False
)

이 Task가 멀티 DAG의 핵심입니다. 각 파라미터를 살펴보면:

  • trigger_dag_id → 실행시킬 대상 DAG의 ID. 여기서는 Transform 단계 DAG를 지정
  • conf → 대상 DAG에 전달할 데이터. Jinja 템플릿 {{ }}을 사용하여 이전 Task(extract)의 xCom 값(파일 경로)을 꺼내서 전달
  • reset_dag_run=True → 같은 날짜에 이미 실행된 DAG Run이 있어도 초기화 후 재실행 허용
  • wait_for_completion=False → 트리거만 하고 대기 없이 바로 종료 (비동기 처리). True로 하면 Transform DAG가 끝날 때까지 이 Task가 대기함

6. Task 의존성 설정

T1_extract >> task_trigger_transform_dag_run

Extract Task가 완료된 후 Transform DAG 트리거가 실행됩니다. 실행 흐름을 정리하면:

  1. _extract 함수 실행 → 센서 더미 데이터 생성 및 JSON 파일 저장
  2. 파일 경로가 xCom에 저장됨
  3. TriggerDagRunOperator가 xCom에서 파일 경로를 꺼내 Transform DAG에 전달하며 트리거
  4. Transform DAG가 해당 파일 경로를 받아 데이터 변환 작업 수행

정리

구성 요소 역할

PythonOperator 파이썬 함수를 Task로 실행
TriggerDagRunOperator 다른 DAG를 트리거
xCom Task 간, DAG 간 데이터 전달
conf + Jinja 템플릿 트리거 시 데이터를 전달하는 방법

멀티 DAG 구조의 장점은 각 ETL 단계를 독립적으로 관리, 모니터링, 재실행할 수 있다는 점입니다. Extract가 실패하면 Extract만 재실행하면 되고, Transform에 문제가 있으면 Transform DAG만 수정하면 됩니다.

'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글

Airflow - 6(멀티 DAG 구성 - Load)  (0) 2026.04.14
Airflow - 5 (멀티 DAG - Transform)  (0) 2026.04.14
Airflow - 3  (1) 2026.04.09
Airflow - 2  (0) 2026.04.09
Airflow - 1  (0) 2026.04.08