SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Airflow - 8 (S3와 Airflow의 연계)

dev-lee 2026. 4. 14. 17:36

Airflow - AWS S3 연동 기초 (파일 업로드 DAG)

Airflow에서 AWS S3에 파일을 업로드하고, 업로드 결과를 확인하는 기본적인 DAG를 분석해보았다. AWS 액세스 키가 정상 작동하는지 체크하고, 로컬 파일을 S3에 적재하는 흐름을 다루었다.


전체 구조

파일 생성 (Bash) → S3 업로드 → 업로드 확인

BashOperator로 로컬에 텍스트 파일을 생성하고, LocalFilesystemToS3Operator로 S3에 업로드한 뒤, S3Hook으로 실제 업로드 여부를 검증하는 흐름.


1. 모듈 가져오기

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import logging
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
  • BashOperator → 쉘 명령어를 Task로 실행. 여기서는 텍스트 파일 생성에 사용
  • LocalFilesystemToS3Operator → 로컬 파일을 S3 버킷에 업로드하는 전용 오퍼레이터. Airflow의 Amazon Provider 패키지(apache-airflow-providers-amazon)에 포함되어 있음
  • S3Hook → Python 코드 내에서 S3에 직접 접근하여 파일 목록 조회, 다운로드 등을 수행하는 Hook

LocalFilesystemToS3Operator는 업로드 전용이고, S3Hook은 범용 S3 접근 도구라는 차이가 있다.


2. 환경변수 설정

BUCKET_NAME = "de-ai-04-827913617635-ap-northeast-2-an"
FILE_NAME = 'hello.txt'
LOCAL_PATH = f'/opt/airflow/dags/data/{FILE_NAME}'
  • BUCKET_NAME → S3 버킷 이름. 글로벌하게 고유해야 하며, 보통 팀명-계정ID-리전 조합으로 네이밍. 827913617635는 AWS 루트 계정 ID, ap-northeast-2는 서울 리전
  • FILE_NAME → S3에 저장될 파일명
  • LOCAL_PATH → Docker 컨테이너 내부 기준 파일 경로. 호스트 PC가 아닌 Airflow worker 컨테이너 안의 경로

3. 콜백 함수 (_check_s3)

def _check_s3(**kwargs):
    hook = S3Hook(aws_conn_id='aws_default')
    keys = hook.list_keys(bucket_name=BUCKET_NAME)
    if not keys:
        raise ValueError('업로드 실패')
    for key in keys:
        logging.info(f'키 : {key}')

업로드 후 검증을 수행하는 함수:

  • S3Hook(aws_conn_id='aws_default') → Airflow Connections에 등록된 AWS 인증 정보를 사용하여 S3 접근. Connection Type은 Amazon Web Services로 설정되어 있어야 함
  • hook.list_keys(bucket_name=BUCKET_NAME) → 해당 버킷 내 모든 객체의 키(파일명) 목록을 반환. S3에서 "키"는 파일의 전체 경로를 의미
  • if not keys → 버킷이 비어있으면 업로드 실패로 판단하여 ValueError를 발생시킴. Airflow는 이를 Task 실패로 인식
  • 키가 존재하면 각 키를 로그로 출력하여 업로드 결과를 확인

현재는 버킷 전체 키를 조회하는 수준이며, 실무에서는 특정 경로(prefix)를 지정하여 디테일하게 확인하는 것이 일반적이다.


4. DAG 정의

with DAG(
    dag_id="08_aws_s3_basics",
    description="AWS 연동, S3 업로드",
    default_args={
        'owner': 'de_2team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1)
    },
    schedule_interval='@daily',
    start_date=datetime(2026, 2, 25),
    catchup=False,
    tags=['aws', 's3']
) as dag:

매일 1회 실행되는 스케줄. AWS S3 관련 DAG임을 tags로 표시.


5. Task 정의

5-1. 파일 생성 Task (BashOperator)

task_create_file = BashOperator(
    task_id="create_file",
    bash_command=f'echo "Hello Airflow & S3" > {LOCAL_PATH}'
)
  • BashOperator → 쉘 명령어를 직접 실행하는 오퍼레이터
  • echo "..." > 파일경로 → 텍스트를 파일로 저장하는 리눅스 명령어. >는 파일이 있으면 덮어쓰고, 없으면 새로 생성
  • PythonOperator 대신 BashOperator를 쓴 이유는, 단순 파일 생성에는 쉘 명령어가 더 간결하기 때문

5-2. S3 업로드 Task (LocalFilesystemToS3Operator)

task_upload_to_s3 = LocalFilesystemToS3Operator(
    task_id="upload_to_s3",
    filename=LOCAL_PATH,
    dest_key=FILE_NAME,
    dest_bucket=BUCKET_NAME,
    aws_conn_id='aws_default',
    replace=True
)

핵심 Task. 각 파라미터를 살펴보면:

  • filename → 업로드할 원본 파일의 로컬 경로 (컨테이너 내부 기준)
  • dest_key → S3 버킷 내에서의 저장 경로 및 파일명. 여기서는 hello.txt로 버킷 루트에 저장. data/hello.txt처럼 경로를 포함하면 폴더 구조로 저장 가능
  • dest_bucket → 대상 S3 버킷 이름
  • aws_conn_id → Airflow Connections에 등록된 AWS 인증 정보 ID
  • replace=True → 동일한 키(파일명)가 이미 존재하면 덮어씀. False로 하면 이미 존재할 경우 스킵

Python 코드 없이 오퍼레이터만으로 S3 업로드를 처리할 수 있다는 것이 장점이다.

5-3. 업로드 확인 Task

task_check_s3 = PythonOperator(
    task_id="check_s3",
    python_callable=_check_s3
)

위에서 정의한 _check_s3 함수를 실행하여 업로드 결과를 검증하는 Task.


6. Task 의존성

task_create_file >> task_upload_to_s3 >> task_check_s3

전체 흐름:

  1. create_file → BashOperator로 로컬에 텍스트 파일 생성
  2. upload_to_s3 → LocalFilesystemToS3Operator로 S3에 업로드
  3. check_s3 → S3Hook으로 업로드 여부 검증

정리

구성 요소 역할
BashOperator 쉘 명령어로 로컬 파일 생성
LocalFilesystemToS3Operator 로컬 파일을 S3에 업로드하는 전용 오퍼레이터
S3Hook Python 코드 내에서 S3에 접근 (파일 목록 조회 등)
aws_conn_id Airflow Connections에 등록된 AWS 인증 정보
replace=True 동일 파일 존재 시 덮어쓰기

Airflow에서 AWS S3에 파일을 업로드하고 검증하는 기본 파이프라인을 구성하여, AWS 연동의 기초를 확인함.