SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Airflow - 3

dev-lee 2026. 4. 9. 17:31

ETL

 

airflow 설정 수정 및 재설치

  • Airflow 조정
    • 시간대 조절
      • 대한민국 / 서울 시간대로 조정
        • docker-compose.yaml
       x-airflow-common:
         &airflow-common
      
         ...
      
         environment:
            &airflow-common-env
            # 대한민국/서울 시간대로 조정
            TZ: 'Asia/Seoul'
            AIRFLOW__WEB_SERVER__DEFAULT_UI_TIMEZONE: 'Asia/Seoul'
      
    • docker compose 상에 서비스로 mysql 추가
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
      ->
      _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} apache-airflow-providers-mysql
    
    • 적재(Load) 대상으로 활용
    • mysql 서비스 추가
      • docker-compose.yaml
        services:
          ...
      
          # mysql 컨테이너가 삭제 되더라도 데이터는 유지되도록 도커 엔진 내부에 볼륨을 할당
          volumes:
            mysql-db-volume: /var/lib/mysql
          healthcheck:
            test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
            interval: 10s
            timeout: 5s
            retries: 5
          restart: always 
      
    • 컨테이너 새로 구성
      • 기존 내용 그대로 복구됨
      docker compose down
      docker compose up -d
      
      • airflow-class-mysql 접속 > exec
        # 접속 테스트
        sh> mysql -u root -p
        password > ....
      
        ...
      
        # 접속 성공
        mysql>
      

 

대시보드(웹서버) 상에 연결 정보 세팅

  • 대시보드 새로 고침하면 로그인 창이 뜸, 로그인
    • 상단 메뉴바에서 Admin > Connections 진입 > + 버튼 눌러서

해당 창이 뜨면 다음과 같이 세팅하면 됨

 

05_mysql_etl.py

  • DAG 주제
    • 1개의 DAG에서 etl 수행 <-> n개의 DAG에서 etl 수행
    • Task
      • T1 : Extract
        • 스마트팩토리 상에 온도 센서에서 온도 측정
        • 해당 데이터가 어딘가에(데이터 레이크 ex) AWS S3) 쌓이고 있음 (실시간, 주기적) -> 해당 데이터를 추출하는 Task (더미데이터)
      • T2 : Transform
        • 추출한 데이터를 가져와서 변환과정(전처리) 수행
          • 온도값 보정, 향후 사용 모델/서비스 등을 위해서 단위 변환 등의 인코딩 처리가 있겠음
          • 여기서는 단순하게 단위 환산 처리만 진행
      • T3 : Load
        • 변환된 데이터는 mysql의 특정 테이블에 적재 처리함
        • 소량의 데이터로 가정하여 RDB에 저장
        • 대량이라면 데이터레이크로 저장(S3 등)

 

'''
    - etl 간단하게 적용, 스마트팩토리 상 온도 센서에 대한 ETL 처리, mysql 사용
'''


# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging

# 추가분
#from airflow.providers.mysql.operators.mysql import MySqlOperator
# 범용 sql 오퍼레이터로 대체
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# Load 처리 시 sql에 전처리된 데이터를 밀어 넣을 때 사용
from airflow.providers.mysql.hooks.mysql import MySqlHook

# 데이터
import json
import random
import pandas as pd
import os

# 2. 기본 설정
# 프로젝트 내부 폴더를 데이터용으로 ~/dags/data 지정
# Task 진행 간 생성되는 파일을 동기화하도록 위치 지정 -> 향후 S3(데이터 레이크)로 대체될 수 있음

# 도커 내부에 생성된 컨테이너의 worker 내의 airflow 상 지정한 데이터 위치
DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)





# 4-1. 콜백 함수 정의
def _extract(**kwargs):
    # 스마트 패곹리에 설치된 온도 센서에서 데이터가 발생되면 데이터레이크(S3, 어딘가에 존재)에
    # 쌓이고 있다고 가정 -> 추출해서 가져오는 단계로 가정
    data = [
        {
            "sensor_id": f"SENSOR_{i+1}", # 장비 ID
            # YYYY-MM-DD hh:mm:ss 형태, 데이터가 발생한 시점
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "temperature": round(random.uniform(20.0, 150.0), 2), # 20.0 ~ 150.0 사이의 랜덤한 온도 데이터
            "status" : "on", # "off"도 존재할 수 있음, 현재는 "on"으로 고정
         } for i in range(10) 
    ]
    # 더미 데이터를 파일로 저장 (로그파일처럼) -> json 형태로 저장
    # /opt/airflow/dags/data/sensor_data_DAG수행날짜.json
    # 실습 -> 위의 데이터를 json으로 저장하는 코드 작성(json.dump(data, f))
    file_path = f"{DATA_PATH}/sensor_data_{kwargs['ds_nodash']}.json"
    with open(file_path, "w") as f:
        json.dump(data, f)
    # 로그는 별도의 프로그램에서 지속적으로 발생시켜야 함(시뮬레이션 기준)
    # 현재는 편의 상 airflow에 포함시킴

    # xCom을 통해서 T2_transform에게 전달 (로그의 경로를 전달)
    logging.info(f'extract 완료한 로그 데이터 {file_path}')
    return file_path


