SK플래닛 ai활용 데이터엔지니어 과정 2기/데이터 처리

데이터 처리 5 - Docker Spark

dev-lee 2026. 5. 19. 16:17

1. 배경 — 왜 Docker로 Spark를 띄우는가

EMR을 띄우려면 비용·IAM·VPC 등 인프라 학습 부담이 큼. 학습 단계에서는 로컬에서 Spark 클러스터를 흉내 내는 환경이 필요함. Docker로 Spark Master + Worker 컨테이너를 띄우면 EMR과 동일한 분산처리 개념(파티션, executor, shuffle)을 무료로 체험 가능.

1.1 EMR vs Docker Spark 비교

항목  EMR  Docker Spark
환경 AWS 관리형 클러스터 로컬 컨테이너 클러스터
비용 인스턴스 사용 시간 과금 무료(로컬 리소스만 사용)
데이터 위치 S3 중심 로컬 볼륨(/opt/data)
클러스터 매니저 YARN Spark Standalone
Master 접속 EMR API/EC2 spark://spark-master:7077
용도 운영·대규모 처리 학습·개발·소규모 검증

동일한 PySpark 코드라도 master 설정과 데이터 경로만 바꿔주면 두 환경을 오갈 수 있음. 학습을 Docker에서 검증한 뒤 EMR로 이식하는 흐름이 자연스러움.


2. 전체 흐름

[로컬 컨테이너 클러스터]
spark-master  ──┐
spark-worker1 ──┼── spark://spark-master:7077
spark-worker2 ──┘
        ↑
[/opt/jobs/etl_job.py] ── spark-submit
        ↓
[Extract]  /opt/data/hospital_raw.csv (헤더 CSV)
[Transform] filter → 파생 컬럼 → Window 함수
[Load]     /tmp/gold/output/
           ├─ dept_monthly       (year/month 파티션 Parquet)
           └─ patient_summary    (단일 Parquet)
  • Master 컨테이너 — 클러스터 매니저. Driver의 Job을 받아 Executor에 분배
  • Worker 컨테이너 — 실제 연산 수행. Executor 프로세스 호스팅
  • 볼륨 마운트 — /opt/data(입력), /opt/jobs(스크립트), /tmp/gold(출력)
  • Standalone 모드 — YARN/Mesos 없이 Spark 자체 클러스터 매니저 사용

3. SparkSession 구성과 클러스터 연결

3.1 빌더 설정

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = (SparkSession.builder
    .appName('Hospital_ETL_Cluster')
    .master('spark://spark-master:7077')
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "1")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate())

spark.sparkContext.setLogLevel("WARN")
  • appName — Spark UI에 표시되는 작업 이름. 로그·모니터링 추적 단위
  • master('spark://spark-master:7077') — Docker 네트워크 내 Master 컨테이너 호스트명과 기본 포트. 로컬 단독 실행이면 local[*]
  • spark.executor.memory: 512m — Executor 당 메모리. 로컬 컨테이너 자원 한계 고려
  • spark.executor.cores: 1 — Executor 당 CPU 코어 수
  • spark.sql.shuffle.partitions: 8 — Shuffle(groupBy, join 등) 후 파티션 수. 기본값 200은 소규모 데이터에 과함
  • setLogLevel("WARN") — INFO 레벨 로그 억제. 결과 출력 가독성 향상

기본 shuffle.partitions=200은 BigData 전제. 로컬·소규모에서는 8~16 정도가 적절함. 너무 많으면 작은 파일이 양산되고 task 오버헤드가 커짐.

3.2 연결 정보 출력

print(f" Master   : {spark.sparkContext.master}")
print(f" App ID   : {spark.sparkContext.applicationId}")
print(f" App Name : {spark.sparkContext.appName}")
  • sparkContext.master — 실제 연결된 클러스터 매니저 주소
  • applicationId — Spark가 부여한 고유 작업 ID. Spark History Server에서 추적 키로 사용

4. Extract — CSV 데이터 로드

4.1 스키마 정의

schema = StructType([
    StructField("visit_id",   LongType(),   False),
    StructField("patient_id", StringType(), False),
    StructField("visit_date", StringType(), True),
    StructField("department", StringType(), True),
    StructField("diagnosis",  StringType(), True),
    StructField("charge",     LongType(),   True),
    StructField("status",     StringType(), True),
])
  • 명시적 스키마 — inferSchema=True는 전체 데이터를 1회 스캔하므로 대용량에서 비효율
  • nullable=False — visit_id, patient_id는 식별자라 null 불허
  • visit_date는 StringType — 일단 문자열로 받고 Transform에서 to_date로 변환
  • charge: LongType — 진료비 정수. 음수/null 노이즈 존재 가정

4.2 데이터프레임 생성

raw_df = (spark.read
    .option("header", True)
    .schema(schema)
    .csv('/opt/data/hospital_raw.csv'))

