AI 직무 관점에서 DE
데이터 엔지니어의 기본 역량, 세부 내용 등등 차주에 진행(참고)
정의
워크플로우 오케스트레이션
지휘자는 직접 작업을 수행하지 않는다. : 지시만 함
실질직인 작업 (수집, 추출, 전처리, 적재, ...)은 DAG 파트가 담당
airflow는 실질적인(DAG) 일을 명령하는 관리자 역할
-> airflow를 깔아야 일을 할 수 있음, DAG가 있어야 작업이 진행됨
기존 작업 관리자와의 차이점
자동화 : 윈도우 작업 스케쥴러, 리눅스 cron 등 활용
특정 시간에 특정 업무를 처리하게끔 함
위 내용을 등록하여 처리
단점
앞선 작업의 성공/실패 여부에 따른 분기 처리 등에 대한 유동적인 처리가 불가능하거나 어려움.
airflow
airflow는 상황 조치가 가능함. : 프로그램 가능
도입 이유
의존성 관리
A 작업이 성공해야만 B를 실행할 수 있음 (시나리오 구성 가능)
후행 및 재시도
작업 실패 발생 -> 5분 후에 재시도(retry) -> 계속 실패 -> 관리자에게 장애 통보
백필(backfill)
상황
데이터를 수집하는 타겟의 서버(서비스, 장비, 센서 등등) 장애로 데이터가 5일치가 누락된 상황 -> 복구되면 해당 데이터를 현재 시점까지 순서대로 다시 수집(되돌린다), raw 데이터는 존재하고 이 데이터를 가져오는 중간에서 장애가 발생한 것으로 가정
4대 핵심 아키텍쳐(구성원)
스케쥴러
24시간, 365일 계속 가동(깨어있음) -> 서버(서비스, EC2 세팅)
흐름
지금 실행해야할 작업이 존재하는가?를 체크(감시) -> 조건에 맞는 작업이 존재한다면 Executor에게 전송(내부 통신을 통해)
웹서버
모니터링, 작업 조절 등 관리 페이지 제공
URL
localhost:8080 : 자바에서 많이 쓰는 포트, 이거 보면 자바로 짰나? 싶은 느낌
메타데이터 관리 DB
스케쥴 등 데이터베이스에서 관리
mysql, postgres등 사용 가능
차후 구성 시 docker-compose에서 구성
인증 기능
저장 내용
작업의 실행/성공/실패 등..(로그)
특정 DAG가 오늘 20시에 작동 예정이다(스케쥴)
각종 모든 상태 정보(Status)
스케쥴러, 웹서버 등 DB를 바라보면서 통신 진행(상호 대화)
executor / worker (실제 작업 관련 내용임)
실제 작업 담당(DAG)
LocalExecutor
로컬 머신(PC, 단일 인스턴스 등) -> 프로세스 가동 -> JOB 수행
기본은 여기서 진행(참고)
CeleryExecutor / KubernatesExecutor
N개의 서버를 가동하여 작업을 분산 : 대용량 처리
핵심 구성원(세부적)
대시보드를 통해서 시각적으로 추후 추가적으로 확인(참고)
DAG(Directed Acyclic Graph)
방향성 을 가진 비순환 그래프
순환(Loop)이 없는 작업 흐름
예시
단독 수행
앞단계 수행 -> 완료 -> 다음 작업 직접 호출
앞단계 수행 -> 완료 -> 산출물을 통해 이벤트 -> 다음 작업 호출
...
무한 루프 금지
일방통행 작업
A -> B -> C (O)
A -> b -> .. -> A (X)
특정 작업을 수행하는 프로그램 단위
Operator
DAG 내부에서 특정 작업(큰 목적)을 위해 작은 작업들이 구성될 수 있음
어떤 작업을 할지(사전에 정의된 템플릿)
XXXXOperator
Ex)
BashOperator (리눅스 기반)
PythonOperator (파이썬 기반)
SQLOperator (DB 기반)
Task
Operator 수행 시 구분하는 단위(이름)
DAG에서는 Node 한 개를 의미
대시보드 상에서 특정 DAG 선택하고 상세보기 등 선택 -> Graph로 시각화 확인 가능
DAG는 큰 작업 한 개를 의미하고 그게 모여서 더 큰 목적을 달성할 수 있음
Task Instance
Task가 실행되면 인스턴스가 할당(객체 할당, 메모리 할당) -> 한 개의 점으로 표현(대시보드) -> 색상으로 수행 여부 판단(GreenLight : 성공, RedLight : 실패)
Task + 스케쥴(시간/날짜 기록되어 있을 것임) 결합된 단위
스케쥴에 해당되면 수행, 완료되면 반환되는 구조
대시보드를 통해서 확인
주의사항
동일한 코드(DAG 내)는 1회를 수행하든 n번 수행하든 결과가 항상 같아야 한다. -> 멱등성
중복된 데이터가 쌓이지 않도록 주의해야 한다.
개요
도커 기반
Docker Compose 사용
Official 사이트에서 제공되는 yaml 파일 사용(세팅에 신경을 크게 쓰지 않아도 된다.)
기본 설치 전 구성(준비)
도커 기반 설치
대시보드 접속
localhost:8080
ID/PW : airflow
DAG 기본형 구성
간단한 기능부터 복잡한 기능(요구사항이 복잡한 구조)까지 순차적으로 진행
AI 데이터, AWS 서비스 연동 등 다양하게 확장
DAG 등록
DAG로서 역할(ID값 등)이 구성되면 자동으로 등록된다.
DAG 구성 중 문제가 발생하면 자동으로 오류가 출력됨.
DAG 구동
0단계
필요한 DAG 찾기
태그 검색, 기본 검색 등등 찾기
1단계
해당 DAG 진입 (클릭을 통해 진입 가능)
ON 상태 구성 (활성화)
pause/unpause 버튼
특정 시간이 되고 조건이 맞으면 작동함
2단계
즉시 실행 -> 재생 버튼(triger button)클릭 시 즉시 작동됨.
3단계
모니터링 -화면 왼쪽에서 특정 수행 단계를 클릭하면
Task 별, 전체 등등 확인 가능
Task 별로 ID로 나열되어 있음, 정사각형 박스 1개가 Task Instance임
Task Instance
박스별 클릭 > logs 클릭 : 개별 로그 상세하게 확인 가능(VScode에서 logs 하위파일에서도 확인 가능함)
박스 클릭하면 logs , xCom, task duration 메뉴 추가됨
# t1 로그
30c06fdb98c3
*** Found local files:
*** * /opt/airflow/logs/dag_id=01_basics_bash/run_id=manual__2026-04-08T05:35:11.037678+00:00/task_id=date-print/attempt=1.log
[2026-04-08, 14:35:13 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 14:35:13 KST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-04-08, 14:35:13 KST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'date']
[2026-04-08, 14:35:13 KST] {subprocess.py:86} INFO - Output:
[2026-04-08, 14:35:13 KST] {subprocess.py:93} INFO - Wed Apr 8 05:35:13 UTC 2026
-> 시간 출력
[2026-04-08, 14:35:13 KST] {subprocess.py:97} INFO - Command exited with return code 0
[2026-04-08, 14:35:13 KST] {taskinstance.py:340} ▶ Post task execution logs
# t2 로그
30c06fdb98c3
*** Found local files:
*** * /opt/airflow/logs/dag_id=01_basics_bash/run_id=manual__2026-04-08T05:35:11.037678+00:00/task_id=sleep/attempt=1.log
[2026-04-08, 14:35:15 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 14:35:16 KST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-04-08, 14:35:16 KST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'sleep 5']
[2026-04-08, 14:35:16 KST] {subprocess.py:86} INFO - Output:
-> 5초 지연
[2026-04-08, 14:35:21 KST] {subprocess.py:97} INFO - Command exited with return code 0
[2026-04-08, 14:35:21 KST] {taskinstance.py:340} ▶ Post task execution logs
# t3 로그
30c06fdb98c3
*** Found local files:
*** * /opt/airflow/logs/dag_id=01_basics_bash/run_id=manual__2026-04-08T05:35:11.037678+00:00/task_id=echo-print/attempt=1.log
[2026-04-08, 14:35:23 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 14:35:23 KST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-04-08, 14:35:23 KST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'echo "Hello, Airflow!"']
[2026-04-08, 14:35:23 KST] {subprocess.py:86} INFO - Output:
[2026-04-08, 14:35:23 KST] {subprocess.py:93} INFO - Hello, Airflow!
-> 메시지 출력
[2026-04-08, 14:35:23 KST] {subprocess.py:97} INFO - Command exited with return code 0
[2026-04-08, 14:35:23 KST] {taskinstance.py:340} ▶ Post task execution logs
로그를 통해 어떤 식으로 돌아가는지 관찰할 수 있었다.
subprocess : 이런 코드는 짠 적이 없는데 서브프로세스로 관리하게끔 airflow가 내부적으로 작동하고 있음
docker 내의 컨테이너 중 호스트 PC와 연동된 경로 확인
호스트PC(~/airflow-local) <-> airflow-worker-1(opt/airflow)
파이썬 기능/함수 등등 Task의 주된 내용으로 구성
Task와 Task 간 통신 담당 xCom (Cross Communication) 사용, 확인
앞단계 Task의 결과물을 다음 단계 Task에서 사용 (의존성 확인)
xCom : 내부 게시판으로 이해하면 됨. -> 특정 데이터가 게시되면 다른 Task가 접근하여 사용한다. -> Context 개념
기본 작동
DAG의 dag_id, task 정의, task_id, 의존성만 구성되면 작동됨. -> 껍데기는 이정도만 만들어도 틀은 만들 수 있다.
# t1 로그
30c06fdb98c3
*** Found local files:
*** * /opt/airflow/logs/dag_id=02_basics_python/run_id=manual__2026-04-08T06:58:09.111626+00:00/task_id=extract_task_data/attempt=1.log
[2026-04-08, 15:58:11 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 15:58:12 KST] {02_basics_python.py:34} INFO - 데이터 추출(Extract 작업) 중...
[2026-04-08, 15:58:12 KST] {02_basics_python.py:35} INFO - 작업시간 2026-04-08, 실행ID manual__2026-04-08T06:58:09.111626+00:00
[2026-04-08, 15:58:12 KST] {02_basics_python.py:36} INFO - 데이터 추출(Extract 작업) 완료!
-> 출력 정상적으로 되었음
[2026-04-08, 15:58:12 KST] {python.py:240} INFO - Done. Returned value was: Data Extract 성공
[2026-04-08, 15:58:12 KST] {taskinstance.py:340} ▶ Post task execution logs
# t2 로그
30c06fdb98c3
*** Found local files:
*** * /opt/airflow/logs/dag_id=02_basics_python/run_id=manual__2026-04-08T06:58:09.111626+00:00/task_id=transform_task_data/attempt=1.log
[2026-04-08, 15:58:15 KST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-04-08, 15:58:15 KST] {02_basics_python.py:60} INFO - 데이터 변환(Transform 작업) 중...
[2026-04-08, 15:58:15 KST] {02_basics_python.py:61} INFO - 결과 : Data Extract 성공
[2026-04-08, 15:58:15 KST] {02_basics_python.py:62} INFO - 데이터 변환(Transform 작업) 완료!
-> 출력 정상적으로 되었음
[2026-04-08, 15:58:15 KST] {python.py:240} INFO - Done. Returned value was: None
[2026-04-08, 15:58:15 KST] {taskinstance.py:340} ▶ Post task execution logs