ETL
airflow 설정 수정 및 재설치
- Airflow 조정
- 시간대 조절
- 대한민국 / 서울 시간대로 조정
- docker-compose.yaml
x-airflow-common: &airflow-common ... environment: &airflow-common-env # 대한민국/서울 시간대로 조정 TZ: 'Asia/Seoul' AIRFLOW__WEB_SERVER__DEFAULT_UI_TIMEZONE: 'Asia/Seoul' - 대한민국 / 서울 시간대로 조정
- docker compose 상에 서비스로 mysql 추가
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} -> _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} apache-airflow-providers-mysql- 적재(Load) 대상으로 활용
- mysql 서비스 추가
- docker-compose.yaml
services: ... # mysql 컨테이너가 삭제 되더라도 데이터는 유지되도록 도커 엔진 내부에 볼륨을 할당 volumes: mysql-db-volume: /var/lib/mysql healthcheck: test: ["CMD", "mysqladmin", "ping", "-h", "localhost"] interval: 10s timeout: 5s retries: 5 restart: always - 컨테이너 새로 구성
- 기존 내용 그대로 복구됨
docker compose down docker compose up -d- airflow-class-mysql 접속 > exec
# 접속 테스트 sh> mysql -u root -p password > .... ... # 접속 성공 mysql>
- 시간대 조절
대시보드(웹서버) 상에 연결 정보 세팅
- 대시보드 새로 고침하면 로그인 창이 뜸, 로그인
- 상단 메뉴바에서 Admin > Connections 진입 > + 버튼 눌러서

