SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Airflow - 1

dev-lee 2026. 4. 8. 21:08

AI 직무 관점에서 DE

 

 

 

  • 데이터 엔지니어의 기본 역량, 세부 내용 등등 차주에 진행(참고)
 

개요

 

 

  • 정의
    • 워크플로우 오케스트레이션
    • 지휘자는 직접 작업을 수행하지 않는다. : 지시만 함
    • 실질직인 작업(수집, 추출, 전처리, 적재, ...)은 DAG 파트가 담당
    • airflow는 실질적인(DAG) 일을 명령하는 관리자 역할
    • -> airflow를 깔아야 일을 할 수 있음, DAG가 있어야 작업이 진행됨
 

  • 기존 작업 관리자와의 차이점
    • 자동화 : 윈도우 작업 스케쥴러, 리눅스 cron 등 활용
      • 특정 시간에 특정 업무를 처리하게끔 함
      • 위 내용을 등록하여 처리
    • 단점
      • 앞선 작업의 성공/실패 여부에 따른 분기 처리 등에 대한 유동적인 처리가 불가능하거나 어려움.
  • airflow
    • airflow는 상황 조치가 가능함. : 프로그램 가능
 

  • 도입 이유
    • 의존성 관리
      • A 작업이 성공해야만 B를 실행할 수 있음 (시나리오 구성 가능)
    • 후행 및 재시도
      • 작업 실패 발생 -> 5분 후에 재시도(retry) -> 계속 실패 -> 관리자에게 장애 통보
    • 백필(backfill)
      • 상황
        • 데이터를 수집하는 타겟의 서버(서비스, 장비, 센서 등등) 장애로 데이터가 5일치가 누락된 상황 -> 복구되면 해당 데이터를 현재 시점까지 순서대로 다시 수집(되돌린다), raw 데이터는 존재하고 이 데이터를 가져오는 중간에서 장애가 발생한 것으로 가정
 

  • 4대 핵심 아키텍쳐(구성원)
    1. 스케쥴러
    • 24시간, 365일 계속 가동(깨어있음) -> 서버(서비스, EC2 세팅)
    • 흐름
      • 지금 실행해야할 작업이 존재하는가?를 체크(감시) -> 조건에 맞는 작업이 존재한다면 Executor에게 전송(내부 통신을 통해)
    1. 웹서버
    • 모니터링, 작업 조절 등 관리 페이지 제공
      • 대시보드 제공
      • 관리 모니터링 기능 등 제공
    • URL
      • localhost:8080 : 자바에서 많이 쓰는 포트, 이거 보면 자바로 짰나? 싶은 느낌
    1. 메타데이터 관리 DB
    • 스케쥴 등 데이터베이스에서 관리
      • mysql, postgres등 사용 가능
      • 차후 구성 시 docker-compose에서 구성
    • 인증 기능
      • redis 사용(nosql 계열)
    • 저장 내용
      • 작업의 실행/성공/실패 등..(로그)
      • 특정 DAG가 오늘 20시에 작동 예정이다(스케쥴)
        • DAG 추가 -> 자동으로 스케쥴 저장
      • 각종 모든 상태 정보(Status)
      • 스케쥴러, 웹서버 등 DB를 바라보면서 통신 진행(상호 대화)
    1. executor / worker (실제 작업 관련 내용임)
    • 실제 작업 담당(DAG)
    • LocalExecutor
      • 로컬 머신(PC, 단일 인스턴스 등) -> 프로세스 가동 -> JOB 수행
      • 기본은 여기서 진행(참고)
    • CeleryExecutor / KubernatesExecutor
      • N개의 서버를 가동하여 작업을 분산 : 대용량 처리
 

  • 핵심 구성원(세부적)
    • 대시보드를 통해서 시각적으로 추후 추가적으로 확인(참고)
    • DAG(Directed Acyclic Graph)
      • 방향성을 가진 비순환 그래프
      • 순환(Loop)이 없는 작업 흐름
      • 예시
        • 단독 수행
        • 앞단계 수행 -> 완료 -> 다음 작업 직접 호출
        • 앞단계 수행 -> 완료 -> 산출물을 통해 이벤트 -> 다음 작업 호출
        • ...
        • 무한 루프 금지
          • 일방통행 작업
          • A -> B -> C (O)
          • A -> b -> .. -> A (X)
        • 특정 작업을 수행하는 프로그램 단위
          • *.py
    • Operator
      • DAG 내부에서 특정 작업(큰 목적)을 위해 작은 작업들이 구성될 수 있음
      • 어떤 작업을 할지(사전에 정의된 템플릿)
        • XXXXOperator
        • Ex)
          • BashOperator (리눅스 기반)
          • PythonOperator (파이썬 기반)
          • SQLOperator (DB 기반)
    • Task
      • Operator 수행 시 구분하는 단위(이름)
      • DAG에서는 Node 한 개를 의미
        • 대시보드 상에서 특정 DAG 선택하고 상세보기 등 선택 -> Graph로 시각화 확인 가능
      • DAG는 큰 작업 한 개를 의미하고 그게 모여서 더 큰 목적을 달성할 수 있음
    • Task Instance
      • Task가 실행되면 인스턴스가 할당(객체 할당, 메모리 할당) -> 한 개의 점으로 표현(대시보드) -> 색상으로 수행 여부 판단(GreenLight : 성공, RedLight : 실패)
      • Task + 스케쥴(시간/날짜 기록되어 있을 것임) 결합된 단위
      • 스케쥴에 해당되면 수행, 완료되면 반환되는 구조
      • 대시보드를 통해서 확인
 

  • 주의사항
    • 동일한 코드(DAG 내)는 1회를 수행하든 n번 수행하든 결과가 항상 같아야 한다. -> 멱등성
    • 중복된 데이터가 쌓이지 않도록 주의해야 한다.
 