def _transform(**kwargs):
    # _extract에서 추출된 데이터를 xCom을 통해서 획득
    # 이 데이터를 df로 로드 -> 섭씨를 화씨로 일괄 처리(한 번에 n개의 센서에서 데이터가 전달될 때 유리)
    # 전처리된 내용은 csv로 덤프(S3로 업로드 고려)
    # 1. xCom을 통해서 이전 task에서 전달한 데이터 획득
    ti = kwargs['ti']
    json_file_path = ti.xcom_pull(task_ids='extract')

    # 로그 출력
    logging.info(f'전달받은 데이터 {json_file_path}')

    # 해당 데이터를 df(pandas 사용, 소량 데이터)로 로드
    df = pd.read_json(json_file_path)

    # 섭씨를 화씨로 일괄 처리(한 번에 n개의 센서에서 데이터 전달)
    # 설정 : 우리 공장에서는 측정온도가 섭씨 100도 미만인 데이터만 정상 데이터로 간주한다.
    #        100도 이상 데이터는 이상탐지로 간주한다 -> 일단 버리는 것으로 적용

    # 2. 100도 미만 데이터만 추출 (필터링) -> pandas의 불리언 인덱싱 사용
    target_df = df[df['temperature'] < 100].copy()

    # 3. 파생변수로 화씨 데이터 구성 (temperature_f) = (섭씨 * 9 / 5) + 32
    target_df['temperature_f'] = (target_df['temperature']*9/5) + 32

    # 4. 전처리된 내용은 csv로 덤프(S3로 업로드 고려)
    # 파일명 준비 /opt/airflow/dags/data/preprocessing_data_DAG수행날짜.csv
    file_path = f'{DATA_PATH}/preprocessing_data_{ kwargs['ds_nodash']}.csv'

    # 저장
    target_df.to_csv( file_path, index=False) # 인덱스는 제외
    logging.info(f'전처리 후 csv 저장 완료 {file_path}') # airflow가 aws에서 가동되면 s3로 저장

    # 5. csv 경로 xCom을 통해서 게시
    return file_path

def _load(**kwargs):
    # csv -> df -> mysql로 적재하는 단계
    # 1. csv 경로 획득

    # 2. csv -> df 변환

    # 3. mysql 연결 -> MySqlHook 사용
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn = mysql_hook.get_conn() # 커넥션 획득
    # 7. 전체를 try ~except로 감싸기(I/O)
    try :
        # 4. 커서를 획득하여 insert 구문 사용
        with conn.cursor() as cursor:
            # 4-1. insert 구문 사용
            sql = ""
            params = []
            cursor.executemany( sql, params )
            # 4-2. 커밋
            conn.commit()
            pass
        
    except Exception as e:
        pass
    finally:
        # 5. 연결 종료
        if conn:
            conn.close()
# 3. DAG 정의
with DAG(
    dag_id = '05_mysql_etl',
    description = 'ETL을 수행하여 mysql에 온도 센서 데이터 적재',
    default_args = {
        'owner': 'de_2team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    },
    schedule_interval = '@daily',
    start_date = datetime(2026, 2, 25),
    catchup = False,
    tags = ['mysql', 'etl']
) as dag:
    # 4. Task 정의
    task_create_table = SQLExecuteQueryOperator(
        # 테이블 생성, if not exists 옵션 활용하여 무조건 sql이 일단 수행되게 구성
        # -> 이렇게 안 하면 fail 발생함 (2회차부터)
        # 최초는 create, 존재하면 pass하도록 설정 -> if not exists 옵션 활용
        task_id = 'create_table',
        # 연결 정보
        conn_id = 'mysql_default', # 대시보드에 사전에 등록해두었음(admin > connections > 하위에)
        # sql
        sql = '''
        CREATE TABLE IF NOT EXISTS sensor_readings (
            id INT AUTO_INCREMENT PRIMARY KEY,
            sensor_id VARCHAR(50),
            timestamp DATETIME,
            temperature_c FLOAT,
            temperature_f FLOAT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        '''
    )
    T1_extract = PythonOperator(
        task_id = 'extract',
        python_callable = _extract
    )
    T2_transform = PythonOperator(
        task_id = 'transform',
        python_callable = _transform
    )
    T3_load = PythonOperator(
        task_id = 'load',
        python_callable = _load
    )
    
    # 5. Task 간 의존성 설정
    task_create_table >> T1_extract >> T2_transform >> T3_load

 

이 코드는 Airflow에서 스마트 팩토리 온도 센서 데이터를 추출(Extract) → 변환(Transform) → 적재(Load)하는 ETL 파이프라인을 MySQL과 함께 구현한 DAG이다.

 

 

모듈 구성

python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

SQLExecuteQueryOperator는 범용 SQL 실행 오퍼레이터로, MySQL 전용 오퍼레이터(MySqlOperator) 대신 사용하고 있다. 어떤 DB든 conn_id만 바꾸면 동작하기 때문에 범용성이 높다. MySqlHook은 Load 단계에서 Python 코드 안에서 직접 MySQL에 연결하여 데이터를 밀어넣을 때 사용한다.

 

 

데이터 경로 설정

python
DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)

