실시간 주가 집계 파이프라인 구축
프로듀서가 주가 로그를 KDS로 전송하면, Flink가 10초 단위로 종목별 평균가를 집계하여 출력 KDS로 보내고, Firehose가 이를 받아 S3에 적재하는 실시간 스트림 처리 파이프라인 구조
전체 아키텍처
kds_flink_adf_producer.py (로컬 프로듀서)
↓ put_record (1초 간격)
KDS Input (de-ai-04-an2-kds-stock-input)
↓ Flink가 LATEST 포지션부터 구독
Flink Application (de-ai-04-an2-flink-stock)
├─ stock_input 테이블 (KDS 입력 매핑)
├─ TUMBLE 윈도우 연산 (10초 단위 ticker별 AVG)
└─ stock_output 테이블 (KDS 출력 매핑)
↓ INSERT INTO 결과 전송
KDS Output (de-ai-04-an2-kds-stock-output)
↓ Firehose 구독
Firehose (de-ai-04-an2-adf-link-stock)
↓ 버퍼링 후 배치 전송
S3 (가공된 데이터 저장소)
Raw 주가 데이터를 Flink로 전처리(10초 평균) 하여 S3에 적재하는 구조. KDS가 앞뒤로 두 번 등장하는 이유는 Flink의 입력용 스트림과 출력용 스트림이 분리되어 있기 때문임.
Part 1. kds_flink_adf_producer.py — 프로듀서
1. 모듈 임포트 및 환경변수
import time
import random
import json
from datetime import datetime
import boto3
from dotenv import load_dotenv
import os
load_dotenv()
ACCESS_KEY = os.getenv('ACCESS_KEY')
SECRET_KEY = os.getenv('SECRET_KEY')
REGION = 'ap-northeast-2'
- boto3 — AWS SDK for Python. Kinesis 클라이언트 생성에 사용
- dotenv — .env 파일에서 AWS 자격증명 로드. 키 하드코딩 방지 목적
- REGION — ap-northeast-2 서울 리전 고정
2. 클라이언트 생성 — get_client()
def get_client(service_name='firehose', is_in_aws=True):
if not is_in_aws:
session = boto3.Session(
aws_access_key_id = ACCESS_KEY,
aws_secret_access_key = SECRET_KEY,
region_name = REGION
)
return session.client(service_name)
return boto3.client(service_name, region_name=REGION)
kinesis = get_client('kinesis', False)
- is_in_aws=False — 로컬 PC 실행이므로 ACCESS_KEY / SECRET_KEY 명시적으로 주입
- service_name='kinesis' — 기본값이 firehose지만 Kinesis Data Streams 클라이언트로 오버라이드
3. 가짜 거래 데이터 생성 — gen_stock_data()
def gen_stock_data():
ticker = ['NVDA', 'GOOG', 'AAPL', 'TSLA', 'AMZN', 'MSFT']
return {
"event_time": datetime.now().isoformat(),
"ticker": random.choice(ticker),
"price": round(random.uniform(100, 1000), 2),
"volume": random.randint(1, 100),
"trade_id": random.randint(1000000, 9999999)
}
- event_time — ISO 8601 포맷 문자열. Flink의 TIMESTAMP(3) 타입과 호환되어 이벤트 타임 기반 윈도우 처리에 사용됨
- ticker — 6개 종목 중 랜덤 선택. 이후 PartitionKey로도 활용됨
- price — 100~1000 범위 실수, 소수점 2자리
- volume / trade_id — 각각 거래량과 거래 ID
4. KDS Input으로 전송 루프
try:
while True:
data = gen_stock_data()
kinesis.put_record(
StreamName = "de-ai-04-an2-kds-stock-input",
Data = json.dumps(data),
PartitionKey = data['ticker']
)
print(f'전송:{data}')
time.sleep(1)
except Exception as e:
print('중단', e)
- StreamName — Flink가 구독할 입력 스트림 이름
- Data — Kinesis는 문자열/바이트만 허용하므로 json.dumps()로 직렬화 필수
- PartitionKey = data['ticker'] — 티커 해싱 기반으로 샤드 분산. 같은 종목은 같은 샤드로 향함
- time.sleep(1) — 초당 1건 전송
Part 2. Flink 애플리케이션 구성
1. Flink의 역할
KDS Input → [Flink 연산] → KDS Output
- 실시간 스트림 처리 엔진 — 1초 미만 대기시간으로 데이터를 처리하는 서버리스
- 체급이 높은 편 — Lambda 대비 무거운 연산(윈도우 집계, 조인, 복잡한 SQL) 처리에 적합
- 다국어 지원 — Java / Scala / Python(PyFlink) + SQL 결합 가능
- 목표 — Raw 데이터를 실시간으로 전처리하여 가공된 형태로 S3에 적재
2. Flink 배포 방식 — Streaming Application
로컬 소스코드 (app.py + JAR) → zip으로 빌드 → S3 업로드 → Flink가 참조하여 실행
- Streaming Application 형태 — 외부에서 제작한 코드를 S3에 올려두고 Flink가 참조하는 방식
- Studio Notebook 형태 — 편리하지만 세팅이 많아 추후 사용 예정
- S3 구조 — app/app.zip에 코드와 JAR을 묶어서 업로드
- zip 내부 구조:
app.zip
├── app.py
└── lib/
└── flink-sql-connector-kinesis-1.15.4.jar
3. 런타임 속성 (Runtime Properties)
Flink 콘솔에서 zip 파일 내부의 어떤 파일을 참조할지 지정하는 설정.
항목 값
| Group ID | kinesis.analytics.flink.run.options |
| Key (jarfile) | flink-connector-kinesis-1.15.4.jar |
| Key (python) | app.py |
- Group ID — Flink에 전달할 속성 그룹 식별자
- jarfile — Kinesis connector JAR 경로. SQL에서 'connector' = 'kinesis' 사용 시 필수
- python — 실행할 Python 엔트리포인트 파일
Part 3. app.py — Flink SQL 집계 코드
1. 환경 설정
from pyflink.table import EnvironmentSettings, TableEnvironment
def main():
setting = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(setting)
- EnvironmentSettings.new_instance() — Flink 실행 환경 설정 빌더 생성
- in_streaming_mode() — 스트리밍 모드 지정. 배치 모드(in_batch_mode) 대비 연속 데이터 처리용
- TableEnvironment.create(setting) — SQL 스타일로 데이터를 다룰 수 있는 테이블 환경 생성
2. 입력 테이블 — stock_input
t_env.execute_sql('''
CREATE TABLE stock_input (
ticker STRING,
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'de-ai-04-an2-kds-stock-input',
'aws.region' = 'ap-northeast-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'json'
)
''')
- 컬럼 선택 — Raw 데이터 5개 필드 중 집계에 필요한 3개(ticker, price, event_time)만 스키마로 구성
- TIMESTAMP(3) — 밀리초까지 정밀도 표현. ISO 8601 문자열이 자동 파싱됨
- WATERMARK — 네트워크 지연 허용 범위 지정. event_time - INTERVAL '5' SECOND는 5초까지 늦게 도착한 이벤트도 유효한 것으로 처리하겠다는 의미
- connector = 'kinesis' — KDS 커넥터 사용. 런타임 속성의 jarfile이 제공됨
- scan.stream.initpos = 'LATEST' — 앱 시작 이후 들어온 최신 데이터부터 읽기. TRIM_HORIZON으로 바꾸면 가장 오래된 데이터부터 읽음
- format = 'json' — Kinesis 레코드를 JSON으로 파싱
3. 출력 테이블 — stock_output
t_env.execute_sql('''
CREATE TABLE stock_output (
ticker STRING,
avg_price DOUBLE,
avg_time TIMESTAMP(3)
) WITH (
'connector' = 'kinesis',
'stream' = 'de-ai-04-an2-kds-stock-output',
'aws.region' = 'ap-northeast-2',
'format' = 'json'
)
''')
- avg_price — 집계된 평균 가격 필드
- avg_time — 윈도우 종료 시각. 10초 윈도우가 닫히는 시점
- stream — 출력용 KDS 스트림. Firehose가 이후 구독할 대상
- WATERMARK 불필요 — 출력 테이블은 sink 역할이므로 이벤트 타임 지연 처리 설정이 필요 없음
4. 집계 쿼리 — INSERT INTO
t_env.execute_sql('''
INSERT INTO stock_output
SELECT
ticker,
AVG(price) AS avg_price,
TUMBLE_END(event_time, INTERVAL '10' SECOND) AS avg_time
FROM stock_input
GROUP BY
TUMBLE(event_time, INTERVAL '10' SECOND),
ticker
''').wait()
- TUMBLE 윈도우 — 고정 크기 비중첩 윈도우. 10초마다 새 윈도우가 열리고 닫힘 (예: 00:00~00:10, 00:10~00:20)
- GROUP BY TUMBLE(...) — 윈도우 테이블 함수 방식. 이 방식이어야 append-only 모드로 동작하여 Kinesis sink로 출력 가능
- GROUP BY ... ticker — 윈도우 × 티커 조합으로 그룹화. 10초 동안 각 티커별로 평균가 1건 생성됨
- TUMBLE_END — 윈도우 종료 시각 반환. SELECT 절에서는 결과 컬럼으로 사용
- .wait() — 쿼리 처리가 완료될 때까지 블로킹. 스트리밍이므로 실제로는 앱이 종료될 때까지 계속 돌아감
집계 결과 예시:
{
"ticker": "GOOG",
"avg_price": 512.47,
"avg_time": "2026-04-16T16:25:10.000"
}
5. TUMBLE vs TUMBLE_END 구분
사용 위치 함수 의미
| GROUP BY | TUMBLE(event_time, INTERVAL '10' SECOND) | 윈도우 자체를 그룹 기준으로 사용 |
| SELECT | TUMBLE_END(event_time, INTERVAL '10' SECOND) | 윈도우 종료 시각을 컬럼값으로 사용 |
- GROUP BY TUMBLE_END을 쓰면 안 되는 이유 — 단순 스칼라 함수로 해석되어 update 모드 집계가 됨. Kinesis sink는 append-only만 지원하므로 TableException: doesn't support consuming update changes 에러 발생
Part 4. Firehose 연결 및 S3 적재
1. Firehose 구성
KDS Output → [Firehose] → S3 (가공된 데이터)
항목 값
| 이름 | de-ai-04-an2-adf-link-stock |
| 소스(Source) | Amazon Kinesis Data Streams |
| 소스 스트림 | de-ai-04-an2-kds-stock-output |
| 대상(Destination) | Amazon S3 |
| 버퍼 조건 | 5MB / 5분 (기본값) |
- Source = KDS Output — Flink가 집계 결과를 쏘는 스트림을 구독
- 최종 S3 적재 데이터 — Raw가 아닌 10초 평균가 집계 결과. 원본 데이터 대비 용량이 크게 줄어듦
- 자동 파티셔닝 — S3 경로에 yyyy/MM/dd/HH 구조로 자동 분할
2. 왜 두 단계 파이프라인인가
Raw 데이터를 바로 S3에 쌓는 방식과 비교:
방식 구조 장점 단점
| 직접 적재 | 프로듀서 → KDS → Firehose → S3 | 단순, 원본 보존 | 저장 용량 큼, 분석 시 재처리 필요 |
| Flink 전처리 | 프로듀서 → KDS → Flink → KDS → Firehose → S3 | 집계된 결과 저장, 쿼리 비용 절감 | 파이프라인 복잡도 증가 |
- 실시간 대시보드 — Flink 처리 결과를 곧바로 시각화 가능
- 저장 비용 최적화 — 초당 1건 Raw 데이터를 10초 1건 집계로 줄이면 저장량 1/10 수준
전체 파이프라인 요약
┌──────────────────────────────┐
│ kds_flink_adf_producer.py │
│ (로컬 프로듀서) │
│ │
│ gen_stock_data() + put_record│
│ 초당 1건 × 6 티커 │
└──────────────┬───────────────┘
↓
┌──────────────────────────────┐
│ KDS Input │
│ de-ai-04-an2-kds-stock-input │
│ (샤드 × N, PartitionKey=티커)│
└──────────────┬───────────────┘
↓
┌──────────────────────────────┐
│ Flink Application │
│ de-ai-04-an2-flink-stock │
│ │
│ stock_input 테이블 │
│ ↓ TUMBLE(10s) × ticker │
│ AVG(price) 집계 │
│ ↓ │
│ stock_output 테이블 │
└──────────────┬───────────────┘
↓
┌──────────────────────────────┐
│ KDS Output │
│ de-ai-04-an2-kds-stock-output│
└──────────────┬───────────────┘
↓
┌──────────────────────────────┐
│ Firehose │
│ de-ai-04-an2-adf-link-stock │
│ (5MB / 5분 버퍼링) │
└──────────────┬───────────────┘
↓
┌──────────────────────────────┐
│ S3 │
│ /yyyy/MM/dd/HH/... │
│ (10초 평균가 집계 데이터) │
└──────────────────────────────┘
계층 리소스 역할 핵심 포인트
| 프로듀서 | kds_flink_adf_producer.py | 가짜 주가 데이터 생성 및 전송 | boto3 + put_record |
| 입력 스트림 | KDS (stock-input) | Raw 데이터 임시 버퍼 | Flink가 LATEST 포지션 구독 |
| 스트림 처리 | Flink (flink-stock) | TUMBLE 10초 ticker별 AVG 집계 | PyFlink + SQL, JAR 포함 zip |
| 출력 스트림 | KDS (stock-output) | 집계 결과 중계 | Firehose가 구독 |
| 적재 소비자 | Firehose (adf-link-stock) | 버퍼링 후 S3 배치 적재 | 5MB / 5분 기본 |
| 저장소 | S3 | 가공된 데이터 영구 저장 | 날짜 기반 자동 파티셔닝 |
프로듀서가 Raw 데이터를 KDS로 쏘고 → Flink가 10초 TUMBLE 윈도우로 종목별 평균가를 집계한 뒤 → 출력 KDS를 거쳐 Firehose가 S3에 배치 적재하는, 실시간 전처리가 포함된 5단계 서버리스 스트리밍 파이프라인 구조
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Athena - 2 (Athena 기반 일일 리포트 생성 DAG) (0) | 2026.04.20 |
|---|---|
| Athena - 1 (Ahtena + Airflow) (0) | 2026.04.17 |
| Kinesis - 1 (KDS+Firehose+S3) (0) | 2026.04.16 |
| Firehose - 2 (생성한 로그 Firehose 전송) (1) | 2026.04.15 |
| Firehose - 1 (LogGenerator로 도메인별 로그 생성) (1) | 2026.04.15 |