SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Airflow - 2

dev-lee 2026. 4. 9. 16:57
 

03_basics_context_jinja.py

 

  • 목적
    • airflow 내부에서 관리하는 context 정보 접근
      • airflow.macro를 이요하여 context 내 주요 시간정보, 시간 차, 랜덤 등등 활용 가능 + jinja 결합
    • 접근 시 jinja 템플릿 활용
      • jinja 템플릿 :
        • https://jinja.palletsprojects.com/en/stable/
          • 표현
            • {{ }}, {% %}, <-> 탭, 들여쓰기
        • 파이썬 기반 웹프로그램(flask, django, fastapi)에서 SSR(서버 사이드 랜더링) 처리 시 사용하는 엔진
        • 코랩, jupyter, notebook 등 flask로 구성했고 화면은 jinja 템플릿으로 만들었음.(CSR, SSR 복합적임)
        • AI에서 프롬프트 엔지니어링 -> 컨텍스트 엔지니어링 -> 하네스 엔지니어링, ...
        • 위와 같은 작업 시 템플릿 엔진으로 활용
'''
    - macro + jinja 활용하여 airflow의 내부 정보 접근 및 출력 등
    - .
'''

# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import logging

# 3-1. 콜백 함수 정의
def _print(**kwargs):
    logging.info(f'ds 출력 { kwargs["ds"] }')
    logging.info(f'ds_nodash 출력 { kwargs["ds_nodash"] }')
