SK플래닛 ai활용 데이터엔지니어 과정 2기/데이터 처리

데이터 처리 4 - Apache Spark

dev-lee 2026. 5. 18. 17:46

1. 개요

대규모 데이터를 빠르고 안전하게 처리하기 위한 분산 컴퓨팅 엔진. ETL 작업의 표준 도구 중 하나임.

  • 정의 — 클러스터 환경에서 대용량 데이터를 인메모리 방식으로 병렬 처리하는 오픈소스 프레임워크
  • 등장 배경 — 빅데이터 처리를 위해 하둡(Hadoop), MapReduce 등이 출현했으나 디스크 I/O로 인한 속도 저하가 발생 → 이를 해결하기 위해 등장
  • 위치 — 데이터 엔지니어링에서 대용량 데이터를 다룰 때의 표준 스펙으로 자리잡음 (현재도 계속 변화 중)

1.1 특징

  • 등장 시점 — 2010년
  • 언어 — 내부적으로 Scala로 작동, Scala를 래핑하여 Python/Java/R API 제공
  • 성능 — 하둡 대비 빅데이터 처리 속도가 100배 이상 빠름

2. 아키텍처

Spark는 4가지 핵심 컴포넌트로 구성된 분산 처리 구조를 가짐.

컴포넌트  역할
Driver 전체 작업 조율, SparkSession 포함
Cluster Manager 리소스 할당 및 관리
Executor 실제 Task 실행, 파티션에 할당된 데이터를 보유 및 관리
Task 하나의 파티션에 대한 최소 연산 단위

Driver가 명령을 내리면 Cluster Manager가 리소스를 분배하고, 각 Executor가 자신에게 할당된 파티션을 병렬로 처리하는 구조임.


3. 핵심 데이터 구조

3.1 계층 구조

DataFrame / Dataset   <-  고수준 API (주로 사용)
       ↓
      RDD             <-  저수준 API (실제 내부 데이터 구조)
       ↓
JVM 메모리 / Executor
  • DataFrame 구성 — RDD + 스키마 + 최적화 설정으로 이루어짐
  • 사용자는 보통 DataFrame을 다루지만, 내부적으로는 RDD 단위로 분산 처리됨

3.2 RDD (Resilient Distributed Dataset)

저수준 API로 Spark의 가장 근본적인 데이터 구조임.

  • Resilient — 노드 장애 시 데이터 적재 히스토리(lineage)를 기반으로 자동 재계산
  • Distributed — 여러 노드에 파티션 단위로 분산 저장
  • Dataset — Immutable(불변) 데이터 컬렉션
# DataFrame 방식 - 간결, 자동 최적화, 열(컬럼) 기반
df.filter(F.col('age') > 30)

# RDD 방식 - 수동 최적화 필요, 행(ROW) 기반
df.rdd.filter(lambda x: x['age'] > 30)

3.3 DataFrame

  • 구성 — RDD + 스키마 + 최적화 설정
  • 특징 — SQL 구성 방식과 유사하게 데이터 조작 가능
  • 유사 도구 — Polars와 유사 (실제로는 Polars가 Spark를 차용)
# 스키마 출력
df.printSchema()
df.schema

4. Lazy Evaluation (지연 실행)

Spark는 즉시 데이터를 로드/실행하지 않고, 액션이 호출될 때까지 실행 계획(DAG)만 준비함.

지연(Transformation) 즉시(Action)
select() show()
filter() count()
groupby() collect()
join() write()
withColumn() toPandas()
# 플랜만 구성 -> 실행하지 않고 DAG만 준비
df2 = df.filter(F.col('age') > 30).groupBy('country').agg(F.count('*'))

# 실제 수행 (Action 호출 시점에 최적화하여 실행)
df2.show()

Transformation은 DAG를 쌓기만 하고, Action이 호출되는 순간 Spark가 전체 계획을 한 번에 최적화하여 실행함. 이것이 Spark의 성능 핵심임.


5. 파티션(Partition)

Spark 분산 처리의 기본 단위. 데이터를 여러 조각으로 나누어 각 노드에 병렬로 분산 배치 → Executor가 처리를 담당함.

# 현재 DF의 실제 파티션 수 조회
df.rdd.getNumPartitions()
  • spark.sql.shuffle.partitions — groupBy, join 등 셔플이 발생할 때 사용되는 파티션 수 (기본값 200)
  • 소규모 — 4 정도
  • 중규모(Docker 환경) — 8 정도
  • 대규모(EMR 등 클라우드) — 200 이상 (데이터 크기에 따라 조정)
