SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Airflow - 5 (멀티 DAG - Transform)

dev-lee 2026. 4. 14. 17:08

Airflow 멀티 DAG 구성 - Transform 단계 코드 분석

멀티 DAG ETL 파이프라인의 두 번째 단계인 Transform(전처리)을 배웠다. Extract DAG에서 트리거를 받아 실행되고, 데이터를 전처리한 뒤 다시 Load DAG를 트리거하는 구조.


전체 구조

Extract DAG가 센서 데이터를 JSON으로 저장하고 이 DAG를 트리거 → 이 DAG에서 JSON을 읽어 전처리 후 CSV로 저장 → Load DAG를 트리거하는 흐름. 이 DAG는 직접 스케줄링되지 않고, 앞 단계 DAG의 트리거로만 실행됨.


1. 모듈 가져오기

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import logging
import pandas as pd
import os
  • pandas → 센서 데이터를 DataFrame으로 로드해서 필터링, 변환 처리할 때 사용
  • TriggerDagRunOperator → Transform 완료 후 Load DAG를 트리거하기 위해 사용
  • 나머지는 Extract 단계와 동일

2. 데이터 저장 경로 설정

DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)

Extract DAG와 동일한 경로 사용. 같은 Docker 볼륨을 공유하고 있어서 Extract에서 저장한 JSON 파일을 여기서 읽을 수 있는 구조.


3. 콜백 함수 (_transform)

3-1. 이전 DAG에서 데이터 받기

def _transform(**kwargs):
    dag_run = kwargs['dag_run']
    json_file_path = dag_run.conf.get('json_path')
    logging.info(f'전달받은 데이터 {json_file_path}')

멀티 DAG의 핵심 포인트. 같은 DAG 안에서는 xcom_pull로 데이터를 주고받지만, 다른 DAG에서 전달한 데이터는 kwargs['dag_run'].conf로 받아야 함. Extract DAG의 TriggerDagRunOperator에서 conf={"json_path": "..."} 형태로 보낸 값을 .get('json_path')로 꺼내는 방식.

3-2. JSON 로드 및 필터링

    df = pd.read_json(json_file_path)
    target_df = df[df['temperature'] < 100].copy()
  • pd.read_json() → JSON 파일을 바로 DataFrame으로 로드
  • df['temperature'] < 100 → 불리언 인덱싱으로 섭씨 100도 미만 데이터만 추출. 공장 기준으로 100도 이상은 이상 데이터로 간주하고 제거
  • .copy() → 원본 DataFrame에 영향을 주지 않도록 복사본 생성. 이걸 안 하면 뒤에서 값 수정 시 SettingWithCopyWarning 경고 발생 가능

3-3. 파생변수 생성 (섭씨 → 화씨 변환)

    target_df['temperature_f'] = (target_df['temperature'] * 9 / 5) + 32

화씨 변환 공식 (섭씨 × 9/5) + 32를 적용해서 temperature_f 컬럼을 새로 생성. pandas의 벡터 연산으로 전체 행이 한 번에 처리됨.

3-4. CSV 저장

    file_path = f'{DATA_PATH}/preprocessing_data_{kwargs["ds_nodash"]}.csv'
    target_df = target_df.rename(columns={'temperature': 'temperature_c'})
    target_df.to_csv(file_path, index=False)
    logging.info(f'전처리 후 csv 저장 완료 {file_path}')
    return file_path
  • rename() → 기존 temperature 컬럼명을 temperature_c로 변경하여 화씨(temperature_f)와 구분
  • to_csv(index=False) → 인덱스 번호 제외하고 저장
  • return file_path → CSV 경로가 xCom에 저장되어 다음 Task(Load DAG 트리거)에서 사용 가능

4. DAG 정의

with DAG(
    dag_id='06_multiDAG_transform',
    description='ETL 중 Transform 단계 - 온도 센서 데이터 전처리',
    default_args={
        'owner': 'de_2team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    },
    schedule_interval=None,
    start_date=datetime(2026, 2, 25),
    catchup=False,
    tags=['mysql', 'etl', 'transform']
) as dag:

Extract DAG와 거의 동일하지만 한 가지 차이점 존재:

  • schedule_interval=None → 자체 스케줄 없음. Extract DAG의 TriggerDagRunOperator에 의해서만 실행됨. 멀티 DAG 구조에서 중간/후속 단계 DAG는 이렇게 설정하는 것이 일반적.

5. Task 정의

5-1. Transform Task

T2_transform = PythonOperator(
    task_id='transform',
    python_callable=_transform
)

위에서 정의한 _transform 함수를 실행하는 Task.

5-2. Load DAG 트리거 Task

task_trigger_load_dag_run = TriggerDagRunOperator(
    task_id="trigger_load",
    trigger_dag_id="06_multiDAG_load",
    conf={
        "csv_path": "{{task_instance.xcom_pull(task_ids='transform')}}"
    },
    reset_dag_run=True,
    wait_for_completion=False
)

Extract DAG에서 이 DAG를 트리거한 것과 동일한 패턴:

  • trigger_dag_id → 이번에는 Load DAG를 트리거
  • conf → Jinja 템플릿으로 transform Task의 xCom 값(CSV 경로)을 꺼내서 Load DAG에 전달
  • wait_for_completion=False → 트리거만 하고 바로 종료 (비동기)

6. Task 의존성

T2_transform >> task_trigger_load_dag_run

Transform 완료 → Load DAG 트리거 순서로 실행.


정리

구성 요소 역할

dag_run.conf 다른 DAG에서 트리거 시 전달한 데이터를 받는 방법
schedule_interval=None 자체 스케줄 없이 트리거로만 실행
pd.read_json() JSON → DataFrame 변환
불리언 인덱싱 조건에 맞는 데이터만 필터링
TriggerDagRunOperator 전처리 완료 후 Load DAG 트리거

멀티 DAG에서 데이터를 주고받는 두 가지 방법 정리:

  • 같은 DAG 내 → xcom_pull(task_ids='...')
  • 다른 DAG 간 → TriggerDagRunOperator의 conf로 전송, kwargs['dag_run'].conf로 수신