로컬 PC에서 설치 및 운용

 

  • 개요
    • 도커 기반
    • Docker Compose 사용
      • Official 사이트에서 제공되는 yaml 파일 사용(세팅에 신경을 크게 쓰지 않아도 된다.)
  • 기본 설치 전 구성(준비)
    • step 1
      • 도커 클라이언트 구동
    • step 2
      • vscode 구동
      • C:\Users\Administrator\Desktop\project\Airflow-Local
    • step 3
    • step 4
      • 구성과 운영에 필요한 폴더 생성
        • 내부적으로 기본 이름 설정되어 있음
      mkdir -p ./dags ./logs ./plugins ./config
      
    • step 5
      • .env 구성 (생성)
      AIRFLOW_UID=50000
      
  • 도커 기반 설치
    • step 1
      • 초기화 : 초기화 작업 진행 (초기 DB 세팅, 계정 설정 등등)
        docker compose up airflow-init
      
    • step 2
      • 컨테이너 모두 구성
        docker compose up -d
      
  • 대시보드 접속
    • localhost:8080
    • ID/PW : airflow
 

DAG 기본형 구성

  • 간단한 기능부터 복잡한 기능(요구사항이 복잡한 구조)까지 순차적으로 진행
  • AI 데이터, AWS 서비스 연동 등 다양하게 확장
 

  • 프로젝트 내에서 DAG는 ./dags 하위에 *.py 단위 파일로 구성
    • 잠시 후 자동으로 인식하게 되어있음
      ~/Airflow-Local
      L dags
        L 01_basics_bash.py
        L 01_basics_python.py
    
 

01_basics_bash.py

 

  • DAG 등록
    • DAG로서 역할(ID값 등)이 구성되면 자동으로 등록된다.
    • DAG 구성 중 문제가 발생하면 자동으로 오류가 출력됨.
      • 새로고침 혹은 잠시 대기하면 처리됨.
  • DAG 구동
    • 0단계
      • 필요한 DAG 찾기
      • 태그 검색, 기본 검색 등등 찾기
    • 1단계
      • 해당 DAG 진입 (클릭을 통해 진입 가능)
      • ON 상태 구성 (활성화)
        • pause/unpause 버튼
        • 특정 시간이 되고 조건이 맞으면 작동함
    • 2단계
      • 즉시 실행 -> 재생 버튼(triger button)클릭 시 즉시 작동됨.
    • 3단계
      • 모니터링 -화면 왼쪽에서 특정 수행 단계를 클릭하면
        • Task 별, 전체 등등 확인 가능
        • Task 별로 ID로 나열되어 있음, 정사각형 박스 1개가 Task Instance임
      • Task Instance
        • 박스별 클릭 > logs 클릭 : 개별 로그 상세하게 확인 가능(VScode에서 logs 하위파일에서도 확인 가능함)
        • 박스 클릭하면 logs, xCom, task duration 메뉴 추가됨
  • 로그
# t1 로그
30c06fdb98c3
*** Found local files:
***   * /opt/airflow/logs/dag_id=01_basics_bash/run_id=manual__2026-04-08T05:35:11.037678+00:00/task_id=date-print/attempt=1.log
[2026-04-08, 14:35:13 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 14:35:13 KST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-04-08, 14:35:13 KST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'date']
[2026-04-08, 14:35:13 KST] {subprocess.py:86} INFO - Output:
[2026-04-08, 14:35:13 KST] {subprocess.py:93} INFO - Wed Apr  8 05:35:13 UTC 2026
-> 시간 출력
[2026-04-08, 14:35:13 KST] {subprocess.py:97} INFO - Command exited with return code 0
[2026-04-08, 14:35:13 KST] {taskinstance.py:340} ▶ Post task execution logs

