Airflow 멀티 DAG 구성 - Transform 단계 코드 분석
멀티 DAG ETL 파이프라인의 두 번째 단계인 Transform(전처리)을 배웠다. Extract DAG에서 트리거를 받아 실행되고, 데이터를 전처리한 뒤 다시 Load DAG를 트리거하는 구조.
전체 구조
Extract DAG가 센서 데이터를 JSON으로 저장하고 이 DAG를 트리거 → 이 DAG에서 JSON을 읽어 전처리 후 CSV로 저장 → Load DAG를 트리거하는 흐름. 이 DAG는 직접 스케줄링되지 않고, 앞 단계 DAG의 트리거로만 실행됨.
1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import logging
import pandas as pd
import os
- pandas → 센서 데이터를 DataFrame으로 로드해서 필터링, 변환 처리할 때 사용
- TriggerDagRunOperator → Transform 완료 후 Load DAG를 트리거하기 위해 사용
- 나머지는 Extract 단계와 동일
2. 데이터 저장 경로 설정
DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)
Extract DAG와 동일한 경로 사용. 같은 Docker 볼륨을 공유하고 있어서 Extract에서 저장한 JSON 파일을 여기서 읽을 수 있는 구조.
3. 콜백 함수 (_transform)
3-1. 이전 DAG에서 데이터 받기
def _transform(**kwargs):
dag_run = kwargs['dag_run']
json_file_path = dag_run.conf.get('json_path')
logging.info(f'전달받은 데이터 {json_file_path}')
멀티 DAG의 핵심 포인트. 같은 DAG 안에서는 xcom_pull로 데이터를 주고받지만, 다른 DAG에서 전달한 데이터는 kwargs['dag_run'].conf로 받아야 함. Extract DAG의 TriggerDagRunOperator에서 conf={"json_path": "..."} 형태로 보낸 값을 .get('json_path')로 꺼내는 방식.
3-2. JSON 로드 및 필터링
df = pd.read_json(json_file_path)
target_df = df[df['temperature'] < 100].copy()
- pd.read_json() → JSON 파일을 바로 DataFrame으로 로드
- df['temperature'] < 100 → 불리언 인덱싱으로 섭씨 100도 미만 데이터만 추출. 공장 기준으로 100도 이상은 이상 데이터로 간주하고 제거
- .copy() → 원본 DataFrame에 영향을 주지 않도록 복사본 생성. 이걸 안 하면 뒤에서 값 수정 시 SettingWithCopyWarning 경고 발생 가능
3-3. 파생변수 생성 (섭씨 → 화씨 변환)
target_df['temperature_f'] = (target_df['temperature'] * 9 / 5) + 32
화씨 변환 공식 (섭씨 × 9/5) + 32를 적용해서 temperature_f 컬럼을 새로 생성. pandas의 벡터 연산으로 전체 행이 한 번에 처리됨.
3-4. CSV 저장
file_path = f'{DATA_PATH}/preprocessing_data_{kwargs["ds_nodash"]}.csv'
target_df = target_df.rename(columns={'temperature': 'temperature_c'})
target_df.to_csv(file_path, index=False)
logging.info(f'전처리 후 csv 저장 완료 {file_path}')
return file_path
- rename() → 기존 temperature 컬럼명을 temperature_c로 변경하여 화씨(temperature_f)와 구분
- to_csv(index=False) → 인덱스 번호 제외하고 저장
- return file_path → CSV 경로가 xCom에 저장되어 다음 Task(Load DAG 트리거)에서 사용 가능
4. DAG 정의
with DAG(
dag_id='06_multiDAG_transform',
description='ETL 중 Transform 단계 - 온도 센서 데이터 전처리',
default_args={
'owner': 'de_2team_manager',
'retries': 1,
'retry_delay': timedelta(minutes=1),
},
schedule_interval=None,
start_date=datetime(2026, 2, 25),
catchup=False,
tags=['mysql', 'etl', 'transform']
) as dag:
Extract DAG와 거의 동일하지만 한 가지 차이점 존재:
- schedule_interval=None → 자체 스케줄 없음. Extract DAG의 TriggerDagRunOperator에 의해서만 실행됨. 멀티 DAG 구조에서 중간/후속 단계 DAG는 이렇게 설정하는 것이 일반적.
5. Task 정의
5-1. Transform Task
T2_transform = PythonOperator(
task_id='transform',
python_callable=_transform
)
위에서 정의한 _transform 함수를 실행하는 Task.
5-2. Load DAG 트리거 Task
task_trigger_load_dag_run = TriggerDagRunOperator(
task_id="trigger_load",
trigger_dag_id="06_multiDAG_load",
conf={
"csv_path": "{{task_instance.xcom_pull(task_ids='transform')}}"
},
reset_dag_run=True,
wait_for_completion=False
)
Extract DAG에서 이 DAG를 트리거한 것과 동일한 패턴:
- trigger_dag_id → 이번에는 Load DAG를 트리거
- conf → Jinja 템플릿으로 transform Task의 xCom 값(CSV 경로)을 꺼내서 Load DAG에 전달
- wait_for_completion=False → 트리거만 하고 바로 종료 (비동기)
6. Task 의존성
T2_transform >> task_trigger_load_dag_run
Transform 완료 → Load DAG 트리거 순서로 실행.
정리
구성 요소 역할
| dag_run.conf | 다른 DAG에서 트리거 시 전달한 데이터를 받는 방법 |
| schedule_interval=None | 자체 스케줄 없이 트리거로만 실행 |
| pd.read_json() | JSON → DataFrame 변환 |
| 불리언 인덱싱 | 조건에 맞는 데이터만 필터링 |
| TriggerDagRunOperator | 전처리 완료 후 Load DAG 트리거 |
멀티 DAG에서 데이터를 주고받는 두 가지 방법 정리:
- 같은 DAG 내 → xcom_pull(task_ids='...')
- 다른 DAG 간 → TriggerDagRunOperator의 conf로 전송, kwargs['dag_run'].conf로 수신
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Airflow - 7 (DAG의 외부 API 활용) (0) | 2026.04.14 |
|---|---|
| Airflow - 6(멀티 DAG 구성 - Load) (0) | 2026.04.14 |
| Airflow - 4 (멀티 DAG 구성 - Extract) (0) | 2026.04.14 |
| Airflow - 3 (1) | 2026.04.09 |
| Airflow - 2 (0) | 2026.04.09 |