Docker 컨테이너 내부 Airflow worker의 dags 폴더 하위에 data 디렉토리를 지정했다. Task 간에 생성되는 파일(JSON, CSV)을 이 경로에 저장하여 동기화한다. 실제 운영 환경에서는 S3 같은 데이터 레이크로 대체될 수 있는 부분이다.

 

 

Task 흐름

create_table → extract → transform → load

테이블 생성이 가장 먼저 실행되고, 이후 ETL 3단계가 순서대로 진행된다.

 

 

task_create_table — 테이블 생성

python
task_create_table = SQLExecuteQueryOperator(
    conn_id = 'mysql_default',
    sql = '''CREATE TABLE IF NOT EXISTS sensor_readings (...)'''
)

DAG이 실행될 때마다 매번 수행되는 Task이다. IF NOT EXISTS 옵션 덕분에 테이블이 이미 있으면 무시하고, 없으면 새로 생성한다. 이 옵션이 없으면 2회차 실행부터 "이미 존재한다"는 에러로 Task가 실패한다. conn_id는 Airflow 대시보드의 Admin → Connections에서 미리 등록해둔 MySQL 연결 정보를 참조한다.

 

 

T1: Extract (추출)

python
def _extract(**kwargs):
    data = [
        {
            "sensor_id": f"SENSOR_{i+1}",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "temperature": round(random.uniform(20.0, 150.0), 2),
            "status": "on",
        } for i in range(10)
    ]

스마트 팩토리 온도 센서 10개에서 데이터가 발생하는 상황을 시뮬레이션한다. 실제로는 데이터 레이크(S3 등)에 쌓여 있는 로그를 가져오는 단계에 해당한다. 온도는 20.0~150.0 사이의 랜덤 값으로 생성된다.

생성된 데이터는 JSON 파일로 저장하고, return file_path를 통해 파일 경로를 XCom에 자동 push한다. 다음 Task인 Transform에서 이 경로를 꺼내 쓸 수 있다.

 

 

T2: Transform (변환)

python
def _transform(**kwargs):
    ti = kwargs['ti']
    json_file_path = ti.xcom_pull(task_ids='extract')

먼저 xcom_pull로 Extract에서 저장한 JSON 파일 경로를 가져온다. 이 파일을 pandas DataFrame으로 로드한 뒤 두 가지 전처리를 수행한다.

 
 
python
target_df = df[df['temperature'] < 100].copy()
target_df['temperature_f'] = (target_df['temperature'] * 9 / 5) + 32

첫째, 섭씨 100도 이상인 데이터는 이상치로 간주하고 필터링한다. 공장 기준으로 100도 미만만 정상 데이터로 취급하는 것이다. 둘째, 남은 정상 데이터에 화씨 변환 파생변수(temperature_f)를 추가한다.

전처리가 끝나면 CSV로 저장하고, 마찬가지로 return file_path로 경로를 XCom에 게시한다.

 

 

T3: Load (적재)

python
def _load(**kwargs):
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn = mysql_hook.get_conn()

MySqlHook을 사용해 Python 코드 안에서 직접 MySQL 커넥션을 획득한다. Hook은 Airflow의 Connection 정보를 재사용하므로, 코드에 호스트/비밀번호 등을 하드코딩하지 않아도 된다.

이후 커서를 열고 executemany로 DataFrame의 행들을 INSERT하는 구조이다. 현재 코드에서는 sql과 params가 빈 상태로, 아직 구현이 완료되지 않은 상태이다. 완성하려면 이런 형태가 된다.

 
 
python
sql = """INSERT INTO sensor_readings 
         (sensor_id, timestamp, temperature_c, temperature_f)
         VALUES (%s, %s, %s, %s)"""
params = [tuple(row) for row in df[['sensor_id','timestamp','temperature','temperature_f']].values]

try-except-finally 구조로 감싸서, 에러가 발생해도 반드시 conn.close()로 연결을 종료하도록 하고 있다. DB 연결은 I/O 작업이므로 예외 처리가 필수이다.

 

 

XCom 데이터 흐름 정리

Extract에서 JSON 파일 경로를 return하면 XCom에 자동 push된다. Transform에서 xcom_pull(task_ids='extract')로 경로를 가져와 파일을 읽고, 전처리 후 CSV 경로를 다시 return한다. Load에서 xcom_pull(task_ids='transform')으로 CSV 경로를 가져와 MySQL에 적재하는 흐름이다.

 

 

정리하면, 이 DAG은 센서 데이터 생성(JSON) → 이상치 필터링 + 단위 변환(CSV) → MySQL 적재라는 전형적인 ETL 흐름을 Airflow로 구현한 것이다. Task 간 데이터 전달은 XCom을 사용하고, DB 연결은 Airflow의 Connection + Hook 체계를 활용하여 코드에 민감 정보를 노출하지 않는 구조이다.