1. 배경 — 왜 EMR + Spark인가
데이터 규모가 수 GB를 넘어 수십 GB ~ TB 단위로 커지면 단일 노드 기반 도구(pandas, polars)로는 처리 한계가 옴. 분산처리 프레임워크인 Spark가 필요해지고, 이를 실행할 클러스터 인프라로 **AWS EMR(Elastic MapReduce)**을 사용함.
1.1 EMR 특징
- 사용한 만큼만 지불 — 필요할 때 생성, 작업 완료 후 해제하는 비용 효율 구조
- 스팟 인스턴스 지원 — 온디맨드 대비 최대 90% 비용 절감 가능 (단, 회수 위험 존재)
- 서버리스 옵션 — EMR Serverless로 클러스터 관리 부담 제거 가능
- 상시 vs 일회성 — 조회 주기가 짧고 빈번하면 상시 유지 또는 서버리스, 일배치성이면 일회성 구성이 유리
일배치 ETL은 "필요할 때만 클러스터를 띄우고 끝나면 죽이는" 구조가 비용·운영 측면에서 최적임.
2. 전체 아키텍처
[로컬] dummy_bigdata_gen.py → raw_data.json
↓ (수동 업로드)
[S3] raw/dt=YYYY-MM-DD/raw_data.json
↓ (DAG 트리거)
[Airflow] 14_EMR_SPARK DAG
├─ create_cluster (EMR 클러스터 생성)
├─ dummy (확장 여지)
├─ run_spark (spark-submit step 등록)
├─ watch_spark (Step 완료 감지 센서)
└─ terminate_cluster(EMR 클러스터 종료)
↓
[EMR] spark_data_etl.py 실행 (S3에서 다운로드)
↓
[S3] processed/dt=YYYY-MM-DD/*.parquet
- S3 — 원본/결과 데이터 저장소 (영속 레이어)
- Airflow — 워크플로우 오케스트레이션, EMR 라이프사이클 제어
- EMR — 일회성 분산 컴퓨팅 클러스터
- Spark — EMR 위에서 실제 ETL 로직 수행
Airflow는 직접 데이터를 처리하지 않음. EMR을 띄우고 죽이는 "리모컨" 역할만 함.
3. 더미 데이터 생성기 — dummy_bigdata_gen.py
ETL의 입력이 되는 원본 데이터를 노이즈 포함하여 생성하는 스크립트.
RECORD_COUNT = 1000 # 테스트: 1000건 / 풀테스트: 천만건
def generator_dummy_data_with_noise():
dummy_data = list()
event_types = ['view','click','purchase','error', None]
os_types = ['iOS', 'Android', 'Windows', 'Mac', 'Unknown']
for _ in range(RECORD_COUNT):
record = {
"event_id" : str(uuid.uuid4()),
"user_id" : f"user_{random.randint(1, 500)}",
"event_type" : random.choice(event_types),
"product_id" : random.randint(1000, 2000),
"price" : random.randint(1000, 100000),
"timestamp" : (datetime.now() - timedelta(hours=random.randint(0, 24)))
.strftime("%Y-%m-%d %H:%M:%S"),
"os" : random.choice(os_types)
}
# 노이즈 삽입
확률 = random.random()
if 확률 < 0.05:
record['user_id'] = None # 5% : 고객 아이디 결측
elif 확률 < 0.1:
record['price'] = -50 # 5% : 음수 가격
elif 확률 < 0.15:
record['timestamp'] = "invalid-format" # 5% : 시간 형식 깨짐
dummy_data.append(record)
return dummy_data
- event_id — UUID 기반 고유값. 중복 제거 키로 사용
- user_id — user_1 ~ user_500 범위에서 무작위 선택해 중복 이벤트 로그 시뮬레이션
- 노이즈 3종 — 결측(null), 논리 오류(음수), 형식 깨짐(invalid string)으로 실제 운영 데이터 환경 재현
- NDJSON 저장 — 한 줄에 JSON 1개씩(Newline Delimited JSON). Spark가 라인 단위로 병렬 파싱 가능
with open('raw_data.json', 'w', encoding='utf-8') as f:
for log in data:
f.write(json.dumps(log) + "\n")
일반 JSON 배열([{...}, {...}])로 저장하면 Spark가 multiline=true 옵션 없이는 못 읽음. NDJSON이 분산처리 친화적.
4. Spark ETL 스크립트 — spark_data_etl.py
EMR 클러스터에서 실제로 실행되는 PySpark 코드. S3에 업로드되어 spark-submit으로 호출됨.
4.1 인자 처리와 경로 구성
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
if len(sys.argv) > 1:
TARGET_DATE = sys.argv[1]
else:
raise ValueError("날짜 인자가 누락되었습니다. (YYYY-MM-DD)")
BUCKET_NAME = 'de-ai-04-827913617635-ap-northeast-2-an'
INPUT_PATH = f"s3://{BUCKET_NAME}/raw/dt={TARGET_DATE}/raw_data.json"
OUTPUT_PATH = f"s3://{BUCKET_NAME}/processed/dt={TARGET_DATE}/"
- sys.argv[1] — Airflow DAG에서 spark-submit 명령에 전달한 날짜 인자 수신
- 동적 경로 — 날짜별로 S3 입출력 경로가 분리되어 파티션 구조 유지
- dt=YYYY-MM-DD — Hive 스타일 파티션 키. Athena·Glue 카탈로그와 자연스럽게 호환
4.2 스키마 정의와 데이터 추출 (Extract)
spark = (SparkSession.builder
.appName(f'Daily_Data_Cleaning_{TARGET_DATE}')
.getOrCreate())
schema = StructType([
StructField("event_id", StringType(), True),
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("product_id", IntegerType(), True),
StructField("price", IntegerType(), True),
StructField("timestamp", StringType(), True),
StructField("os", StringType(), True),
])
raw_df = spark.read.schema(schema).json(INPUT_PATH)
- SparkSession.builder.getOrCreate() — 세션이 있으면 가져오고 없으면 생성
- 명시적 스키마 — Spark의 스키마 추론(inferSchema) 비활성화. 수억 건 입력에서 추론 비용이 매우 큼
- timestamp 컬럼은 StringType — "invalid-format" 같은 더티 데이터가 섞여 있어 일단 문자열로 받고 파싱 단계에서 검증
4.3 데이터 클리닝 (Transform)
clean_df = (raw_df
.filter(F.col('user_id').isNotNull())
.filter(F.col('price') >= 0)
.withColumn('event_time',
F.to_timestamp(F.col('timestamp'), "yyyy-MM-dd HH:mm:ss"))
.filter(F.col('event_time').isNotNull())
.fillna({"event_type": "unknown", "os": "other"})
.dropDuplicates(['event_id'])
)
final_df = clean_df.withColumn('processed_at', F.current_timestamp())
- user_id null 제거 — 사용자 식별 불가 데이터는 분석 가치 없음
- price >= 0 — 음수 가격은 논리적 오류. null은 자동으로 비교 결과가 null이 되어 함께 걸러짐
- to_timestamp — 문자열을 timestamp 타입으로 변환. 파싱 실패 시 null 반환
- event_time null 제거 — "invalid-format" 같은 깨진 형식 행 제거
- fillna — event_type/os의 null을 의미 있는 기본값으로 대체
- dropDuplicates(['event_id']) — 동일 이벤트 ID 중복 제거
- processed_at — ETL 처리 시각을 컬럼으로 기록(감사용 메타데이터)
| 단계 | 연산 | 목적 |
| 1 | filter user_id | 식별 불가 행 제거 |
| 2 | filter price >= 0 | 논리 오류 제거 |
| 3 | to_timestamp | 타입 변환 |
| 4 | filter event_time | 형식 오류 제거 |
| 5 | fillna | 결측 보정 |
| 6 | dropDuplicates | 중복 제거 |
| 7 | processed_at | 메타데이터 부여 |
Transform 단계는 lazy하게 평가됨. 실제 연산은 write 또는 count 같은 Action에서 한 번에 실행됨.
4.4 결과 저장 (Load)
final_df.write.mode('overwrite').parquet(OUTPUT_PATH)
spark.stop()
- mode('overwrite') — 동일 경로에 있는 데이터를 덮어씀. 같은 날짜로 재실행 시 멱등성 보장
- parquet — 컬럼 기반 압축 포맷. 후속 Athena 조회 시 스캔 비용 절감
- spark.stop() — 세션 명시적 종료. EMR 클러스터 자체는 Airflow가 종료 책임
5. Airflow DAG — 14_emr_spark.py
EMR 클러스터 라이프사이클 전체를 제어하는 워크플로우.
5.1 EMR 클러스터 구성 정의
JOB_FLOW_OVERRIDES = {
"Name": "Airflow-Automated-EMR-Cluster-de-04",
"ReleaseLabel": "emr-6.10.0",
"Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
"Instances": {
"Ec2SubnetId": "subnet-xxxxxxxx", # VPC 서브넷 필수
"InstanceGroups": [
{
"Name": "Master node",
"Market": "SPOT",
"InstanceRole": "MASTER",
"InstanceType": "m5.xlarge",
"InstanceCount": 1,
},
{
"Name": "Core nodes",
"Market": "SPOT",
"InstanceRole": "CORE",
"InstanceType": "m5.xlarge",
"InstanceCount": 2,
},
],
"KeepJobFlowAliveWhenNoSteps": True,
"TerminationProtected": False,
},
"JobFlowRole": "EMR_EC2_DefaultRole",
"ServiceRole": "EMR_DefaultRole",
"LogUri": EMR_LOG_URI,
"VisibleToAllUsers": True,
}
- ReleaseLabel — EMR 버전. emr-6.10.0은 Spark 3.3.1 포함
- Ec2SubnetId — m5.xlarge는 EC2-Classic이 아닌 VPC 전용 인스턴스 타입. 서브넷 지정 필수
- Market: SPOT — 스팟 인스턴스 사용으로 비용 절감. 단, Master까지 SPOT이면 회수 시 클러스터 전체 종료 위험
- KeepJobFlowAliveWhenNoSteps: True — Step 완료 후 자동 종료 방지. Airflow가 명시적으로 종료 제어
- JobFlowRole / ServiceRole — EC2 인스턴스용 / EMR 서비스용 IAM Role
- LogUri — EMR 클러스터 로그를 S3에 저장
학습·개발 환경에서는 Master를 ON_DEMAND로, Core만 SPOT으로 두는 것이 안정적. 비용 부담은 적고 회수 위험은 크게 줄어듦.
5.2 Spark Submit Step 정의
SPARK_SUBMIT = [
{
"Name": "Daily Data Cleaning Job",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode", "cluster",
SPARK_SCRIPT_PATH,
"2026-05-19" # 향후 "{{ ds }}"로 동적 처리
],
},
}
]
- command-runner.jar — EMR 내장 실행기. spark-submit/hadoop/hive 등 명령 실행 가능
- --deploy-mode cluster — Driver를 클러스터 내 노드에서 실행. Airflow 측 부하 없음
- ActionOnFailure: CONTINUE — Step 실패해도 다음 태스크(terminate_cluster)로 진행. EMR을 반드시 정리하기 위함
- "{{ ds }}" — Airflow 매크로. 실행 날짜를 YYYY-MM-DD로 자동 치환 (현재는 하드코딩)
5.3 Task 구성
create_cluster_task = EmrCreateJobFlowOperator(
task_id="create_cluster",
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default'
)
run_spark_task = EmrAddStepsOperator(
task_id="run_spark",
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
steps=SPARK_SUBMIT,
aws_conn_id='aws_default'
)
watch_spark_task = EmrStepSensor(
task_id="watch_spark",
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='run_spark', key='return_value')[0] }}",
aws_conn_id='aws_default'
)
terminate_cluster_task = EmrTerminateJobFlowOperator(
task_id="terminate_cluster",
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
aws_conn_id='aws_default',
trigger_rule='all_done'
)
- EmrCreateJobFlowOperator — 클러스터 생성. 생성된 job_flow_id를 XCom으로 자동 push
- XCom 참조 — {{ task_instance.xcom_pull(...) }} Jinja 템플릿으로 이전 태스크 결과 전달
- EmrAddStepsOperator — 실행 중인 클러스터에 Step(작업) 추가. step_id 리스트를 XCom으로 push
- EmrStepSensor — Step 완료까지 주기적으로 폴링. COMPLETED / FAILED 상태 감지
- trigger_rule='all_done' — 앞 태스크가 성공이든 실패든 무조건 실행. EMR 클러스터 정리를 보장
5.4 의존성
create_cluster_task >> dummy_task >> run_spark_task >> watch_spark_task >> terminate_cluster_task
- >> — Airflow 의존성 연산자(시프트). >(비교 연산자)와 혼동 주의
>를 쓰면 Python이 True/False만 반환하고 의존성이 등록되지 않음. 모든 태스크가 병렬 실행되어 클러스터 생성 전에 종료 태스크가 먼저 돌아가는 참사가 발생함.
6. 트러블슈팅 기록
실제로 마주친 에러와 해결 과정.
6.1 SparkSession 빌더 오타
AttributeError: 'Builder' object has no attribute 'appNAme'
- 원인 — .appName() 메서드명 오타(appNAme)
- 추가 이슈 — 메서드 체이닝 시 라인 끝에 \ 또는 괄호 누락으로 구문 오류
- 해결 — 괄호로 감싸 깔끔하게 체이닝
spark = (SparkSession.builder
.appName(f'Daily_Data_Cleaning_{TARGET_DATE}')
.getOrCreate())
6.2 의존성 연산자 오류로 EMR이 먼저 종료됨
botocore.exceptions.ClientError: ValidationException
A job flow that is shutting down, terminated, or finished may not be modified.
- 원인 — DAG 의존성을 >>가 아닌 >로 작성해서 모든 태스크가 병렬 실행됨
- 결과 — terminate_cluster가 run_spark보다 먼저 또는 동시에 실행되어 클러스터가 죽은 상태에서 Step 추가 시도
- 해결 — >>로 수정
6.3 VPC 서브넷 미지정 (VALIDATION_ERROR)
Subnet is required: The specified instance type m5.xlarge can only be used in a VPC.
- 원인 — m5.xlarge는 VPC 전용 인스턴스 타입인데, 계정에 default VPC가 없거나 Ec2SubnetId 미지정
- 해결 — Instances 블록에 Ec2SubnetId 추가
- 체크 포인트 — VPC 콘솔에서 퍼블릭 서브넷 선택 (라우팅 테이블의 0.0.0.0/0 대상이 igw-로 가는지 확인)
"Instances": {
"Ec2SubnetId": "subnet-xxxxxxxx",
"InstanceGroups": [...],
...
}
6.4 Jinja 템플릿 따옴표 누락
"{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value) }}"
^^^^^^^^^^^^^^^
- 원인 — 'return_value 뒤 닫는 따옴표 누락
- 결과 — 템플릿 렌더링 실패로 job_flow_id에 이상한 값 전달
- 해결 — 따옴표 올바르게 닫기
7. 운영 시 고려사항
| 항목 | 권장 | 설정 이유 |
| Master 인스턴스 마켓 | ON_DEMAND | Spot 회수 시 클러스터 전체 사망 위험 |
| Core 인스턴스 마켓 | SPOT | 회수돼도 다른 노드로 재실행 가능, 비용 절감 |
| ActionOnFailure | CONTINUE | terminate_cluster까지 도달해 비용 누수 방지 |
| trigger_rule (terminate) | all_done | 상류 태스크 실패해도 EMR 정리 보장 |
| spark-submit deploy-mode | cluster | Driver를 EMR에서 실행해 Airflow 부하 분리 |
| 출력 포맷 | parquet | 후속 분석(Athena) 비용 최적화 |
비용 사고를 막는 핵심은 **"종료 태스크가 반드시 실행되도록 보장하는 것"**임. trigger_rule='all_done'은 협상 불가.
8. 요약
| 구성 요소 | 역할 | 핵심 포인트 |
| S3 | 원본·결과 데이터 저장 | raw/dt=... / processed/dt=... 파티션 구조 |
| Airflow | 워크플로우 오케스트레이션 | EMR 라이프사이클 제어, XCom으로 ID 전달 |
| EMR | 일회성 분산 컴퓨팅 클러스터 | VPC 서브넷 필수, Master는 ON_DEMAND 권장 |
| Spark | ETL 본 작업 | 명시적 스키마 + lazy evaluation + Parquet 저장 |
| >> | Airflow 의존성 연산자 | >와 혼동 시 모든 태스크 병렬 실행됨 |
| trigger_rule=all_done | 종료 태스크 보장 | 클러스터 비용 사고 방지의 핵심 |
분산 처리 인프라를 "필요할 때만 빌려 쓰고 끝나면 반드시 반납하는" 자동화가 핵심. Airflow의 진짜 가치는 본 작업 실행이 아니라 인프라 라이프사이클 보장에 있음.
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > 데이터 처리' 카테고리의 다른 글
| 데이터 처리 5 - Docker Spark (0) | 2026.05.19 |
|---|---|
| 데이터 처리 4 - Apache Spark (0) | 2026.05.18 |
| 데이터 처리 3 - polars (1) | 2026.05.14 |
| 데이터 처리 2 - pandas (0) | 2026.05.13 |
| 데이터 처리 1 - Numpy (0) | 2026.05.13 |