print(f"[Extract] 원시 데이터 수 : {raw_df.count()}")
print(f"[Extract] 파티션 수 : {raw_df.rdd.getNumPartitions()}")
raw_df.show()
  • option("header", True) — CSV 첫 줄을 헤더로 인식하고 스킵
  • getNumPartitions() — 데이터프레임이 몇 개 파티션으로 나뉘어 있는지 확인. 분산 처리 단위
  • show() — Action. 상위 20개 행 출력하며 실제 로드·실행 트리거

5. Transform — 정제·파생·윈도우

5.1 데이터 정제

clean_df = (raw_df
    .filter(F.col('status') == '완료')
    .filter(F.col('department').isNotNull())
    .filter(F.col('charge') > 0)
)
  • status == '완료' — 취소건 제외. 분석 대상은 실제 진료 발생 건만
  • department isNotNull — 부서 결측 제거
  • charge > 0 — 음수·0원 제거. null도 비교 결과가 null로 자동 제외됨

5.2 파생 컬럼 생성

transformed_df = (clean_df
    .withColumn('visit_date', F.to_date('visit_date', 'yyyy-MM-dd'))
    .withColumn('visit_year',  F.year('visit_date'))
    .withColumn('visit_month', F.month('visit_date'))
    .withColumn('visit_quarter',
        F.when(F.month('visit_date').between(1, 3), 'Q1')
         .when(F.month('visit_date').between(4, 6), 'Q2')
         .when(F.month('visit_date').between(7, 9), 'Q3')
         .otherwise('Q4'))
    .withColumn('charge_tier',
        F.when(F.col('charge') < 50000,  '외래')
         .when(F.col('charge') < 200000, '일반')
         .otherwise('고액'))
)
  • to_date — 문자열을 date 타입으로 변환. 형식 불일치 시 null
  • year / month — date 타입에서 연·월 추출하여 별도 컬럼으로 분리
  • when().when().otherwise() — SQL CASE WHEN과 동일. 조건부 컬럼 생성
  • between(1, 3) — 분기 산출용. 1월~3월 → Q1
  • charge_tier — 진료비를 외래/일반/고액으로 구간화. 후속 분석 용이성 확보
컬럼  타입  용도
visit_date date 시계열 분석 기준
visit_year int 연간 집계 키
visit_month int 월간 집계 키
visit_quarter string 분기 집계 키
charge_tier string 진료비 구간 분류

5.3 Window 함수

win_partition = Window.partitionBy('patient_id').orderBy('visit_date')

trans_win_df = (transformed_df
    .withColumn('visit_seq',       F.row_number().over(win_partition))
    .withColumn('prev_department', F.lag('department', 1).over(win_partition))
    .withColumn('cum_charge',      F.sum('charge').over(win_partition))
)
  • Window.partitionBy('patient_id').orderBy('visit_date') — 환자 단위로 그룹핑하고 방문일 순으로 정렬
  • row_number() — 환자별 방문 회차(1, 2, 3, ...)
  • lag('department', 1) — 이전 방문의 부서. 부서 이동 추적 가능
  • sum('charge') — 환자별 진료비 누적합. 윈도우 범위 내 누적

Window 함수는 같은 파티션 내 행 간 관계를 표현할 때 강력함. groupBy는 행을 축소하지만 Window는 원본 행을 유지하면서 집계값을 부착함.

5.4 파티션 분산 확인

trans_win_df.withColumn("partition_id", F.spark_partition_id()) \
    .groupBy("partition_id") \
    .count() \
    .orderBy("partition_id") \
    .show()
  • spark_partition_id() — 각 행이 속한 파티션 번호 반환
  • 목적 — 데이터가 균등하게 분산됐는지 점검. 한 파티션에 몰려있으면 skew 발생
  • Skew 처리 — 심한 경우 repartition() 또는 키 salting으로 재분배

6. Load — 집계 및 저장

6.1 부서별 월간 실적

department_monthly = (trans_win_df
    .groupBy('department', 'visit_year', 'visit_month')
    .agg(
        F.count('department').alias('visit_count'),
        F.sum('charge').alias('total_charge'),
        F.avg('charge').alias('avg_charge'),
        F.countDistinct('patient_id').alias('unique_patient')
    )
    .orderBy('total_charge', ascending=False)
)
  • groupBy — 부서·연·월 단위로 집계 그룹 형성
  • count / sum / avg — 방문 건수, 총 진료비, 평균 진료비
  • countDistinct('patient_id') — 고유 환자 수. 같은 환자의 중복 방문 제거
  • alias — 집계 결과 컬럼명 명시. 안 쓰면 count(department) 같은 SQL 함수명이 그대로 컬럼명이 됨
  • orderBy(..., ascending=False) — 매출 큰 순으로 정렬