# 파티션 재분배
df = df.repartition(4)   # 셔플 발생, 균등하게 재분배
df = df.coalesce(2)      # 셔플 없이 파티션 축소

셔플(Shuffle) — groupBy, join 같은 집계 연산 시 분산된 노드 간에 데이터가 네트워크를 통해 이동하는 과정. 비용이 크므로 최소화하는 것이 성능 최적화의 핵심임.


6. Spark 코드 구성

6.1 SparkSession 초기화

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('MySparkApp') \
    .config('spark.sql.shuffle.partitions', '4') \
    .getOrCreate()
  • builder — Spark 실행 환경 준비 시작. 내부적으로 SparkContext, SQLContext 등도 함께 생성됨
  • appName — Spark 애플리케이션 이름. 대시보드 등에서 식별용으로 사용 (예: daily_sales_etl처럼 작업 내용 기반으로 네이밍)
  • config — 셔플 파티션 개수 등 설정값 지정
  • getOrCreate — 기존 SparkSession이 있으면 재사용, 없으면 생성
# 설정값 확인
spark.conf.get('spark.sql.shuffle.partitions')

6.2 DataFrame 생성과 파티션 배치

data = [
    ("A", "bread", 10, 5000),
    ("A", "cake", 5, 12000),
    ("B", "bread", 7, 5000),
    # ...
]
columns = ['store_id', 'item_name', 'quantity', 'price']

df = spark.createDataFrame(data, columns)
df.show()
  • show() — 데이터 최대 20개를 가져와 메모리에 로드 → 메모리 점유율 상승
  • Spark는 즉시 연산하지 않고 Lazy 연산을 수행하므로, Action 호출 시점에 처리됨
# 파티션별 데이터 배치 상황 확인
df.rdd.glom().collect()
# 데이터는 2개 파티션에 단순 2등분되어 배치됨 (순서대로)

Pandas/Polars는 하나의 메모리 공간에서 하나의 CPU로 처리되지만, Spark는 데이터를 조각으로 나눠 조각별로 병렬 처리함. 데이터가 크면 여러 노드에서 동시 처리한 뒤 결과를 취합하는 구조임.


7. ETL 실습 — 의료 데이터 파이프라인

7.1 시나리오

병원에서 매일 저장되는 진료 기록 원시 데이터를 추출(Extract) → 정제 및 가공(Transform) → 분석용 데이터로 저장(Load)하는 파이프라인 구성.

  • 환자 이력 관리
  • 진료과별 집계
  • 월별 매출 분석 등

7.2 Extract — 원시 데이터 준비

실제로는 S3, DB 등에서 가져오지만 여기서는 더미 데이터로 진행. 결측치, 취소 건, 음수 금액 등 노이즈를 일부러 포함시킴.

from pyspark.sql.types import *

# 스키마 명시 (타입 추론 생략 → 처리 속도 향상)
schema = StructType([
    StructField("visit_id", LongType(), False),
    StructField("patient_id", StringType(), False),
    StructField("visit_date", StringType(), True),
    StructField("department", StringType(), True),
    StructField("diagnosis", StringType(), True),
    StructField("charge", LongType(), True),
    StructField("status", StringType(), True)
])

raw_df = spark.createDataFrame(raw_data, schema)
  • 스키마 명시 — 사전에 설계된 상태로 데이터프레임을 구성하므로 상대적으로 빠른 처리 가능
  • 스키마 미명시 — 타입 추론을 위해 전체 데이터를 스캔해야 함 → 데이터가 크면 지연 발생

데이터를 읽으면 Driver가 직렬화하여 Cluster Manager에 전달하고, Manager가 파티션을 Executor에 배분하는 구조임.

7.3 Transform — ① 데이터 정제

from pyspark.sql import functions as F

clean_df = (raw_df
    .filter(F.col('status') == '완료')
    .filter(F.col('department').isNotNull())
    .filter(F.col('charge') > 0)
)
  • F.col() — 컬럼을 지칭하는 함수 (Polars의 pl.col()과 동일 개념)
  • 취소 건, 부서 결측, 음수 금액 등 3가지 케이스를 제거

7.4 Transform — ② 타입 변환과 파생 컬럼

transformed_df = (clean_df
    .withColumn('visit_date', F.to_date('visit_date', 'yyyy-MM-dd'))
    .withColumn('visit_year', F.year('visit_date'))
    .withColumn('visit_month', F.month('visit_date'))
    .withColumn('visit_quarter',
        F.when(F.month('visit_date').between(1, 3), 'Q1')
         .when(F.month('visit_date').between(4, 6), 'Q2')
         .when(F.month('visit_date').between(7, 9), 'Q3')
         .otherwise('Q4'))
    .withColumn('charge_tier',
        F.when(F.col('charge') < 50000, '외래')
         .when(F.col('charge') < 200000, '일반')
         .otherwise('고액'))
)
  • to_date — 문자열을 날짜 타입으로 변환
  • year, month — 날짜에서 연/월 추출
  • when().when().otherwise() — SQL의 CASE WHEN과 동일한 조건 분기 구문