# t2 로그
30c06fdb98c3
*** Found local files:
***   * /opt/airflow/logs/dag_id=01_basics_bash/run_id=manual__2026-04-08T05:35:11.037678+00:00/task_id=sleep/attempt=1.log
[2026-04-08, 14:35:15 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 14:35:16 KST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-04-08, 14:35:16 KST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'sleep 5']
[2026-04-08, 14:35:16 KST] {subprocess.py:86} INFO - Output:
-> 5초 지연
[2026-04-08, 14:35:21 KST] {subprocess.py:97} INFO - Command exited with return code 0
[2026-04-08, 14:35:21 KST] {taskinstance.py:340} ▶ Post task execution logs

# t3 로그
30c06fdb98c3
*** Found local files:
***   * /opt/airflow/logs/dag_id=01_basics_bash/run_id=manual__2026-04-08T05:35:11.037678+00:00/task_id=echo-print/attempt=1.log
[2026-04-08, 14:35:23 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 14:35:23 KST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-04-08, 14:35:23 KST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'echo "Hello, Airflow!"']
[2026-04-08, 14:35:23 KST] {subprocess.py:86} INFO - Output:
[2026-04-08, 14:35:23 KST] {subprocess.py:93} INFO - Hello, Airflow!
-> 메시지 출력
[2026-04-08, 14:35:23 KST] {subprocess.py:97} INFO - Command exited with return code 0
[2026-04-08, 14:35:23 KST] {taskinstance.py:340} ▶ Post task execution logs

 

  • 로그를 통해 어떤 식으로 돌아가는지 관찰할 수 있었다.
  • subprocess : 이런 코드는 짠 적이 없는데 서브프로세스로 관리하게끔 airflow가 내부적으로 작동하고 있음

 


  • docker 내의 컨테이너 중 호스트 PC와 연동된 경로 확인
    • 호스트PC(~/airflow-local) <-> airflow-worker-1(opt/airflow)

 

  • 스케쥴은 cron으로 표기 가능함
    • 분 단위에서 주 단위까지 5자리로 표기

 

 

02_basics_python.py

  • 파이썬 기능/함수 등등 Task의 주된 내용으로 구성
  • Task와 Task 간 통신 담당 xCom(Cross Communication) 사용, 확인
    • 앞단계 Task의 결과물을 다음 단계 Task에서 사용 (의존성 확인)
    • xCom : 내부 게시판으로 이해하면 됨. -> 특정 데이터가 게시되면 다른 Task가 접근하여 사용한다. -> Context 개념
  • 기본 작동
    • DAG의 dag_id, task 정의, task_id, 의존성만 구성되면 작동됨. -> 껍데기는 이정도만 만들어도 틀은 만들 수 있다.
  • 로그
# t1 로그
30c06fdb98c3
*** Found local files:
***   * /opt/airflow/logs/dag_id=02_basics_python/run_id=manual__2026-04-08T06:58:09.111626+00:00/task_id=extract_task_data/attempt=1.log
[2026-04-08, 15:58:11 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 15:58:12 KST] {02_basics_python.py:34} INFO - 데이터 추출(Extract 작업) 중...
[2026-04-08, 15:58:12 KST] {02_basics_python.py:35} INFO - 작업시간 2026-04-08, 실행ID manual__2026-04-08T06:58:09.111626+00:00
[2026-04-08, 15:58:12 KST] {02_basics_python.py:36} INFO - 데이터 추출(Extract 작업) 완료!
-> 출력 정상적으로 되었음
[2026-04-08, 15:58:12 KST] {python.py:240} INFO - Done. Returned value was: Data Extract 성공
[2026-04-08, 15:58:12 KST] {taskinstance.py:340} ▶ Post task execution logs

# t2 로그
30c06fdb98c3
*** Found local files:
***   * /opt/airflow/logs/dag_id=02_basics_python/run_id=manual__2026-04-08T06:58:09.111626+00:00/task_id=transform_task_data/attempt=1.log
[2026-04-08, 15:58:15 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 15:58:15 KST] {02_basics_python.py:60} INFO - 데이터 변환(Transform 작업) 중...
[2026-04-08, 15:58:15 KST] {02_basics_python.py:61} INFO - 결과 : Data Extract 성공
[2026-04-08, 15:58:15 KST] {02_basics_python.py:62} INFO - 데이터 변환(Transform 작업) 완료!
-> 출력 정상적으로 되었음
[2026-04-08, 15:58:15 KST] {python.py:240} INFO - Done. Returned value was: None
[2026-04-08, 15:58:15 KST] {taskinstance.py:340} ▶ Post task execution logs