주가 로그 파이프라인 구축 — 프로듀서부터 KDS, Firehose, S3 적재까지
boto3 프로듀서가 가짜 주가 로그를 Kinesis Data Streams로 전송하고, Firehose가 해당 스트림을 구독하여 S3에 배치 적재하는 실시간 파이프라인 구조
전체 아키텍처
stock_log_generator.py (로컬)
↓ put_record (1초 간격)
KDS (de-ai-04-an2-kds-stock-input)
↓ 구독 (자동)
Firehose (de-ai-30-an2-adf-stock-analysis)
↓ 버퍼링 후 배치 전송
S3 (de-ai-30-827913617635-ap-northeast-2-an)
로컬 프로듀서 → KDS → Firehose → S3로 이어지는 3단계 스트리밍 파이프라인. KDS는 실시간 데이터의 임시 버퍼 역할이고, Firehose는 이를 구독하여 일정 조건(시간/용량)에 따라 S3에 배치 적재하는 구조.
Part 1. stock_log_generator.py — 가짜 주가 로그 프로듀서
1. 모듈 임포트
import os
import time
import json
import random
from datetime import datetime
import boto3
from dotenv import load_dotenv
- os — 환경변수 접근용. .env에서 로드된 값을 os.getenv()로 조회
- time — sleep()으로 전송 간격 제어 (초당 1건)
- json — dict → JSON 문자열 직렬화. Kinesis는 문자열/바이트 타입만 허용하므로 필수
- random — 티커 랜덤 선택, 가격/거래량 랜덤 생성
- datetime — 이벤트 발생 시각을 ISO 8601 포맷으로 생성
- boto3 — AWS SDK for Python. Kinesis 클라이언트 생성에 사용
- dotenv — .env 파일을 읽어 환경변수로 등록해주는 패키지
2. 환경변수 및 상수 세팅
load_dotenv()
ACCESS_KEY = os.getenv('ACCESS_KEY')
SECRET_KEY = os.getenv('SECRET_KEY')
REGION = 'ap-northeast-2'
STREAM_NAME = 'de-ai-04-an2-kds-stock-input'
TICKERS = ['NVDA', 'GOOG', 'AAPL', 'TSLA', 'AMZN', 'MSFT']
- load_dotenv() — 프로젝트 루트의 .env 파일을 읽어 환경변수로 등록
- ACCESS_KEY / SECRET_KEY — AWS IAM 사용자의 인증 정보. 코드에 하드코딩하지 않고 .env로 분리하여 Git 유출 방지
- REGION — ap-northeast-2는 서울 리전
- STREAM_NAME — 전송 대상 KDS 스트림 이름
- TICKERS — 6개 종목 리스트. 이후 random.choice()와 PartitionKey로 활용됨
3. Kinesis 클라이언트 생성 — get_client()
def get_client(service_name='kinesis', is_in_aws=True):
if is_in_aws:
return boto3.client(service_name, region_name=REGION)
session = boto3.Session(
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
region_name=REGION,
)
return session.client(service_name)
- is_in_aws=True — AWS 내부 실행. IAM Role이 자동 연결되어 키 없이 인증 가능
- is_in_aws=False — AWS 외부 실행 (로컬). ACCESS_KEY / SECRET_KEY를 명시적으로 전달
- service_name='kinesis' — KDS 서비스 클라이언트 생성. 다른 서비스(firehose, s3)로 변경 가능
AWS 내부 vs 외부 인증 흐름:
항목 AWS 내부 (CloudShell, EC2 등) AWS 외부 (로컬 PC 등)
| 인증 방식 | IAM Role 자동 | ACCESS_KEY + SECRET_KEY |
| boto3 호출 | boto3.client() | boto3.Session().client() |
| 키 필요 여부 | 불필요 | 필요 |
4. 가짜 거래 데이터 생성 — gen_stock_data()
def gen_stock_data():
return {
'event_time': datetime.now().isoformat(),
'ticker': random.choice(TICKERS),
'price': round(random.uniform(100, 1000), 2),
'volume': random.randint(1, 100),
'trade_id': random.randint(1_000_000, 9_999_999),
}
- event_time — ISO 8601 포맷 문자열 (예: 2026-04-16T16:24:18.922702)
- ticker — 6개 종목 중 랜덤 선택. 이후 PartitionKey로도 사용됨
- price — 100~1000 범위 실수, 소수점 2자리 반올림
- volume — 1~100 범위 정수 거래량
- trade_id — 7자리 정수 거래 ID. 숫자 구분자 _는 가독성용 문법
생성되는 데이터 예시:
{
"event_time": "2026-04-16T16:24:18.922702",
"ticker": "GOOG",
"price": 883.03,
"volume": 37,
"trade_id": 2410348
}
5. KDS 전송 루프 — main()
def main():
kinesis = get_client('kinesis', is_in_aws=False)
print(f'거래 데이터 전송 시작... (stream: {STREAM_NAME})')
try:
while True:
data = gen_stock_data()
kinesis.put_record(
StreamName=STREAM_NAME,
Data=json.dumps(data),
PartitionKey=data['ticker'],
)
print(f'전송: {data}')
time.sleep(1)
except KeyboardInterrupt:
print('\n사용자 중단')
except Exception as e:
print(f'중단: {e}')
- kinesis.put_record() — KDS 스트림에 단일 레코드 전송. 응답으로 ShardId와 SequenceNumber 반환
- StreamName — 전송 대상 KDS 스트림 이름. 사전에 생성되어 있어야 함
- Data — 실제 전송 데이터. 문자열/바이트 타입만 허용되므로 json.dumps()로 직렬화 필수
- PartitionKey — 어떤 샤드로 보낼지 결정하는 키. 티커 해싱 기반으로 샤드 매핑됨
- time.sleep(1) — 1초 간격 전송 제어 (초당 1건)
- KeyboardInterrupt 분리 — Ctrl+C 정상 종료 처리
- Exception — 네트워크/권한/스트림 부재 등 I/O 에러 포착
Part 2. Kinesis Data Streams 구성
1. KDS의 역할
프로듀서 → [KDS] → Firehose → S3
- 실시간 데이터 임시 버퍼 — 초당 수천~수만 건 수준의 실시간 데이터를 받아내는 파이프 역할
- 기본 보존 기간 — 24시간. 그 안에 소비자가 가져가지 않으면 사라짐 (최대 365일까지 연장 가능)
- 샤드 단위 확장 — 샤드당 초당 1,000건, 1MB 쓰기 처리량 제공. 처리량 부족 시 샤드 개수를 늘려 확장
- 소비자 동시 연결 — 하나의 스트림을 여러 소비자(Firehose, Lambda, Flink 등)가 동시에 구독 가능
2. 스트림 생성 (CLI 예시)
aws kinesis create-stream \
--stream-name de-ai-04-an2-kds-stock-input \
--shard-count 6 \
--region ap-northeast-2
- shard-count — 샤드 개수. 티커가 6개이므로 종목별로 독립된 샤드를 갖도록 6으로 설정
- PartitionKey와의 관계 — 프로듀서가 티커를 PartitionKey로 넣으면, Kinesis가 해시하여 6개 샤드 중 하나로 분배
Part 3. Amazon Data Firehose — KDS 구독 및 S3 적재
1. Firehose의 역할
KDS (실시간 스트림) → [Firehose] → S3 (배치 파일)
- 소비자 역할 — KDS에서 데이터를 자동으로 pull하여 소비함
- 버퍼링 후 배치 전송 — 일정 시간/용량에 도달하면 S3에 파일 단위로 적재
- 서버리스 관리형 서비스 — 코드 작성 불필요, 콘솔 설정만으로 동작
- 자동 파티셔닝 — S3 경로에 yyyy/MM/dd/HH 형태로 자동 분할 저장
2. Firehose 스트림 설정
| 항목 | 값 |
| 소스(Source) | Amazon Kinesis Data Streams |
| 소스 스트림 | de-ai-04-an2-kds-stock-input |
| 대상(Destination) | Amazon S3 |
| 대상 버킷 | s3://de-ai-30-827913617635-ap-northeast-2-an |
| 이름 | de-ai-30-an2-adf-stock-analysis |
- Source = KDS — Firehose가 Direct PUT 방식이 아닌 KDS 스트림을 구독하는 방식으로 구성됨
- Destination = S3 — 최종 적재 대상. 버킷 이름에 계정 ID와 리전이 포함된 네이밍 규칙
- 자동 구독 — Firehose가 생성되는 순간부터 KDS 스트림의 데이터를 자동으로 소비하기 시작함
3. 버퍼 동작 원리
Firehose는 KDS에서 읽은 데이터를 메모리에 버퍼링한 뒤, 다음 두 조건 중 먼저 도달하는 조건에 따라 S3에 flush함.
조건 기본값 의미
| Buffer size | 5 MB | 누적 데이터 크기 |
| Buffer interval | 300초 (5분) | 마지막 flush 이후 경과 시간 |
- 짧은 간격 설정 — 실시간성이 중요하면 interval을 60초로 줄임
- 긴 간격 설정 — 비용 최적화가 중요하면 용량 기준으로 더 큰 파일 생성
- S3 PUT 요청 비용 — 너무 짧은 간격은 요청 수 증가로 비용 상승
4. S3 적재 결과
Firehose가 생성하는 S3 객체 경로 구조:
s3://de-ai-30-827913617635-ap-northeast-2-an/
└── 2026/
└── 04/
└── 16/
└── 16/
└── de-ai-30-an2-adf-stock-analysis-1-2026-04-16-16-30-12-xxxxx
- 날짜 기반 프리픽스 — yyyy/MM/dd/HH 구조로 자동 분할됨
- 파일 내용 — 버퍼링된 기간 동안 KDS로 들어온 모든 레코드가 줄바꿈 없이 연속 JSON으로 저장됨
- 파일명 — Firehose 스트림 이름 + 타임스탬프 + 랜덤 UUID 조합
전체 파이프라인 요약
┌─────────────────────────┐
│ stock_log_generator.py │
│ (로컬 프로듀서) │
│ │
│ gen_stock_data() │
│ ↓ │
│ json.dumps() │
│ ↓ │
│ kinesis.put_record() │
└──────────┬──────────────┘
│ 초당 1건
↓
┌─────────────────────────┐
│ KDS │
│ de-ai-04-an2-kds- │
│ stock-input │
│ (샤드 6개, 티커별 분산) │
└──────────┬──────────────┘
│ Firehose가 자동 구독
↓
┌─────────────────────────┐
│ Firehose │
│ de-ai-30-an2-adf- │
│ stock-analysis │
│ (5MB 또는 5분 기준 버퍼) │
└──────────┬──────────────┘
│ 배치 flush
↓
┌─────────────────────────┐
│ S3 │
│ de-ai-30-827913617635- │
│ ap-northeast-2-an │
│ /yyyy/MM/dd/HH/... │
└─────────────────────────┘
계층 리소스 역할 핵심 포인트
| 프로듀서 | stock_log_generator.py | 가짜 주가 데이터 생성 및 전송 | boto3 + put_record |
| 스트림 | KDS (stock-input) | 실시간 데이터 임시 버퍼 | 샤드 6개, PartitionKey=ticker |
| 소비자 | Firehose (adf-stock-analysis) | KDS 구독 + S3 배치 적재 | 5MB / 5분 버퍼링 |
| 저장소 | S3 (an 버킷) | 최종 영구 저장소 | 날짜 기반 자동 파티셔닝 |
로컬 프로듀서가 KDS로 실시간 전송하고 → Firehose가 이를 구독하여 버퍼링 후 → S3에 날짜별로 배치 적재하는 서버리스 스트리밍 파이프라인 구조
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Athena - 1 (Ahtena + Airflow) (0) | 2026.04.17 |
|---|---|
| Kinesis - 2(KDS+Flink+Firehose+S3) (0) | 2026.04.16 |
| Firehose - 2 (생성한 로그 Firehose 전송) (1) | 2026.04.15 |
| Firehose - 1 (LogGenerator로 도메인별 로그 생성) (1) | 2026.04.15 |
| Airflow - 9 (이벤트 기반 파이프라인) (0) | 2026.04.15 |