7.5 Transform — ③ 윈도우 함수

환자별 누적 통계를 계산하기 위해 윈도우 함수 활용.

분류  함수  동작
순위 row_number() 1, 2, 3... (동점 무관)
순위 rank() 1, 1, 3... (동점이면 같은 순위, 다음 순위 건너뜀)
순위 dense_rank() 1, 1, 2... (동점이면 같은 순위, 다음 순위 건너뛰지 않음)
이동 lag() 이전 행 값 (예: 이전 방문 진료비)
이동 lead() 다음 행 값
누적 sum().over() 누적합
누적 avg().over() 누적 평균
누적 max().over() 누적 최대
from pyspark.sql.window import Window

# 1. 윈도우 기준점 정의 (파티션 + 정렬)
win_partition = Window.partitionBy('patient_id').orderBy('visit_date')

# 2. over() 함수로 파생 컬럼 생성
trans_win_df = (transformed_df
    .withColumn('visit_seq', F.row_number().over(win_partition))
    .withColumn('prev_department', F.lag('department', 1).over(win_partition))
    .withColumn('cum_charge', F.sum('charge').over(win_partition))
)
  • partitionBy — 환자 기준으로 그룹화
  • orderBy — 방문일 기준 정렬
  • over() — 윈도우 함수를 적용하는 기준 지정

7.6 Load — 분석용 테이블 생성

부서별 월간 실적 집계

department_monthly = (trans_win_df
    .groupBy('department', 'visit_year', 'visit_month')
    .agg(
        F.count('department').alias('visit_count'),
        F.sum('charge').alias('total_charge'),
        F.avg('charge').alias('avg_charge'),
        F.countDistinct('patient_id').alias('unique_patient')
    )
    .orderBy('total_charge', descending=True)
)

환자별 요약 테이블

patient_summary_df = (trans_win_df
    .groupBy('patient_id')
    .agg(
        F.count('visit_id').alias('total_visit'),
        F.sum('charge').alias('total_charge'),
        F.min('visit_date').alias('first_visit'),
        F.max('visit_date').alias('last_visit'),
        F.collect_set('department').alias('visited_department')
    )
)
  • alias — 집계 결과 컬럼명 지정
  • countDistinct — 고유값 개수 카운트
  • collect_set — 중복 제거하여 리스트로 수집 (집합)
  • collect_list — 중복 포함하여 리스트로 수집

7.7 저장 — Parquet 포맷

department_monthly.write \
    .partitionBy('visit_year', 'visit_month') \
    .mode('overwrite') \
    .parquet('/tmp/gold/output/dept_monthly')
  • partitionBy — 디렉토리 분할 정책 설정 (저장 시 파티션 키)
  • mode — 저장 모드 지정
    • overwrite — 덮어쓰기
    • append — 추가
    • ignore — 데이터 있으면 무시
    • error — 데이터 있으면 에러
  • parquet — 컬럼 기반 압축 포맷으로 저장 (분석 최적화)
# 저장된 데이터 재로딩
loaded_df = spark.read.parquet('/tmp/gold/output/dept_monthly')

진료과는 데이터가 적어 파티션 키로 추가하지 않았지만, 데이터 규모가 커지면 추가해도 무방함.


8. 요약

개념 핵심
Spark 대용량 데이터를 인메모리 방식으로 병렬 처리하는 분산 컴퓨팅 엔진
아키텍처 Driver(조율) → Cluster Manager(분배) → Executor(실행) → Task(연산)
RDD 저수준 분산 데이터 구조. 장애 복구, 불변성, 분산 저장이 특징
DataFrame RDD + 스키마 + 최적화. 고수준 API로 SQL 유사 조작 가능
Lazy Evaluation Transformation은 DAG만 쌓고, Action 호출 시 한 번에 최적화하여 실행
Partition 분산 처리의 기본 단위. 셔플 시 네트워크 비용 발생 → 최소화 중요
ETL 흐름 Extract → Transform(filter/withColumn/window) → Load(Parquet)

 

Spark는 단일 머신의 메모리 한계를 넘는 데이터를 다룰 때 진가를 발휘함. Lazy Evaluation과 파티션 기반 분산 처리를 이해하는 것이 성능 최적화의 출발점임.