1. 개요
대규모 데이터를 빠르고 안전하게 처리하기 위한 분산 컴퓨팅 엔진. ETL 작업의 표준 도구 중 하나임.
- 정의 — 클러스터 환경에서 대용량 데이터를 인메모리 방식으로 병렬 처리하는 오픈소스 프레임워크
- 등장 배경 — 빅데이터 처리를 위해 하둡(Hadoop), MapReduce 등이 출현했으나 디스크 I/O로 인한 속도 저하가 발생 → 이를 해결하기 위해 등장
- 위치 — 데이터 엔지니어링에서 대용량 데이터를 다룰 때의 표준 스펙으로 자리잡음 (현재도 계속 변화 중)
1.1 특징
- 등장 시점 — 2010년
- 언어 — 내부적으로 Scala로 작동, Scala를 래핑하여 Python/Java/R API 제공
- 성능 — 하둡 대비 빅데이터 처리 속도가 100배 이상 빠름
2. 아키텍처
Spark는 4가지 핵심 컴포넌트로 구성된 분산 처리 구조를 가짐.
| 컴포넌트 | 역할 |
| Driver | 전체 작업 조율, SparkSession 포함 |
| Cluster Manager | 리소스 할당 및 관리 |
| Executor | 실제 Task 실행, 파티션에 할당된 데이터를 보유 및 관리 |
| Task | 하나의 파티션에 대한 최소 연산 단위 |
Driver가 명령을 내리면 Cluster Manager가 리소스를 분배하고, 각 Executor가 자신에게 할당된 파티션을 병렬로 처리하는 구조임.
3. 핵심 데이터 구조
3.1 계층 구조
DataFrame / Dataset <- 고수준 API (주로 사용)
↓
RDD <- 저수준 API (실제 내부 데이터 구조)
↓
JVM 메모리 / Executor
- DataFrame 구성 — RDD + 스키마 + 최적화 설정으로 이루어짐
- 사용자는 보통 DataFrame을 다루지만, 내부적으로는 RDD 단위로 분산 처리됨
3.2 RDD (Resilient Distributed Dataset)
저수준 API로 Spark의 가장 근본적인 데이터 구조임.
- Resilient — 노드 장애 시 데이터 적재 히스토리(lineage)를 기반으로 자동 재계산
- Distributed — 여러 노드에 파티션 단위로 분산 저장
- Dataset — Immutable(불변) 데이터 컬렉션
# DataFrame 방식 - 간결, 자동 최적화, 열(컬럼) 기반
df.filter(F.col('age') > 30)
# RDD 방식 - 수동 최적화 필요, 행(ROW) 기반
df.rdd.filter(lambda x: x['age'] > 30)
3.3 DataFrame
- 구성 — RDD + 스키마 + 최적화 설정
- 특징 — SQL 구성 방식과 유사하게 데이터 조작 가능
- 유사 도구 — Polars와 유사 (실제로는 Polars가 Spark를 차용)
# 스키마 출력
df.printSchema()
df.schema
4. Lazy Evaluation (지연 실행)
Spark는 즉시 데이터를 로드/실행하지 않고, 액션이 호출될 때까지 실행 계획(DAG)만 준비함.
| 지연(Transformation) | 즉시(Action) |
| select() | show() |
| filter() | count() |
| groupby() | collect() |
| join() | write() |
| withColumn() | toPandas() |
# 플랜만 구성 -> 실행하지 않고 DAG만 준비
df2 = df.filter(F.col('age') > 30).groupBy('country').agg(F.count('*'))
# 실제 수행 (Action 호출 시점에 최적화하여 실행)
df2.show()
Transformation은 DAG를 쌓기만 하고, Action이 호출되는 순간 Spark가 전체 계획을 한 번에 최적화하여 실행함. 이것이 Spark의 성능 핵심임.
5. 파티션(Partition)
Spark 분산 처리의 기본 단위. 데이터를 여러 조각으로 나누어 각 노드에 병렬로 분산 배치 → Executor가 처리를 담당함.
# 현재 DF의 실제 파티션 수 조회
df.rdd.getNumPartitions()
- spark.sql.shuffle.partitions — groupBy, join 등 셔플이 발생할 때 사용되는 파티션 수 (기본값 200)
- 소규모 — 4 정도
- 중규모(Docker 환경) — 8 정도
- 대규모(EMR 등 클라우드) — 200 이상 (데이터 크기에 따라 조정)
# 파티션 재분배
df = df.repartition(4) # 셔플 발생, 균등하게 재분배
df = df.coalesce(2) # 셔플 없이 파티션 축소
셔플(Shuffle) — groupBy, join 같은 집계 연산 시 분산된 노드 간에 데이터가 네트워크를 통해 이동하는 과정. 비용이 크므로 최소화하는 것이 성능 최적화의 핵심임.
6. Spark 코드 구성
6.1 SparkSession 초기화
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('MySparkApp') \
.config('spark.sql.shuffle.partitions', '4') \
.getOrCreate()
- builder — Spark 실행 환경 준비 시작. 내부적으로 SparkContext, SQLContext 등도 함께 생성됨
- appName — Spark 애플리케이션 이름. 대시보드 등에서 식별용으로 사용 (예: daily_sales_etl처럼 작업 내용 기반으로 네이밍)
- config — 셔플 파티션 개수 등 설정값 지정
- getOrCreate — 기존 SparkSession이 있으면 재사용, 없으면 생성
# 설정값 확인
spark.conf.get('spark.sql.shuffle.partitions')
6.2 DataFrame 생성과 파티션 배치
data = [
("A", "bread", 10, 5000),
("A", "cake", 5, 12000),
("B", "bread", 7, 5000),
# ...
]
columns = ['store_id', 'item_name', 'quantity', 'price']
df = spark.createDataFrame(data, columns)
df.show()
- show() — 데이터 최대 20개를 가져와 메모리에 로드 → 메모리 점유율 상승
- Spark는 즉시 연산하지 않고 Lazy 연산을 수행하므로, Action 호출 시점에 처리됨
# 파티션별 데이터 배치 상황 확인
df.rdd.glom().collect()
# 데이터는 2개 파티션에 단순 2등분되어 배치됨 (순서대로)
Pandas/Polars는 하나의 메모리 공간에서 하나의 CPU로 처리되지만, Spark는 데이터를 조각으로 나눠 조각별로 병렬 처리함. 데이터가 크면 여러 노드에서 동시 처리한 뒤 결과를 취합하는 구조임.
7. ETL 실습 — 의료 데이터 파이프라인
7.1 시나리오
병원에서 매일 저장되는 진료 기록 원시 데이터를 추출(Extract) → 정제 및 가공(Transform) → 분석용 데이터로 저장(Load)하는 파이프라인 구성.
- 환자 이력 관리
- 진료과별 집계
- 월별 매출 분석 등
7.2 Extract — 원시 데이터 준비
실제로는 S3, DB 등에서 가져오지만 여기서는 더미 데이터로 진행. 결측치, 취소 건, 음수 금액 등 노이즈를 일부러 포함시킴.
from pyspark.sql.types import *
# 스키마 명시 (타입 추론 생략 → 처리 속도 향상)
schema = StructType([
StructField("visit_id", LongType(), False),
StructField("patient_id", StringType(), False),
StructField("visit_date", StringType(), True),
StructField("department", StringType(), True),
StructField("diagnosis", StringType(), True),
StructField("charge", LongType(), True),
StructField("status", StringType(), True)
])
raw_df = spark.createDataFrame(raw_data, schema)
- 스키마 명시 — 사전에 설계된 상태로 데이터프레임을 구성하므로 상대적으로 빠른 처리 가능
- 스키마 미명시 — 타입 추론을 위해 전체 데이터를 스캔해야 함 → 데이터가 크면 지연 발생
데이터를 읽으면 Driver가 직렬화하여 Cluster Manager에 전달하고, Manager가 파티션을 Executor에 배분하는 구조임.
7.3 Transform — ① 데이터 정제
from pyspark.sql import functions as F
clean_df = (raw_df
.filter(F.col('status') == '완료')
.filter(F.col('department').isNotNull())
.filter(F.col('charge') > 0)
)
- F.col() — 컬럼을 지칭하는 함수 (Polars의 pl.col()과 동일 개념)
- 취소 건, 부서 결측, 음수 금액 등 3가지 케이스를 제거
7.4 Transform — ② 타입 변환과 파생 컬럼
transformed_df = (clean_df
.withColumn('visit_date', F.to_date('visit_date', 'yyyy-MM-dd'))
.withColumn('visit_year', F.year('visit_date'))
.withColumn('visit_month', F.month('visit_date'))
.withColumn('visit_quarter',
F.when(F.month('visit_date').between(1, 3), 'Q1')
.when(F.month('visit_date').between(4, 6), 'Q2')
.when(F.month('visit_date').between(7, 9), 'Q3')
.otherwise('Q4'))
.withColumn('charge_tier',
F.when(F.col('charge') < 50000, '외래')
.when(F.col('charge') < 200000, '일반')
.otherwise('고액'))
)
- to_date — 문자열을 날짜 타입으로 변환
- year, month — 날짜에서 연/월 추출
- when().when().otherwise() — SQL의 CASE WHEN과 동일한 조건 분기 구문
7.5 Transform — ③ 윈도우 함수
환자별 누적 통계를 계산하기 위해 윈도우 함수 활용.
| 분류 | 함수 | 동작 |
| 순위 | row_number() | 1, 2, 3... (동점 무관) |
| 순위 | rank() | 1, 1, 3... (동점이면 같은 순위, 다음 순위 건너뜀) |
| 순위 | dense_rank() | 1, 1, 2... (동점이면 같은 순위, 다음 순위 건너뛰지 않음) |
| 이동 | lag() | 이전 행 값 (예: 이전 방문 진료비) |
| 이동 | lead() | 다음 행 값 |
| 누적 | sum().over() | 누적합 |
| 누적 | avg().over() | 누적 평균 |
| 누적 | max().over() | 누적 최대 |
from pyspark.sql.window import Window
# 1. 윈도우 기준점 정의 (파티션 + 정렬)
win_partition = Window.partitionBy('patient_id').orderBy('visit_date')
# 2. over() 함수로 파생 컬럼 생성
trans_win_df = (transformed_df
.withColumn('visit_seq', F.row_number().over(win_partition))
.withColumn('prev_department', F.lag('department', 1).over(win_partition))
.withColumn('cum_charge', F.sum('charge').over(win_partition))
)
- partitionBy — 환자 기준으로 그룹화
- orderBy — 방문일 기준 정렬
- over() — 윈도우 함수를 적용하는 기준 지정
7.6 Load — 분석용 테이블 생성
부서별 월간 실적 집계
department_monthly = (trans_win_df
.groupBy('department', 'visit_year', 'visit_month')
.agg(
F.count('department').alias('visit_count'),
F.sum('charge').alias('total_charge'),
F.avg('charge').alias('avg_charge'),
F.countDistinct('patient_id').alias('unique_patient')
)
.orderBy('total_charge', descending=True)
)
환자별 요약 테이블
patient_summary_df = (trans_win_df
.groupBy('patient_id')
.agg(
F.count('visit_id').alias('total_visit'),
F.sum('charge').alias('total_charge'),
F.min('visit_date').alias('first_visit'),
F.max('visit_date').alias('last_visit'),
F.collect_set('department').alias('visited_department')
)
)
- alias — 집계 결과 컬럼명 지정
- countDistinct — 고유값 개수 카운트
- collect_set — 중복 제거하여 리스트로 수집 (집합)
- collect_list — 중복 포함하여 리스트로 수집
7.7 저장 — Parquet 포맷
department_monthly.write \
.partitionBy('visit_year', 'visit_month') \
.mode('overwrite') \
.parquet('/tmp/gold/output/dept_monthly')
- partitionBy — 디렉토리 분할 정책 설정 (저장 시 파티션 키)
- mode — 저장 모드 지정
- overwrite — 덮어쓰기
- append — 추가
- ignore — 데이터 있으면 무시
- error — 데이터 있으면 에러
- parquet — 컬럼 기반 압축 포맷으로 저장 (분석 최적화)
# 저장된 데이터 재로딩
loaded_df = spark.read.parquet('/tmp/gold/output/dept_monthly')
진료과는 데이터가 적어 파티션 키로 추가하지 않았지만, 데이터 규모가 커지면 추가해도 무방함.
8. 요약
| 개념 | 핵심 |
| Spark | 대용량 데이터를 인메모리 방식으로 병렬 처리하는 분산 컴퓨팅 엔진 |
| 아키텍처 | Driver(조율) → Cluster Manager(분배) → Executor(실행) → Task(연산) |
| RDD | 저수준 분산 데이터 구조. 장애 복구, 불변성, 분산 저장이 특징 |
| DataFrame | RDD + 스키마 + 최적화. 고수준 API로 SQL 유사 조작 가능 |
| Lazy Evaluation | Transformation은 DAG만 쌓고, Action 호출 시 한 번에 최적화하여 실행 |
| Partition | 분산 처리의 기본 단위. 셔플 시 네트워크 비용 발생 → 최소화 중요 |
| ETL 흐름 | Extract → Transform(filter/withColumn/window) → Load(Parquet) |
Spark는 단일 머신의 메모리 한계를 넘는 데이터를 다룰 때 진가를 발휘함. Lazy Evaluation과 파티션 기반 분산 처리를 이해하는 것이 성능 최적화의 출발점임.
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > 데이터 처리' 카테고리의 다른 글
| 데이터 처리 6 - EMR + Spark (0) | 2026.05.19 |
|---|---|
| 데이터 처리 5 - Docker Spark (0) | 2026.05.19 |
| 데이터 처리 3 - polars (1) | 2026.05.14 |
| 데이터 처리 2 - pandas (0) | 2026.05.13 |
| 데이터 처리 1 - Numpy (0) | 2026.05.13 |