Airflow MSA 신용평가 DAG - 기존 버전 vs 업그레이드 버전 비교 분석
MSA 환경에서 AI API 서버를 호출하여 고객 신용평가를 수행하는 Airflow DAG를 분석해보았다. 기존 버전과 업그레이드 버전의 차이점을 중심으로 코드를 하나하나 뜯어볼 예정이다.
전체 구조 비교
두 버전 모두 동일한 흐름을 가짐:
더미 데이터 준비 → (데이터 추출) → AI API 호출 → 결과 DB 저장
핵심 차이는 데이터를 어디서 가져오고, 어떻게 저장하느냐에 있다.
| 항목 | 기존 버전 | 업그레이드 버전 |
| 데이터 소스 | 파이썬 리스트(하드코딩) | DB에서 SELECT 조회 |
| 더미 데이터 | 고정 3건 | 랜덤 50건, DB에 INSERT |
| Extract Task | 미구현 (pass) | DB 조회로 구현 |
| 결과 저장 방식 | INSERT (신규 삽입) | UPDATE (기존 데이터 갱신) |
| Task 의존성 | 3단계 (extract 스킵) | 4단계 (extract 포함) |
1. 모듈 가져오기
기존 버전
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging, json, requests
from airflow.providers.mysql.hooks.mysql import MySqlHook
업그레이드 버전 (추가분)
import random # 랜덤 더미 데이터 생성용 추가
업그레이드 버전에서 random 모듈이 추가됨. 더미 데이터를 고정값이 아닌 랜덤으로 생성하기 위한 것.
2. 더미 데이터 준비 (_create_dummy_data)
기존 버전
def _create_dummy_data(**kwargs):
users = [
{"user_id":"C001", "income":5000, "loan_amt":2000},
{"user_id":"C002", "income":6000, "loan_amt":4000},
{"user_id":"C003", "income":7000, "loan_amt":100}
]
return users
- 3명의 고객 데이터를 파이썬 리스트로 하드코딩
- xCom으로 다음 Task에 직접 전달
- DB를 거치지 않음
업그레이드 버전
def _create_dummy_data(**kwargs):
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
with mysql_hook.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute('''
CREATE TABLE IF NOT EXISTS customers (
user_id VARCHAR(50) PRIMARY KEY,
income INT DEFAULT NULL,
loan_amt INT DEFAULT NULL,
credit_score INT DEFAULT NULL,
grade VARCHAR(10) DEFAULT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('truncate table customers;')
sql = '''
INSERT INTO customers (user_id, income, loan_amt)
VALUES (%s, %s, %s)
'''
params = [
(
f'C{i:03d}',
random.randint(3000, 10000),
random.randint(1000, 5000),
)
for i in range(1, 51)
]
cursor.executemany(sql, params)
conn.commit()
주요 변경점:
- 50명의 고객 데이터를 랜덤으로 생성하여 DB에 직접 INSERT
- f'C{i:03d}' → C001 ~ C050 형태의 고객 ID 생성. :03d는 3자리 숫자로 포맷팅(1 → 001)
- random.randint(3000, 10000) → 소득 3000~10000 사이 랜덤 값
- TRUNCATE TABLE → 테스트를 여러 번 할 수 있도록 기존 데이터를 전부 삭제 후 재삽입
- credit_score, grade는 INSERT하지 않음 → NULL 상태로 남겨두고 API 평가 후 UPDATE할 예정
- xCom으로 데이터를 전달하지 않음 → 다음 Task에서 DB를 직접 조회하는 방식으로 변경
3. 데이터 추출 (_extract_data)
기존 버전
def _extract_data(**kwargs):
pass
미구현 상태. 의존성에서도 빠져 있어 실제로 실행되지 않음.
업그레이드 버전
def _extract_data(**kwargs):
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
df = mysql_hook.get_pandas_df('''
SELECT user_id, income, loan_amt
FROM customers
WHERE credit_score IS NULL
''')
if df.empty:
logging.info('신규 고객 없다')
return []
return df.to_dict(orient='records')
주요 포인트:
- mysql_hook.get_pandas_df() → SQL 결과를 바로 pandas DataFrame으로 반환하는 편의 메서드. get_conn() → cursor → fetchall() 과정을 한 줄로 대체
- WHERE credit_score IS NULL → 아직 신용평가가 안 된 고객만 조회. 이미 평가된 고객은 제외됨
- df.to_dict(orient='records') → DataFrame을 [{"user_id": "C001", "income": 5000, ...}, ...] 형태로 변환. xCom으로 전달하기 위한 직렬화 가능한 형태
- df.empty 체크 → 신규 고객이 없으면 빈 리스트 반환
기존 버전과의 핵심 차이: xCom으로 하드코딩 데이터를 전달하던 방식에서, DB 조회 결과를 전달하는 방식으로 변경됨.
4. API 호출 (_api_service_call)
기존 버전
users_data = ti.xcom_pull(task_ids='task_create_dummy_data')
업그레이드 버전
users_data = ti.xcom_pull(task_ids='task_extract_data')
유일한 차이: xCom을 가져오는 대상 Task가 task_create_dummy_data에서 task_extract_data로 변경됨. 나머지 API 호출 로직은 동일.
5. 결과 저장 (_load_users_credit)
기존 버전
sql = '''
INSERT INTO customers (user_id, credit_score, grade)
VALUES (%s, %s, %s)
'''
params = [
(data['user_id'], data['credit_score'], data['grade'])
for data in users_grade
]
cursor.executemany(sql, params)
- INSERT 방식 → 신규 행을 삽입
- 문제점: 같은 user_id로 재실행하면 PK 중복 에러 발생
- income, loan_amt은 INSERT하지 않으므로 NULL로 들어감
업그레이드 버전
sql = '''
UPDATE customers
SET credit_score=%s, grade=%s
WHERE user_id=%s
'''
params = [
(data['credit_score'], data['grade'], data['user_id'])
for data in users_grade
]
cursor.executemany(sql, params)
- UPDATE 방식 → 이미 존재하는 고객 데이터에 신용평가 결과만 갱신
- PK 중복 에러 발생하지 않음
- income, loan_amt은 더미 데이터 준비 단계에서 이미 INSERT되어 있으므로 보존됨
- 파라미터 순서 주의: UPDATE에서는 SET 절의 값이 먼저, WHERE 절의 user_id가 마지막
6. Task 의존성
기존 버전
task_create_dummy_data >> task_api_service_call >> task_load_users_credit
3단계. task_extract_data는 정의만 되어 있고 의존성에 포함되지 않음.
업그레이드 버전
task_create_dummy_data >> task_extract_data >> task_api_service_call >> task_load_users_credit
4단계. Extract Task가 의존성에 포함되어 실제로 실행됨. 전체 흐름:
- _create_dummy_data → DB에 50명 고객 데이터 INSERT (신용평가 NULL)
- _extract_data → DB에서 신용평가 미완료 고객 조회
- _api_service_call → AI API 서버에 평가 요청
- _load_users_credit → 평가 결과를 DB에 UPDATE
핵심 차이 정리
항목 기존 버전 업그레이드 버전
| 더미 데이터 | 하드코딩 3건, xCom 전달 | 랜덤 50건, DB INSERT |
| 데이터 흐름 | xCom 기반 (메모리) | DB 기반 (영속 저장소) |
| Extract | 미구현 | DB SELECT + pandas |
| Load SQL | INSERT (신규 삽입) | UPDATE (기존 갱신) |
| PK 중복 대응 | 없음 (에러 발생) | TRUNCATE + UPDATE |
| 의존성 | 3단계 | 4단계 |
업그레이드 버전은 실무 패턴에 가까운 구조이다. 데이터가 DB에 영속적으로 존재하고, 신용평가가 필요한 고객만 조회하여 API를 호출한 뒤, 결과를 기존 레코드에 UPDATE하는 흐름. 기존 버전은 xCom으로 모든 데이터를 전달하는 프로토타입 수준의 구성이었다.
Airflow에서 외부 AI API 서버를 호출하여 신용평가를 수행하고, 결과를 DB에 적재하는 파이프라인을 구성함.
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Airflow - 9 (이벤트 기반 파이프라인) (0) | 2026.04.15 |
|---|---|
| Airflow - 8 (S3와 Airflow의 연계) (0) | 2026.04.14 |
| Airflow - 6(멀티 DAG 구성 - Load) (0) | 2026.04.14 |
| Airflow - 5 (멀티 DAG - Transform) (0) | 2026.04.14 |
| Airflow - 4 (멀티 DAG 구성 - Extract) (0) | 2026.04.14 |