6.2 환자별 요약

patient_summary_df = (trans_win_df
    .groupBy('patient_id')
    .agg(
        F.count('visit_id').alias('total_visit'),
        F.sum('charge').alias('total_charge'),
        F.min('visit_date').alias('first_visit'),
        F.max('visit_date').alias('last_visit'),
        F.collect_set('department').alias('visited_department')
    )
)
  • min / max 날짜 — 환자의 첫 방문일·마지막 방문일
  • collect_set('department') — 중복 제거된 방문 부서 목록을 배열로 수집
  • collect_list vs collect_set — list는 중복 유지, set은 중복 제거

6.3 Parquet 저장

department_monthly.write \
    .partitionBy('visit_year', 'visit_month') \
    .mode('overwrite') \
    .parquet('/tmp/gold/output/dept_monthly')

patient_summary_df.write \
    .mode('overwrite') \
    .parquet('/tmp/gold/output/patient_summary')
  • partitionBy('visit_year', 'visit_month') — Hive 스타일 디렉토리 파티셔닝. visit_year=2026/visit_month=5/... 구조 생성
  • mode('overwrite') — 동일 경로 재실행 시 멱등성 보장
  • parquet — 컬럼 기반 압축 포맷. 후속 조회 시 필요 컬럼만 스캔
  • 파티셔닝 효과 — 후속 분석에서 특정 월만 조회 시 해당 디렉토리만 스캔 → I/O 절감

파티션 키는 자주 필터링되는 컬럼으로 잡아야 효과가 큼. 환자 요약은 단일 행이므로 파티셔닝 불필요.


7. 트러블슈팅 / 주의 사항

작성 중 또는 실행 시 마주칠 수 있는 이슈들.

7.1 orderBy(descend=True) 파라미터 오타

.orderBy('total_charge', descend=True)  # ✗
.orderBy('total_charge', ascending=False)  # ✓
  • 원인 — PySpark의 정렬 파라미터는 ascending. descend는 존재하지 않음
  • 결과 — TypeError: orderBy() got an unexpected keyword argument 'descend'

7.2 show()를 print() 내부에서 호출

print(f"... : {clean_df.show()}")  # ✗
  • 문제 — show()는 콘솔에 출력하고 None을 반환. f-string에서는 "None"만 표시됨
  • 개선 — show()는 별도 호출, 메시지는 따로 출력
print("[Transform - 1] 데이터 정제 후")
clean_df.show()

7.3 Master 호스트명 인식 불가

java.net.UnknownHostException: spark-master
  • 원인 — Docker Compose 네트워크 외부에서 실행하거나 컨테이너명 불일치
  • 해결 — docker-compose.yml의 service 이름과 master(...)의 호스트명 일치 확인
  • 로컬 단독 테스트 — master('local[*]')로 변경하여 클러스터 없이도 동작 검증 가능

7.4 작은 데이터에 과한 셔플 파티션

  • 현상 — 천 건 데이터에 셔플 후 200개 파일이 생성됨
  • 원인 — spark.sql.shuffle.partitions 기본값(200)
  • 해결 — 데이터 규모에 맞춰 8~16으로 축소

8. EMR 환경과의 차이점

같은 ETL을 EMR로 옮길 때 바꿔야 할 부분.

항목 Docker  EMR
master 설정 spark://spark-master:7077 spark-submit이 YARN으로 자동 처리(생략)
입력 경로 /opt/data/... s3://bucket/raw/...
출력 경로 /tmp/gold/... s3://bucket/processed/...
실행 방법 docker exec로 spark-submit EMR Step + command-runner.jar
자원 설정 컨테이너 한계 내 인스턴스 타입·개수
결과 영속성 컨테이너 볼륨 S3(영속)

master 설정과 경로만 환경 변수로 빼두면 동일 코드로 두 환경 모두 대응 가능. 인자 받는 구조(sys.argv)로 만들면 더 깔끔함.


9. 요약

단계  연산  핵심 포인트
SparkSession Master 연결 + 자원 설정 spark.sql.shuffle.partitions 데이터 크기에 맞게
Extract CSV + 스키마 명시적 스키마로 추론 비용 절감
Transform 1 filter status/department/charge 노이즈 제거
Transform 2 파생 컬럼 to_date, year/month, when 구간화
Transform 3 Window 함수 row_number, lag, 누적합
Load groupBy + 집계 countDistinct, collect_set
저장 Parquet + 파티셔닝 자주 필터링되는 컬럼을 파티션 키로

Docker 기반 Spark는 EMR과 동일한 분산 처리 개념을 무료로 학습할 수 있는 환경. 핵심 코드는 거의 동일하고 클러스터 진입점·경로만 다름. 운영으로 갈 때 환경 분리만 깔끔히 해두면 이식 비용이 매우 낮아짐.