05_mysql_etl.py
- DAG 주제
- 1개의 DAG에서 etl 수행 <-> n개의 DAG에서 etl 수행
- Task
- T1 : Extract
- 스마트팩토리 상에 온도 센서에서 온도 측정
- 해당 데이터가 어딘가에(데이터 레이크 ex) AWS S3) 쌓이고 있음 (실시간, 주기적) -> 해당 데이터를 추출하는 Task (더미데이터)
- T2 : Transform
- 추출한 데이터를 가져와서 변환과정(전처리) 수행
- 온도값 보정, 향후 사용 모델/서비스 등을 위해서 단위 변환 등의 인코딩 처리가 있겠음
- 여기서는 단순하게 단위 환산 처리만 진행
- 추출한 데이터를 가져와서 변환과정(전처리) 수행
- T3 : Load
- 변환된 데이터는 mysql의 특정 테이블에 적재 처리함
- 소량의 데이터로 가정하여 RDB에 저장
- 대량이라면 데이터레이크로 저장(S3 등)
- T1 : Extract
'''
- etl 간단하게 적용, 스마트팩토리 상 온도 센서에 대한 ETL 처리, mysql 사용
'''
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
# 추가분
#from airflow.providers.mysql.operators.mysql import MySqlOperator
# 범용 sql 오퍼레이터로 대체
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# Load 처리 시 sql에 전처리된 데이터를 밀어 넣을 때 사용
from airflow.providers.mysql.hooks.mysql import MySqlHook
# 데이터
import json
import random
import pandas as pd
import os
# 2. 기본 설정
# 프로젝트 내부 폴더를 데이터용으로 ~/dags/data 지정
# Task 진행 간 생성되는 파일을 동기화하도록 위치 지정 -> 향후 S3(데이터 레이크)로 대체될 수 있음
# 도커 내부에 생성된 컨테이너의 worker 내의 airflow 상 지정한 데이터 위치
DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)
# 4-1. 콜백 함수 정의
def _extract(**kwargs):
# 스마트 패곹리에 설치된 온도 센서에서 데이터가 발생되면 데이터레이크(S3, 어딘가에 존재)에
# 쌓이고 있다고 가정 -> 추출해서 가져오는 단계로 가정
data = [
{
"sensor_id": f"SENSOR_{i+1}", # 장비 ID
# YYYY-MM-DD hh:mm:ss 형태, 데이터가 발생한 시점
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"temperature": round(random.uniform(20.0, 150.0), 2), # 20.0 ~ 150.0 사이의 랜덤한 온도 데이터
"status" : "on", # "off"도 존재할 수 있음, 현재는 "on"으로 고정
} for i in range(10)
]
# 더미 데이터를 파일로 저장 (로그파일처럼) -> json 형태로 저장
# /opt/airflow/dags/data/sensor_data_DAG수행날짜.json
# 실습 -> 위의 데이터를 json으로 저장하는 코드 작성(json.dump(data, f))
file_path = f"{DATA_PATH}/sensor_data_{kwargs['ds_nodash']}.json"
with open(file_path, "w") as f:
json.dump(data, f)
# 로그는 별도의 프로그램에서 지속적으로 발생시켜야 함(시뮬레이션 기준)
# 현재는 편의 상 airflow에 포함시킴
# xCom을 통해서 T2_transform에게 전달 (로그의 경로를 전달)
logging.info(f'extract 완료한 로그 데이터 {file_path}')
return file_path
def _transform(**kwargs):
# _extract에서 추출된 데이터를 xCom을 통해서 획득
# 이 데이터를 df로 로드 -> 섭씨를 화씨로 일괄 처리(한 번에 n개의 센서에서 데이터가 전달될 때 유리)
# 전처리된 내용은 csv로 덤프(S3로 업로드 고려)
# 1. xCom을 통해서 이전 task에서 전달한 데이터 획득
ti = kwargs['ti']
json_file_path = ti.xcom_pull(task_ids='extract')
# 로그 출력
logging.info(f'전달받은 데이터 {json_file_path}')
# 해당 데이터를 df(pandas 사용, 소량 데이터)로 로드
df = pd.read_json(json_file_path)
# 섭씨를 화씨로 일괄 처리(한 번에 n개의 센서에서 데이터 전달)
# 설정 : 우리 공장에서는 측정온도가 섭씨 100도 미만인 데이터만 정상 데이터로 간주한다.
# 100도 이상 데이터는 이상탐지로 간주한다 -> 일단 버리는 것으로 적용
# 2. 100도 미만 데이터만 추출 (필터링) -> pandas의 불리언 인덱싱 사용
target_df = df[df['temperature'] < 100].copy()
# 3. 파생변수로 화씨 데이터 구성 (temperature_f) = (섭씨 * 9 / 5) + 32
target_df['temperature_f'] = (target_df['temperature']*9/5) + 32
# 4. 전처리된 내용은 csv로 덤프(S3로 업로드 고려)
# 파일명 준비 /opt/airflow/dags/data/preprocessing_data_DAG수행날짜.csv
file_path = f'{DATA_PATH}/preprocessing_data_{ kwargs['ds_nodash']}.csv'
# 저장
target_df.to_csv( file_path, index=False) # 인덱스는 제외
logging.info(f'전처리 후 csv 저장 완료 {file_path}') # airflow가 aws에서 가동되면 s3로 저장
# 5. csv 경로 xCom을 통해서 게시
return file_path
def _load(**kwargs):
# csv -> df -> mysql로 적재하는 단계
# 1. csv 경로 획득
# 2. csv -> df 변환
# 3. mysql 연결 -> MySqlHook 사용
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
conn = mysql_hook.get_conn() # 커넥션 획득
# 7. 전체를 try ~except로 감싸기(I/O)
try :
# 4. 커서를 획득하여 insert 구문 사용
with conn.cursor() as cursor:
# 4-1. insert 구문 사용
sql = ""
params = []
cursor.executemany( sql, params )
# 4-2. 커밋
conn.commit()
pass
except Exception as e:
pass
finally:
# 5. 연결 종료
if conn:
conn.close()
# 3. DAG 정의
with DAG(
dag_id = '05_mysql_etl',
description = 'ETL을 수행하여 mysql에 온도 센서 데이터 적재',
default_args = {
'owner': 'de_2team_manager',
'retries': 1,
'retry_delay': timedelta(minutes=1),
},
schedule_interval = '@daily',
start_date = datetime(2026, 2, 25),
catchup = False,
tags = ['mysql', 'etl']
) as dag:
# 4. Task 정의
task_create_table = SQLExecuteQueryOperator(
# 테이블 생성, if not exists 옵션 활용하여 무조건 sql이 일단 수행되게 구성
# -> 이렇게 안 하면 fail 발생함 (2회차부터)
# 최초는 create, 존재하면 pass하도록 설정 -> if not exists 옵션 활용
task_id = 'create_table',
# 연결 정보
conn_id = 'mysql_default', # 대시보드에 사전에 등록해두었음(admin > connections > 하위에)
# sql
sql = '''
CREATE TABLE IF NOT EXISTS sensor_readings (
id INT AUTO_INCREMENT PRIMARY KEY,
sensor_id VARCHAR(50),
timestamp DATETIME,
temperature_c FLOAT,
temperature_f FLOAT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
'''
)
T1_extract = PythonOperator(
task_id = 'extract',
python_callable = _extract
)
T2_transform = PythonOperator(
task_id = 'transform',
python_callable = _transform
)
T3_load = PythonOperator(
task_id = 'load',
python_callable = _load
)
# 5. Task 간 의존성 설정
task_create_table >> T1_extract >> T2_transform >> T3_load
이 코드는 Airflow에서 스마트 팩토리 온도 센서 데이터를 추출(Extract) → 변환(Transform) → 적재(Load)하는 ETL 파이프라인을 MySQL과 함께 구현한 DAG이다.
모듈 구성
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
SQLExecuteQueryOperator는 범용 SQL 실행 오퍼레이터로, MySQL 전용 오퍼레이터(MySqlOperator) 대신 사용하고 있다. 어떤 DB든 conn_id만 바꾸면 동작하기 때문에 범용성이 높다. MySqlHook은 Load 단계에서 Python 코드 안에서 직접 MySQL에 연결하여 데이터를 밀어넣을 때 사용한다.
데이터 경로 설정
DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)
Docker 컨테이너 내부 Airflow worker의 dags 폴더 하위에 data 디렉토리를 지정했다. Task 간에 생성되는 파일(JSON, CSV)을 이 경로에 저장하여 동기화한다. 실제 운영 환경에서는 S3 같은 데이터 레이크로 대체될 수 있는 부분이다.
Task 흐름
create_table → extract → transform → load
테이블 생성이 가장 먼저 실행되고, 이후 ETL 3단계가 순서대로 진행된다.
task_create_table — 테이블 생성
task_create_table = SQLExecuteQueryOperator(
conn_id = 'mysql_default',
sql = '''CREATE TABLE IF NOT EXISTS sensor_readings (...)'''
)
DAG이 실행될 때마다 매번 수행되는 Task이다. IF NOT EXISTS 옵션 덕분에 테이블이 이미 있으면 무시하고, 없으면 새로 생성한다. 이 옵션이 없으면 2회차 실행부터 "이미 존재한다"는 에러로 Task가 실패한다. conn_id는 Airflow 대시보드의 Admin → Connections에서 미리 등록해둔 MySQL 연결 정보를 참조한다.
T1: Extract (추출)
def _extract(**kwargs):
data = [
{
"sensor_id": f"SENSOR_{i+1}",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"temperature": round(random.uniform(20.0, 150.0), 2),
"status": "on",
} for i in range(10)
]
스마트 팩토리 온도 센서 10개에서 데이터가 발생하는 상황을 시뮬레이션한다. 실제로는 데이터 레이크(S3 등)에 쌓여 있는 로그를 가져오는 단계에 해당한다. 온도는 20.0~150.0 사이의 랜덤 값으로 생성된다.
생성된 데이터는 JSON 파일로 저장하고, return file_path를 통해 파일 경로를 XCom에 자동 push한다. 다음 Task인 Transform에서 이 경로를 꺼내 쓸 수 있다.
T2: Transform (변환)
def _transform(**kwargs):
ti = kwargs['ti']
json_file_path = ti.xcom_pull(task_ids='extract')
먼저 xcom_pull로 Extract에서 저장한 JSON 파일 경로를 가져온다. 이 파일을 pandas DataFrame으로 로드한 뒤 두 가지 전처리를 수행한다.
target_df = df[df['temperature'] < 100].copy()
target_df['temperature_f'] = (target_df['temperature'] * 9 / 5) + 32
첫째, 섭씨 100도 이상인 데이터는 이상치로 간주하고 필터링한다. 공장 기준으로 100도 미만만 정상 데이터로 취급하는 것이다. 둘째, 남은 정상 데이터에 화씨 변환 파생변수(temperature_f)를 추가한다.
전처리가 끝나면 CSV로 저장하고, 마찬가지로 return file_path로 경로를 XCom에 게시한다.
T3: Load (적재)
def _load(**kwargs):
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
conn = mysql_hook.get_conn()
MySqlHook을 사용해 Python 코드 안에서 직접 MySQL 커넥션을 획득한다. Hook은 Airflow의 Connection 정보를 재사용하므로, 코드에 호스트/비밀번호 등을 하드코딩하지 않아도 된다.
이후 커서를 열고 executemany로 DataFrame의 행들을 INSERT하는 구조이다. 현재 코드에서는 sql과 params가 빈 상태로, 아직 구현이 완료되지 않은 상태이다. 완성하려면 이런 형태가 된다.
sql = """INSERT INTO sensor_readings
(sensor_id, timestamp, temperature_c, temperature_f)
VALUES (%s, %s, %s, %s)"""
params = [tuple(row) for row in df[['sensor_id','timestamp','temperature','temperature_f']].values]
try-except-finally 구조로 감싸서, 에러가 발생해도 반드시 conn.close()로 연결을 종료하도록 하고 있다. DB 연결은 I/O 작업이므로 예외 처리가 필수이다.
XCom 데이터 흐름 정리
Extract에서 JSON 파일 경로를 return하면 XCom에 자동 push된다. Transform에서 xcom_pull(task_ids='extract')로 경로를 가져와 파일을 읽고, 전처리 후 CSV 경로를 다시 return한다. Load에서 xcom_pull(task_ids='transform')으로 CSV 경로를 가져와 MySQL에 적재하는 흐름이다.
정리하면, 이 DAG은 센서 데이터 생성(JSON) → 이상치 필터링 + 단위 변환(CSV) → MySQL 적재라는 전형적인 ETL 흐름을 Airflow로 구현한 것이다. Task 간 데이터 전달은 XCom을 사용하고, DB 연결은 Airflow의 Connection + Hook 체계를 활용하여 코드에 민감 정보를 노출하지 않는 구조이다.
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Airflow - 6(멀티 DAG 구성 - Load) (0) | 2026.04.14 |
|---|---|
| Airflow - 5 (멀티 DAG - Transform) (0) | 2026.04.14 |
| Airflow - 4 (멀티 DAG 구성 - Extract) (0) | 2026.04.14 |
| Airflow - 2 (0) | 2026.04.09 |
| Airflow - 1 (0) | 2026.04.08 |