S3 데이터 레이크에 쌓인 데이터를 표준 SQL로 분석하는 서버리스 쿼리 서비스. Raw 데이터 전처리부터 이상치 탐지, 결측치 처리, CTAS 기반 ETL까지 수행 가능
1. Amazon Athena 개요
S3에 저장된 데이터를 표준 SQL로 직접 분석할 수 있는 서버리스 대화형 쿼리 서비스.
데이터 파이프라인에서 중간~후반부에 위치하며, Raw 데이터를 가공/전처리하는 역할을 담당함.
핵심 특징
- Presto/Trino 기반 — Meta에서 대규모 데이터 분석을 위해 만든 Presto를 기반으로 구축. 상표권 문제로 Trino가 분리되었으나 태생은 동일. Athena는 표준 SQL + Trino 문법 기반
- JSON 처리 강력 — json_extract(), json_extract_scalar() 등 JSON 관련 함수가 풍부하게 지원됨
- 식별자 규칙 — 식별자는 ""(큰따옴표), 문자열은 ''(작은따옴표) 사용
- 타입 체크 엄격 — 암묵적 형변환이 제한적이므로 명시적 캐스팅 필요
장단점
항목 내용
| 장점 | 대용량 데이터 조회/분석에 최적화, 비용이 상대적으로 저렴 |
| 단점 | 처리 시간 편차가 큼 → 정해진 시간에 처리되어야 하는 작업에는 리스크 |
| 적합한 작업 | 시간 여유가 있는 배치 작업, Airflow 스케줄 기반 분석 |
| 대안 (빠른 처리) | RedShift, OpenSearch |
사전 준비
- S3에 쿼리 결과를 저장할 폴더 지정 → s3://버킷명/athena/
- 원본 데이터가 위치할 폴더 → s3://버킷명/csvs/ 등
2. Athena 기본 실습
2.1 테이블 생성 — JSON 데이터 (a.txt)
Athena 콘솔 → 생성 → S3 버킷 데이터로 테이블 만들기 → 데이터 형식 JSON 선택
event_id (string)
user_id (string)
event_type (string)
event_time (timestamp)
attributes (struct) ← JSON 중첩 구조
2.2 Struct 타입 접근 — . 연산자
attributes 컬럼이 Struct 타입으로 지정된 경우, . 연산자로 내부 필드에 접근 가능.
SELECT
DET.user_id,
DET.attributes.item_id AS item,
DET.attributes.price
FROM dummy_test_tbl AS DET
WHERE event_type = 'purchase';
2.3 문자열 JSON 접근 — json_extract_scalar()
JSON이 Struct가 아닌 문자열로 저장된 경우, 역직렬화하여 접근.
SELECT
user_id,
json_extract_scalar(attributes, '$.item_id') AS item_id
FROM dummy_test_tbl;
3. CTAS — 최적화된 테이블 생성 (b.csv)
원본 데이터 → 테이블 구성 → CTAS로 필요한 데이터를 특정 포맷으로 변환 → 이후 데이터 조회 시 빠르게 획득 가능.
CREATE TABLE optimized_sales_tbl
WITH (
format = 'PARQUET',
parquet_compression = 'SNAPPY',
external_location = 's3://버킷명/data/ctas_parquet/',
partitioned_by = ARRAY['year', 'month']
)
AS
SELECT
order_id,
customer_id,
product_id,
quantity,
amount,
CAST(year(order_date) AS VARCHAR) AS year,
LPAD(CAST(month(order_date) AS VARCHAR), 2, '0') AS month
FROM ctas_tbl
WHERE order_id IS NOT NULL;
- PARQUET — 열 기반 포맷으로 검색 속도 향상
- SNAPPY 압축 — 저장 공간 효율 향상
- partitioned_by — year/month 파티션 배치로 검색 속도 향상
- LPAD — month가 1,2,3으로 저장되므로 01,02,03 형태로 맞춤
4. 이상치 탐지 (c.csv)
특정 사용자가 평소보다 큰 금액을 결제한 이상 거래를 탐지하는 시나리오.
테이블 생성
CREATE EXTERNAL TABLE IF NOT EXISTS sales_tbl (
user_id string,
order_date date,
amount int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://버킷명/data/anomaly'
TBLPROPERTIES ("skip.header.line.count" = "1");

LAG 윈도우 함수로 직전 거래 가져오기
SELECT
user_id,
amount,
LAG(amount) OVER (PARTITION BY user_id ORDER BY order_date) AS prev_amount
FROM sales_tbl
ORDER BY user_id;
- LAG(amount) — 사용자별 파티션 내에서 직전 행의 amount 값을 가져옴
- PARTITION BY user_id — 사용자별로 독립적인 윈도우 구성
- ORDER BY order_date — 거래일 기준 정렬

이상 거래 추출 — 직전 거래 대비 차액 50,000 이상
SELECT *
FROM (
SELECT
user_id,
amount,
LAG(amount) OVER (PARTITION BY user_id ORDER BY order_date) AS prev_amount
FROM sales_tbl
ORDER BY user_id
)
WHERE amount - prev_amount >= 50000;
- 데이터가 지속적으로 쌓이는 환경에서 주기적으로 상태 변화를 체크하여 이상 징후 탐지 → 조치(통보, 중단, 지연 처리 등)
- 활용 예시 — 세션 유지: 이전 이벤트와 현재 이벤트의 시간 차를 활용하여 사용자가 서비스에 머무는 시간을 관측, 너무 오래 자리를 비우면 로그아웃 처리

5. 결측치 처리 (d.csv)
테이블 생성
CREATE EXTERNAL TABLE IF NOT EXISTS product_tbl (
product_name string,
category string,
quantity int,
original_price int,
discount_price int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://버킷명/data/missing/'
TBLPROPERTIES ("skip.header.line.count" = "1");

결측치 기본값 할당 — COALESCE + NULLIF
SELECT
product_name,
COALESCE(NULLIF(category, ''), 'Uncategoried') AS category_clean,
COALESCE(NULLIF(quantity, ''), '0') AS quantity_clean,
COALESCE(
NULLIF(discount_price, ''),
NULLIF(original_price, ''),
'0'
) AS final_price_clean
FROM product_tbl;
- NULLIF(값, '') — 빈 문자열을 NULL로 변환
- COALESCE — 첫 번째로 NULL이 아닌 값을 반환. 우선순위 적용 가능 (할인가 → 정가 → 0)

결측치 처리를 pandas/polars/spark에서 할지, Flink/Athena 등 중간 과정에서 할지에 대한 고민이 필요함. 벤치마킹하여 비용/유지보수를 고려하여 결정하면 좋음
타입 변환 — try_cast
SELECT
product_name,
COALESCE(try_cast(NULLIF(quantity, '') AS int), 0) AS quantity_clean_int
FROM product_tbl;
- try_cast — 변환 실패 시 에러 대신 NULL 반환. CAST보다 안전
파생 변수 추가 — CASE WHEN
수량 기준으로 거래 등급을 분류하는 파생 변수 생성.
SELECT
product_name,
COALESCE(try_cast(NULLIF(quantity, '') AS int), 0) AS quantity_clean_int,
CASE
WHEN try_cast(NULLIF(quantity, '') AS int) >= 10 THEN 'High Trade'
WHEN try_cast(NULLIF(quantity, '') AS int) > 0 THEN 'Low Trade'
ELSE 'Out of Trade'
END AS trade_status
FROM product_tbl;
6. Athena + Airflow 연동
S3에 모인 데이터를 Athena SQL로 분석하고, Airflow DAG로 자동화하는 ETL 구조.
ETL 흐름
Extract (S3 원본 데이터)
→ Transform (Athena 쿼리로 분석/가공)
→ Load (결과를 S3에 CTAS로 저장)
DAG 구성 (3개)
| DAG | 파일 역할 |
| 10_aws_athena_ctas_etl.py | 90점 이상 학생 필터링 → Parquet 저장 + Sensor로 완료 감지 |
| 10_aws_athena_query.py | CREATE EXTERNAL TABLE로 소스 테이블 직접 생성 → result 기준 집계 리포트 |
| 10_aws_athena_sensor.py | AthenaSensor 활용 |
Raw Data 전처리 방법들
| 방법 | 조합 |
| 1 | Airflow + Athena |
| 2 | Airflow + ELK / OpenSearch |
| 3 | Airflow + EMR / Spark |
요약
| 개념 | 핵심 |
| Athena | S3 데이터를 SQL로 분석하는 서버리스 서비스 |
| Presto/Trino | Athena의 엔진 기반. 표준 SQL + Trino 문법 |
| CTAS | SELECT 결과를 Parquet 등 최적화 포맷으로 새 테이블 생성 |
| LAG 윈도우 함수 | 직전 행 값 참조. 이상치 탐지에 활용 |
| COALESCE + NULLIF | 결측치를 기본값으로 대체하는 패턴 |
| try_cast | 안전한 타입 변환. 실패 시 NULL 반환 |
| CASE WHEN | 조건 기반 파생 변수 생성 |
S3 데이터 레이크에 데이터가 모이면 Athena로 SQL 분석을 수행하여 원하는 형태의 데이터를 추출하고, Airflow로 이 과정을 자동화하는 구조
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Medallion Architecture - 2 — Bronze 코드 구현 (0) | 2026.04.21 |
|---|---|
| Medallion Architecture - 1 — Bronze (0) | 2026.04.21 |
| Athena - 2 (Athena 기반 일일 리포트 생성 DAG) (0) | 2026.04.20 |
| Athena - 1 (Ahtena + Airflow) (0) | 2026.04.17 |
| Kinesis - 2(KDS+Flink+Firehose+S3) (0) | 2026.04.16 |