SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Airflow - 6(멀티 DAG 구성 - Load)

dev-lee 2026. 4. 14. 17:12

Airflow 멀티 DAG 구성 - Load 단계 코드 분석

멀티 DAG ETL 파이프라인의 마지막 단계인 Load(적재)를 배움. Transform DAG에서 트리거를 받아 실행되며, 전처리된 CSV 데이터를 MySQL에 적재하는 구조.


전체 구조

Transform DAG가 전처리된 센서 데이터를 CSV로 저장하고 이 DAG를 트리거 → 이 DAG에서 테이블 생성 후 CSV를 읽어 MySQL에 INSERT하는 흐름. Transform DAG와 마찬가지로 자체 스케줄 없이 트리거로만 실행됨.


1. 모듈 가져오기

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
import logging
import pandas as pd
import os
  • SQLExecuteQueryOperator → SQL 쿼리를 직접 실행하는 범용 오퍼레이터. 원래 MySqlOperator를 쓸 수도 있지만, 이쪽이 DB 종류에 상관없이 사용 가능한 범용 버전
  • MySqlHook → Python 코드 내에서 MySQL에 직접 연결하여 데이터를 INSERT할 때 사용
  • SQLExecuteQueryOperator는 테이블 생성용, MySqlHook은 데이터 삽입용으로 역할이 나뉨

2. 데이터 저장 경로 설정

DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)

Extract, Transform과 동일한 경로. Docker 볼륨을 공유하고 있어서 Transform에서 저장한 CSV 파일을 읽을 수 있는 구조.


3. 콜백 함수 (_load)

3-1. 이전 DAG에서 데이터 받기

def _load(**kwargs):
    dag_run = kwargs['dag_run']
    csv_path = dag_run.conf.get('csv_path')

Transform DAG의 TriggerDagRunOperator에서 conf={"csv_path": "..."} 형태로 보낸 CSV 경로를 dag_run.conf로 수신. Extract → Transform 때와 동일한 패턴.

3-2. CSV를 DataFrame으로 로드

    df = pd.read_csv(csv_path)

소규모 데이터이므로 pandas 사용이 적절. 참고로 중규모는 Polars, 대규모는 Spark를 사용하는 것이 일반적이며, 대규모 처리는 보통 클라우드 환경에서 수행.

3-3. MySQL 연결 및 데이터 삽입

    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn = mysql_hook.get_conn()
    try:
        with conn.cursor() as cursor:
            sql = '''
                INSERT INTO sensor_readings
                (sensor_id, timestamp, temperature_c, temperature_f)
                VALUES (%s, %s, %s, %s)
            '''
            params = [
                (data['sensor_id'], data['timestamp'], data['temperature_c'], data['temperature_f'])
                for _, data in df.iterrows()
            ]
            logging.info(f'입력할 데이터(파라미터) {params}')
            cursor.executemany(sql, params)
            conn.commit()
            logging.info('mysql에 적재 완료')
    except Exception as e:
        logging.info(f'적재 오류 : {e}')
    finally:
        if conn:
            conn.close()
            logging.info('mysql 연결 종료 (뒷정리)')

핵심 부분을 하나씩 뜯어보면:

  • MySqlHook(mysql_conn_id='mysql_default') → Airflow Connections에 등록된 MySQL 연결 정보를 사용하여 Hook 생성
  • conn.cursor() → 커서 획득. with문으로 감싸서 블록 종료 시 자동으로 커서가 닫힘
  • df.iterrows() → DataFrame의 각 행을 순회. _는 인덱스(사용 안 함), data는 해당 행의 데이터
  • executemany(sql, params) → 여러 행을 한 번에 INSERT. execute()를 반복 호출하는 것보다 효율적
  • conn.commit() → INSERT 결과를 DB에 확정. 이걸 안 하면 데이터가 실제로 저장되지 않음
  • try-except-finally → I/O 작업이므로 예외처리 필수. finally에서 연결을 닫아 리소스 누수 방지

참고로 except 블록에서 logging.info()만 하고 있는데, 실무에서는 raise로 예외를 다시 던져야 Airflow가 Task 실패로 인식함. 현재 코드는 에러가 나도 성공으로 처리될 수 있는 구조.


4. DAG 정의

with DAG(
    dag_id='06_multiDAG_load',
    description='ETL 중 Load 단계 - mysql에 온도 센서 데이터 적재',
    default_args={
        'owner': 'de_2team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    },
    schedule_interval=None,
    start_date=datetime(2026, 2, 25),
    catchup=False,
    tags=['mysql', 'etl', 'load']
) as dag:

Transform DAG와 동일하게 schedule_interval=None으로 설정. Transform DAG의 트리거에 의해서만 실행됨.


5. Task 정의

5-1. 테이블 생성 Task

task_create_table = SQLExecuteQueryOperator(
    task_id='create_table',
    conn_id='mysql_default',
    sql='''
    CREATE TABLE IF NOT EXISTS sensor_readings (
        id INT AUTO_INCREMENT PRIMARY KEY,
        sensor_id VARCHAR(50),
        timestamp DATETIME,
        temperature_c FLOAT,
        temperature_f FLOAT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    '''
)
  • SQLExecuteQueryOperator → SQL을 직접 실행하는 오퍼레이터. PythonOperator 없이 SQL만으로 Task 구성 가능
  • conn_id='mysql_default' → Airflow Admin > Connections에 사전 등록한 MySQL 연결 정보 사용
  • IF NOT EXISTS → 테이블이 이미 존재하면 무시하고 넘어감. 이걸 안 쓰면 2회차 실행부터 "테이블 이미 존재" 에러 발생
  • id AUTO_INCREMENT → 자동 증가 PK
  • created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP → 데이터 삽입 시점이 자동 기록됨

5-2. Load Task

T3_load = PythonOperator(
    task_id='load',
    python_callable=_load
)

위에서 정의한 _load 함수를 실행하는 Task.


6. Task 의존성

task_create_table >> T3_load

테이블 생성 → 데이터 적재 순서로 실행. 테이블이 먼저 존재해야 INSERT가 가능하므로 이 순서가 필수.


정리

구성 요소 역할

SQLExecuteQueryOperator SQL을 직접 실행하는 범용 오퍼레이터 (테이블 생성용)
MySqlHook Python 코드 내에서 MySQL 연결 및 데이터 삽입
executemany() 여러 행을 한 번에 INSERT
IF NOT EXISTS 테이블 중복 생성 에러 방지
dag_run.conf 이전 DAG에서 전달받은 CSV 경로 수신

ETL 멀티 DAG 전체 흐름 정리:

Extract DAG (JSON 생성) → trigger → Transform DAG (전처리, CSV 저장) → trigger → Load DAG (MySQL 적재)

각 DAG가 독립적으로 관리되므로 특정 단계만 재실행하거나 수정하는 것이 용이한 구조.