1. 배경 — 왜 Docker로 Spark를 띄우는가
EMR을 띄우려면 비용·IAM·VPC 등 인프라 학습 부담이 큼. 학습 단계에서는 로컬에서 Spark 클러스터를 흉내 내는 환경이 필요함. Docker로 Spark Master + Worker 컨테이너를 띄우면 EMR과 동일한 분산처리 개념(파티션, executor, shuffle)을 무료로 체험 가능.
1.1 EMR vs Docker Spark 비교
| 항목 | EMR | Docker Spark |
| 환경 | AWS 관리형 클러스터 | 로컬 컨테이너 클러스터 |
| 비용 | 인스턴스 사용 시간 과금 | 무료(로컬 리소스만 사용) |
| 데이터 위치 | S3 중심 | 로컬 볼륨(/opt/data) |
| 클러스터 매니저 | YARN | Spark Standalone |
| Master 접속 | EMR API/EC2 | spark://spark-master:7077 |
| 용도 | 운영·대규모 처리 | 학습·개발·소규모 검증 |
동일한 PySpark 코드라도 master 설정과 데이터 경로만 바꿔주면 두 환경을 오갈 수 있음. 학습을 Docker에서 검증한 뒤 EMR로 이식하는 흐름이 자연스러움.
2. 전체 흐름
[로컬 컨테이너 클러스터]
spark-master ──┐
spark-worker1 ──┼── spark://spark-master:7077
spark-worker2 ──┘
↑
[/opt/jobs/etl_job.py] ── spark-submit
↓
[Extract] /opt/data/hospital_raw.csv (헤더 CSV)
[Transform] filter → 파생 컬럼 → Window 함수
[Load] /tmp/gold/output/
├─ dept_monthly (year/month 파티션 Parquet)
└─ patient_summary (단일 Parquet)
- Master 컨테이너 — 클러스터 매니저. Driver의 Job을 받아 Executor에 분배
- Worker 컨테이너 — 실제 연산 수행. Executor 프로세스 호스팅
- 볼륨 마운트 — /opt/data(입력), /opt/jobs(스크립트), /tmp/gold(출력)
- Standalone 모드 — YARN/Mesos 없이 Spark 자체 클러스터 매니저 사용
3. SparkSession 구성과 클러스터 연결
3.1 빌더 설정
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = (SparkSession.builder
.appName('Hospital_ETL_Cluster')
.master('spark://spark-master:7077')
.config("spark.executor.memory", "512m")
.config("spark.executor.cores", "1")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
spark.sparkContext.setLogLevel("WARN")
- appName — Spark UI에 표시되는 작업 이름. 로그·모니터링 추적 단위
- master('spark://spark-master:7077') — Docker 네트워크 내 Master 컨테이너 호스트명과 기본 포트. 로컬 단독 실행이면 local[*]
- spark.executor.memory: 512m — Executor 당 메모리. 로컬 컨테이너 자원 한계 고려
- spark.executor.cores: 1 — Executor 당 CPU 코어 수
- spark.sql.shuffle.partitions: 8 — Shuffle(groupBy, join 등) 후 파티션 수. 기본값 200은 소규모 데이터에 과함
- setLogLevel("WARN") — INFO 레벨 로그 억제. 결과 출력 가독성 향상
기본 shuffle.partitions=200은 BigData 전제. 로컬·소규모에서는 8~16 정도가 적절함. 너무 많으면 작은 파일이 양산되고 task 오버헤드가 커짐.
3.2 연결 정보 출력
print(f" Master : {spark.sparkContext.master}")
print(f" App ID : {spark.sparkContext.applicationId}")
print(f" App Name : {spark.sparkContext.appName}")
- sparkContext.master — 실제 연결된 클러스터 매니저 주소
- applicationId — Spark가 부여한 고유 작업 ID. Spark History Server에서 추적 키로 사용
4. Extract — CSV 데이터 로드
4.1 스키마 정의
schema = StructType([
StructField("visit_id", LongType(), False),
StructField("patient_id", StringType(), False),
StructField("visit_date", StringType(), True),
StructField("department", StringType(), True),
StructField("diagnosis", StringType(), True),
StructField("charge", LongType(), True),
StructField("status", StringType(), True),
])
- 명시적 스키마 — inferSchema=True는 전체 데이터를 1회 스캔하므로 대용량에서 비효율
- nullable=False — visit_id, patient_id는 식별자라 null 불허
- visit_date는 StringType — 일단 문자열로 받고 Transform에서 to_date로 변환
- charge: LongType — 진료비 정수. 음수/null 노이즈 존재 가정
4.2 데이터프레임 생성
raw_df = (spark.read
.option("header", True)
.schema(schema)
.csv('/opt/data/hospital_raw.csv'))
print(f"[Extract] 원시 데이터 수 : {raw_df.count()}")
print(f"[Extract] 파티션 수 : {raw_df.rdd.getNumPartitions()}")
raw_df.show()
- option("header", True) — CSV 첫 줄을 헤더로 인식하고 스킵
- getNumPartitions() — 데이터프레임이 몇 개 파티션으로 나뉘어 있는지 확인. 분산 처리 단위
- show() — Action. 상위 20개 행 출력하며 실제 로드·실행 트리거
5. Transform — 정제·파생·윈도우
5.1 데이터 정제
clean_df = (raw_df
.filter(F.col('status') == '완료')
.filter(F.col('department').isNotNull())
.filter(F.col('charge') > 0)
)
- status == '완료' — 취소건 제외. 분석 대상은 실제 진료 발생 건만
- department isNotNull — 부서 결측 제거
- charge > 0 — 음수·0원 제거. null도 비교 결과가 null로 자동 제외됨
5.2 파생 컬럼 생성
transformed_df = (clean_df
.withColumn('visit_date', F.to_date('visit_date', 'yyyy-MM-dd'))
.withColumn('visit_year', F.year('visit_date'))
.withColumn('visit_month', F.month('visit_date'))
.withColumn('visit_quarter',
F.when(F.month('visit_date').between(1, 3), 'Q1')
.when(F.month('visit_date').between(4, 6), 'Q2')
.when(F.month('visit_date').between(7, 9), 'Q3')
.otherwise('Q4'))
.withColumn('charge_tier',
F.when(F.col('charge') < 50000, '외래')
.when(F.col('charge') < 200000, '일반')
.otherwise('고액'))
)
- to_date — 문자열을 date 타입으로 변환. 형식 불일치 시 null
- year / month — date 타입에서 연·월 추출하여 별도 컬럼으로 분리
- when().when().otherwise() — SQL CASE WHEN과 동일. 조건부 컬럼 생성
- between(1, 3) — 분기 산출용. 1월~3월 → Q1
- charge_tier — 진료비를 외래/일반/고액으로 구간화. 후속 분석 용이성 확보
| 컬럼 | 타입 | 용도 |
| visit_date | date | 시계열 분석 기준 |
| visit_year | int | 연간 집계 키 |
| visit_month | int | 월간 집계 키 |
| visit_quarter | string | 분기 집계 키 |
| charge_tier | string | 진료비 구간 분류 |
5.3 Window 함수
win_partition = Window.partitionBy('patient_id').orderBy('visit_date')
trans_win_df = (transformed_df
.withColumn('visit_seq', F.row_number().over(win_partition))
.withColumn('prev_department', F.lag('department', 1).over(win_partition))
.withColumn('cum_charge', F.sum('charge').over(win_partition))
)
- Window.partitionBy('patient_id').orderBy('visit_date') — 환자 단위로 그룹핑하고 방문일 순으로 정렬
- row_number() — 환자별 방문 회차(1, 2, 3, ...)
- lag('department', 1) — 이전 방문의 부서. 부서 이동 추적 가능
- sum('charge') — 환자별 진료비 누적합. 윈도우 범위 내 누적
Window 함수는 같은 파티션 내 행 간 관계를 표현할 때 강력함. groupBy는 행을 축소하지만 Window는 원본 행을 유지하면서 집계값을 부착함.
5.4 파티션 분산 확인
trans_win_df.withColumn("partition_id", F.spark_partition_id()) \
.groupBy("partition_id") \
.count() \
.orderBy("partition_id") \
.show()
- spark_partition_id() — 각 행이 속한 파티션 번호 반환
- 목적 — 데이터가 균등하게 분산됐는지 점검. 한 파티션에 몰려있으면 skew 발생
- Skew 처리 — 심한 경우 repartition() 또는 키 salting으로 재분배
6. Load — 집계 및 저장
6.1 부서별 월간 실적
department_monthly = (trans_win_df
.groupBy('department', 'visit_year', 'visit_month')
.agg(
F.count('department').alias('visit_count'),
F.sum('charge').alias('total_charge'),
F.avg('charge').alias('avg_charge'),
F.countDistinct('patient_id').alias('unique_patient')
)
.orderBy('total_charge', ascending=False)
)
- groupBy — 부서·연·월 단위로 집계 그룹 형성
- count / sum / avg — 방문 건수, 총 진료비, 평균 진료비
- countDistinct('patient_id') — 고유 환자 수. 같은 환자의 중복 방문 제거
- alias — 집계 결과 컬럼명 명시. 안 쓰면 count(department) 같은 SQL 함수명이 그대로 컬럼명이 됨
- orderBy(..., ascending=False) — 매출 큰 순으로 정렬
6.2 환자별 요약
patient_summary_df = (trans_win_df
.groupBy('patient_id')
.agg(
F.count('visit_id').alias('total_visit'),
F.sum('charge').alias('total_charge'),
F.min('visit_date').alias('first_visit'),
F.max('visit_date').alias('last_visit'),
F.collect_set('department').alias('visited_department')
)
)
- min / max 날짜 — 환자의 첫 방문일·마지막 방문일
- collect_set('department') — 중복 제거된 방문 부서 목록을 배열로 수집
- collect_list vs collect_set — list는 중복 유지, set은 중복 제거
6.3 Parquet 저장
department_monthly.write \
.partitionBy('visit_year', 'visit_month') \
.mode('overwrite') \
.parquet('/tmp/gold/output/dept_monthly')
patient_summary_df.write \
.mode('overwrite') \
.parquet('/tmp/gold/output/patient_summary')
- partitionBy('visit_year', 'visit_month') — Hive 스타일 디렉토리 파티셔닝. visit_year=2026/visit_month=5/... 구조 생성
- mode('overwrite') — 동일 경로 재실행 시 멱등성 보장
- parquet — 컬럼 기반 압축 포맷. 후속 조회 시 필요 컬럼만 스캔
- 파티셔닝 효과 — 후속 분석에서 특정 월만 조회 시 해당 디렉토리만 스캔 → I/O 절감
파티션 키는 자주 필터링되는 컬럼으로 잡아야 효과가 큼. 환자 요약은 단일 행이므로 파티셔닝 불필요.
7. 트러블슈팅 / 주의 사항
작성 중 또는 실행 시 마주칠 수 있는 이슈들.
7.1 orderBy(descend=True) 파라미터 오타
.orderBy('total_charge', descend=True) # ✗
.orderBy('total_charge', ascending=False) # ✓
- 원인 — PySpark의 정렬 파라미터는 ascending. descend는 존재하지 않음
- 결과 — TypeError: orderBy() got an unexpected keyword argument 'descend'
7.2 show()를 print() 내부에서 호출
print(f"... : {clean_df.show()}") # ✗
- 문제 — show()는 콘솔에 출력하고 None을 반환. f-string에서는 "None"만 표시됨
- 개선 — show()는 별도 호출, 메시지는 따로 출력
print("[Transform - 1] 데이터 정제 후")
clean_df.show()
7.3 Master 호스트명 인식 불가
java.net.UnknownHostException: spark-master
- 원인 — Docker Compose 네트워크 외부에서 실행하거나 컨테이너명 불일치
- 해결 — docker-compose.yml의 service 이름과 master(...)의 호스트명 일치 확인
- 로컬 단독 테스트 — master('local[*]')로 변경하여 클러스터 없이도 동작 검증 가능
7.4 작은 데이터에 과한 셔플 파티션
- 현상 — 천 건 데이터에 셔플 후 200개 파일이 생성됨
- 원인 — spark.sql.shuffle.partitions 기본값(200)
- 해결 — 데이터 규모에 맞춰 8~16으로 축소
8. EMR 환경과의 차이점
같은 ETL을 EMR로 옮길 때 바꿔야 할 부분.
| 항목 | Docker | EMR |
| master 설정 | spark://spark-master:7077 | spark-submit이 YARN으로 자동 처리(생략) |
| 입력 경로 | /opt/data/... | s3://bucket/raw/... |
| 출력 경로 | /tmp/gold/... | s3://bucket/processed/... |
| 실행 방법 | docker exec로 spark-submit | EMR Step + command-runner.jar |
| 자원 설정 | 컨테이너 한계 내 | 인스턴스 타입·개수 |
| 결과 영속성 | 컨테이너 볼륨 | S3(영속) |
master 설정과 경로만 환경 변수로 빼두면 동일 코드로 두 환경 모두 대응 가능. 인자 받는 구조(sys.argv)로 만들면 더 깔끔함.
9. 요약
| 단계 | 연산 | 핵심 포인트 |
| SparkSession | Master 연결 + 자원 설정 | spark.sql.shuffle.partitions 데이터 크기에 맞게 |
| Extract | CSV + 스키마 | 명시적 스키마로 추론 비용 절감 |
| Transform 1 | filter | status/department/charge 노이즈 제거 |
| Transform 2 | 파생 컬럼 | to_date, year/month, when 구간화 |
| Transform 3 | Window 함수 | row_number, lag, 누적합 |
| Load | groupBy + 집계 | countDistinct, collect_set |
| 저장 | Parquet + 파티셔닝 | 자주 필터링되는 컬럼을 파티션 키로 |
Docker 기반 Spark는 EMR과 동일한 분산 처리 개념을 무료로 학습할 수 있는 환경. 핵심 코드는 거의 동일하고 클러스터 진입점·경로만 다름. 운영으로 갈 때 환경 분리만 깔끔히 해두면 이식 비용이 매우 낮아짐.
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > 데이터 처리' 카테고리의 다른 글
| 데이터 처리 6 - EMR + Spark (0) | 2026.05.19 |
|---|---|
| 데이터 처리 4 - Apache Spark (0) | 2026.05.18 |
| 데이터 처리 3 - polars (1) | 2026.05.14 |
| 데이터 처리 2 - pandas (0) | 2026.05.13 |
| 데이터 처리 1 - Numpy (0) | 2026.05.13 |