# 2. DAG 정의
with DAG(
    dag_id = '03_basics_context_jinja',
    description = 'macro를 통해 context 접근, jinja를 통해 표현',
    default_args = {
        'owner': 'de_2team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    },
    schedule_interval = '0 9 * * *', # 분, 시, 일, 월, 요일 -> 매일 오전 09시 00분에 스케쥴 작동
    start_date = datetime(2026, 2, 25),
    catchup = False,
    tags = ['jinja', 'macro', 'context']
) as dag:
    # 3. Task 정의 (operator를 활용)
    t1 = BashOperator(
        task_id = 'jinja_used_bash',
        # jinja에서 값 출력 표현식 -> {{ 변수|값|식|함수 등등 }}
        # {{ context의 키 값 }}
        bash_command = "echo ' DAG의 t1 task 수행시간 {{ ds }}, {{ ds_nodash }}'"
    )
    t2 = BashOperator(
        task_id = 'jinja_macro_bash',
        # 매크로 사용 -> 함수적 기능 활용
        bash_command = "echo '일주일 전 수행 시간 계산 {{ macros.ds_add(ds, -7) }}, 랜덤 숫자 {{ macros.random() }}'"
    )
    t3 = PythonOperator(
        task_id = 'jinja_used_python',
        python_callable = _print
    )

    # 4. 의존성 정의 (Task 실행 방향성 설정)
    t1 >> t2 >> t3
    pass

 

이 코드는 Airflow에서 Jinja 템플릿과 매크로를 사용해 내부 context 정보에 접근하는 방법을 보여주는 DAG이다.

 

 

DAG 설정 부분

python
schedule_interval = '0 9 * * *'

크론 표현식으로, 매일 오전 9시에 스케줄이 작동한다. start_date는 2026년 2월 25일이고, catchup = False이므로 과거 미실행분을 소급 실행하지 않는다.

 

 

t1 — Jinja 템플릿으로 context 값 출력

python
bash_command = "echo 'DAG의 t1 task 수행시간 {{ ds }}, {{ ds_nodash }}'"

{{ }} 는 Jinja의 값 출력 표현식이다. Airflow는 Task를 실행할 때 이 부분을 자동으로 context 값으로 치환한다. ds는 2026-04-07 형태의 날짜 문자열이고, ds_nodash는 하이픈을 뺀 20260407 형태이다. 이 날짜는 실제 실행 시각이 아니라 논리적 실행 일자(logical date)이다.

 

 

t2 — 매크로를 활용한 날짜 계산과 랜덤

python
bash_command = "echo '일주일 전 {{ macros.ds_add(ds, -7) }}, 랜덤 {{ macros.random() }}'"

macros는 Airflow가 Jinja 컨텍스트에 주입해주는 유틸리티 모듈이다. macros.ds_add(ds, -7)은 ds 날짜에서 7일을 빼는 함수이고, macros.random()은 0~1 사이의 랜덤 값을 생성한다. Jinja 안에서 함수 호출까지 가능하다는 점이 핵심이다.

 

 

t3 — PythonOperator에서 context 접근

python
def _print(**kwargs):
    logging.info(f'ds 출력 { kwargs["ds"] }')
    logging.info(f'ds_nodash 출력 { kwargs["ds_nodash"] }')

PythonOperator에서는 Jinja를 쓰지 않고, 콜백 함수의 **kwargs를 통해 동일한 context 정보에 접근한다. Airflow가 함수를 호출할 때 ds, ds_nodash, execution_date 등 다양한 context 값을 키워드 인자로 자동 전달해 준다.

 

 

의존성

python
t1 >> t2 >> t3

t1 → t2 → t3 순서로 실행된다. 즉 Jinja로 직접 출력 → 매크로 함수 활용 → Python에서 context 접근 순으로 동일한 정보에 접근하는 3가지 방식을 순서대로 보여주는 구조이다.


정리하면, BashOperator에서는 {{ ds }} 같은 Jinja 문법으로, PythonOperator에서는 kwargs["ds"] 로 같은 context 값에 접근할 수 있고, macros를 쓰면 날짜 계산 같은 추가 기능도 Jinja 안에서 바로 사용할 수 있다는 것이 이 DAG의 핵심이다.



아래는 03 실습을 진행했을 때 출력되는 로그

# t1 로그
30c06fdb98c3
*** Found local files:
***   * /opt/airflow/logs/dag_id=03_basics_context_jinja/run_id=manual__2026-04-09T01:45:45.859869+00:00/task_id=jinja_used_bash/attempt=1.log
[2026-04-09, 10:45:52 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-09, 10:45:53 KST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-04-09, 10:45:53 KST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', "echo ' DAG의 t1 task 수행시간 2026-04-09, 20260409'"]
[2026-04-09, 10:45:53 KST] {subprocess.py:86} INFO - Output:
[2026-04-09, 10:45:53 KST] {subprocess.py:93} INFO -  DAG의 t1 task 수행시간 2026-04-09, 20260409
[2026-04-09, 10:45:53 KST] {subprocess.py:97} INFO - Command exited with return code 0
[2026-04-09, 10:45:53 KST] {taskinstance.py:340} ▶ Post task execution logs

# t2 로그
30c06fdb98c3
*** Found local files:
***   * /opt/airflow/logs/dag_id=03_basics_context_jinja/run_id=manual__2026-04-09T01:45:45.859869+00:00/task_id=jinja_macro_bash/attempt=1.log
[2026-04-09, 10:45:58 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-09, 10:45:58 KST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-04-09, 10:45:58 KST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', "echo '일주일 전 수행 시간 계산 2026-04-02, 랜덤 숫자 0.9661192206873069'"]
[2026-04-09, 10:45:59 KST] {subprocess.py:86} INFO - Output:
[2026-04-09, 10:45:59 KST] {subprocess.py:93} INFO - 일주일 전 수행 시간 계산 2026-04-02, 랜덤 숫자 0.9661192206873069
[2026-04-09, 10:45:59 KST] {subprocess.py:97} INFO - Command exited with return code 0
[2026-04-09, 10:45:59 KST] {taskinstance.py:340} ▶ Post task execution logs

# t3 로그
30c06fdb98c3
*** Found local files:
***   * /opt/airflow/logs/dag_id=03_basics_context_jinja/run_id=manual__2026-04-09T01:45:45.859869+00:00/task_id=jinja_used_python/attempt=1.log
[2026-04-09, 10:46:02 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-09, 10:46:02 KST] {03_basics_context_jinja.py:15} INFO - ds 출력 2026-04-09
[2026-04-09, 10:46:02 KST] {03_basics_context_jinja.py:16} INFO - ds_nodash 출력 20260409
[2026-04-09, 10:46:02 KST] {python.py:240} INFO - Done. Returned value was: None
[2026-04-09, 10:46:02 KST] {taskinstance.py:340} ▶ Post task execution logs

분기 (branching)

04_basics_branching.py

  • 파이썬 오퍼레이터 등 프로그래밍 상황에서 내부 연산 결과에 따라 의존성 방향성을 선택할 수 있음
  • Task 선택을 조건부로 진행(브런치 전략)
  • 분기를 통해서 생략된 Task는 status가 skipped로 설정됨(아래 캡처에서 분홍색 status가 skipped임.)
    • 참고
      • ALL_SUCCESS와 같은 설정이라면 skipped가 존재하는 것 자체로 fail로 간주함 -> 정책을 펼칠 수 있음
'''
    - 함수 내부 연산의 결과에 의해 조건부로 Task를 선택하여 진행 (의존성 컨트롤)
'''

# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator # BranchPythonOperator : Task 조건부 선택
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule # 성공, 실패, 최소 단위 등 조건 설정
import logging
import random

# 3-1. 콜백 함수 정의
def _branching(**kwargs):
    '''
        - 특정 조건에 따라 분기 처리 -> 특정 task로 다음 수행을 지정
    '''
    if random.choice([True, False]) :
        logging.info("task process 실행")
        return "process" # 이동하고 싶은 task_id값 반환 -> 해당 task가 수행됨
    else : 
        logging.info("task skip 실행")
        return "skip"
def _process(**kwargs):
    logging.info("task process : 특정 작업 수행 중...")

# 2. DAG 정의
with DAG(
    dag_id = '04_basics_branching',
    description = '분기 처리, 선택적 Task 구동',
    default_args = {
        'owner': 'de_2team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    },
    schedule_interval = '@daily',
    start_date = datetime(2026, 2, 25),
    catchup = False,
    tags = ['branch', 'trigger_rule']
) as dag:
    # 3. Task 정의
    task_start = EmptyOperator(
        task_id = 'start'
    )
    task_branch = BranchPythonOperator(
        task_id = 'branching',
        python_callable = _branching
    )
    task_process = PythonOperator(
        task_id = 'process',
        python_callable = _process
    )
    task_skip = EmptyOperator(
        task_id = 'skip'
    )
    task_end = EmptyOperator(
        task_id = 'end',
        # Task 전체 수행에 대한 조건 부여 :
        # 현재 설정값 : 실패 X, 최소 1개는 수행해야함
        trigger_rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS # 모든 선행 Task가 실패한 경우를 제외하고는 모두 수행 -> process, skip 둘 중 하나는 반드시 수행됨
    )
    # 4. 의존성 정의 -> 시나리오 별 준비
    task_start >> task_branch
    task_branch >> task_process >> task_end
    task_branch >> task_skip >> task_end

 

이 코드는 Airflow에서 조건에 따라 다음 실행할 Task를 선택하는 분기(Branching) 처리를 보여주는 DAG이다.

 

 

핵심 개념 — BranchPythonOperator

일반 PythonOperator와 달리, 콜백 함수의 return 값으로 다음에 실행할 task_id를 지정한다. 반환되지 않은 경로의 Task는 자동으로 skipped 상태가 된다.

 

 

분기 함수

python
def _branching(**kwargs):
    if random.choice([True, False]):
        return "process"    # task_id가 "process"인 Task 실행
    else:
        return "skip"       # task_id가 "skip"인 Task 실행

random.choice로 True/False를 랜덤 선택하고, 그 결과에 따라 "process" 또는 "skip"이라는 task_id 문자열을 반환한다. Airflow는 이 반환값과 일치하는 task_id를 가진 Task만 실행하고, 나머지 경로는 건너뛴다.

 

 

Task 구성

start → branching → process → end
                  → skip    → end

start는 EmptyOperator로, 아무 작업 없이 DAG의 시작점 역할만 한다. branching이 분기 판단을 하고, process는 실제 작업을 수행하는 Task, skip은 건너뛸 때 타는 빈 Task이다. end는 양쪽 경로가 합류하는 종료 지점이다.

 

 

trigger_rule이 필요한 이유

python
task_end = EmptyOperator(
    task_id = 'end',
    trigger_rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
)

여기가 이 DAG에서 가장 중요한 부분이다. end의 선행 Task는 process와 skip 두 개인데, 분기 처리 때문에 둘 중 하나는 반드시 skipped 상태가 된다.

Airflow의 기본 trigger_rule은 ALL_SUCCESS이다. 이 규칙에서는 선행 Task 중 하나라도 skipped이면 성공으로 인정하지 않기 때문에 end가 실행되지 않는다.

NONE_FAILED_MIN_ONE_SUCCESS로 바꾸면 "실패한 것만 없고 최소 1개가 성공이면 실행"이라는 조건이 되므로, 한쪽이 skipped여도 다른 쪽이 성공했으면 end가 정상 실행된다.

 

 

실행 시나리오 정리

랜덤 결과가 True인 경우, start → branching → process → end 순으로 실행되고 skip은 skipped 상태가 된다. False인 경우, start → branching → skip → end 순으로 실행되고 process가 skipped 상태가 된다. 어느 쪽이든 end는 항상 실행된다.

 

 

정리하면, BranchPythonOperator는 함수의 return 값으로 다음 Task를 선택하는 분기 처리를 담당하고, 분기로 인해 생기는 skipped Task 때문에 합류 지점에서는 trigger_rule을 반드시 조정해야 한다는 것이 이 DAG의 핵심이다.

분기를 통해 Task가 진행되며 분홍색 박스는 Skipped 상태임