2026/04/14 5

Airflow - 8 (S3와 Airflow의 연계)

Airflow - AWS S3 연동 기초 (파일 업로드 DAG)Airflow에서 AWS S3에 파일을 업로드하고, 업로드 결과를 확인하는 기본적인 DAG를 분석해보았다. AWS 액세스 키가 정상 작동하는지 체크하고, 로컬 파일을 S3에 적재하는 흐름을 다루었다.전체 구조파일 생성 (Bash) → S3 업로드 → 업로드 확인BashOperator로 로컬에 텍스트 파일을 생성하고, LocalFilesystemToS3Operator로 S3에 업로드한 뒤, S3Hook으로 실제 업로드 여부를 검증하는 흐름.1. 모듈 가져오기from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.python import Python..

Airflow - 7 (DAG의 외부 API 활용)

Airflow MSA 신용평가 DAG - 기존 버전 vs 업그레이드 버전 비교 분석MSA 환경에서 AI API 서버를 호출하여 고객 신용평가를 수행하는 Airflow DAG를 분석해보았다. 기존 버전과 업그레이드 버전의 차이점을 중심으로 코드를 하나하나 뜯어볼 예정이다.전체 구조 비교두 버전 모두 동일한 흐름을 가짐:더미 데이터 준비 → (데이터 추출) → AI API 호출 → 결과 DB 저장핵심 차이는 데이터를 어디서 가져오고, 어떻게 저장하느냐에 있다. 항목기존 버전업그레이드 버전데이터 소스파이썬 리스트(하드코딩)DB에서 SELECT 조회더미 데이터고정 3건랜덤 50건, DB에 INSERTExtract Task미구현 (pass)DB 조회로 구현결과 저장 방식INSERT (신규 삽입)UPDATE (기존..

Airflow - 6(멀티 DAG 구성 - Load)

Airflow 멀티 DAG 구성 - Load 단계 코드 분석멀티 DAG ETL 파이프라인의 마지막 단계인 Load(적재)를 배움. Transform DAG에서 트리거를 받아 실행되며, 전처리된 CSV 데이터를 MySQL에 적재하는 구조.전체 구조Transform DAG가 전처리된 센서 데이터를 CSV로 저장하고 이 DAG를 트리거 → 이 DAG에서 테이블 생성 후 CSV를 읽어 MySQL에 INSERT하는 흐름. Transform DAG와 마찬가지로 자체 스케줄 없이 트리거로만 실행됨.1. 모듈 가져오기from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.python import PythonOperator..

Airflow - 5 (멀티 DAG - Transform)

Airflow 멀티 DAG 구성 - Transform 단계 코드 분석멀티 DAG ETL 파이프라인의 두 번째 단계인 Transform(전처리)을 배웠다. Extract DAG에서 트리거를 받아 실행되고, 데이터를 전처리한 뒤 다시 Load DAG를 트리거하는 구조.전체 구조Extract DAG가 센서 데이터를 JSON으로 저장하고 이 DAG를 트리거 → 이 DAG에서 JSON을 읽어 전처리 후 CSV로 저장 → Load DAG를 트리거하는 흐름. 이 DAG는 직접 스케줄링되지 않고, 앞 단계 DAG의 트리거로만 실행됨.1. 모듈 가져오기from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.python im..

Airflow - 4 (멀티 DAG 구성 - Extract)

Airflow 멀티 DAG 구성 - Extract 단계 코드 분석이번 글에서는 Airflow에서 하나의 DAG가 다른 DAG를 트리거하는 멀티 DAG 구조를 살펴봅니다. ETL 파이프라인 중 Extract(추출) 단계를 담당하는 DAG 코드를 하나하나 뜯어보겠습니다.전체 구조이 코드의 핵심은 데이터를 추출한 뒤, 다음 단계인 Transform DAG를 자동으로 실행시키는 것입니다. ETL을 하나의 DAG에 넣을 수도 있지만, 규모가 커지면 단계별로 DAG를 분리하는 것이 관리와 재사용 측면에서 유리합니다. DAG 간 연결에는 TriggerDagRunOperator를 사용합니다.1. 모듈 가져오기from datetime import datetime, timedeltafrom airflow import DA..