SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Kinesis - 1 (KDS+Firehose+S3)

dev-lee 2026. 4. 16. 17:05

주가 로그 파이프라인 구축 — 프로듀서부터 KDS, Firehose, S3 적재까지

boto3 프로듀서가 가짜 주가 로그를 Kinesis Data Streams로 전송하고, Firehose가 해당 스트림을 구독하여 S3에 배치 적재하는 실시간 파이프라인 구조


전체 아키텍처

stock_log_generator.py (로컬)
        ↓ put_record (1초 간격)
KDS (de-ai-04-an2-kds-stock-input)
        ↓ 구독 (자동)
Firehose (de-ai-30-an2-adf-stock-analysis)
        ↓ 버퍼링 후 배치 전송
S3 (de-ai-30-827913617635-ap-northeast-2-an)

로컬 프로듀서 → KDS → Firehose → S3로 이어지는 3단계 스트리밍 파이프라인. KDS는 실시간 데이터의 임시 버퍼 역할이고, Firehose는 이를 구독하여 일정 조건(시간/용량)에 따라 S3에 배치 적재하는 구조.


Part 1. stock_log_generator.py — 가짜 주가 로그 프로듀서


1. 모듈 임포트

import os
import time
import json
import random
from datetime import datetime

import boto3
from dotenv import load_dotenv
  • os — 환경변수 접근용. .env에서 로드된 값을 os.getenv()로 조회
  • time — sleep()으로 전송 간격 제어 (초당 1건)
  • json — dict → JSON 문자열 직렬화. Kinesis는 문자열/바이트 타입만 허용하므로 필수
  • random — 티커 랜덤 선택, 가격/거래량 랜덤 생성
  • datetime — 이벤트 발생 시각을 ISO 8601 포맷으로 생성
  • boto3 — AWS SDK for Python. Kinesis 클라이언트 생성에 사용
  • dotenv — .env 파일을 읽어 환경변수로 등록해주는 패키지

2. 환경변수 및 상수 세팅

load_dotenv()
ACCESS_KEY = os.getenv('ACCESS_KEY')
SECRET_KEY = os.getenv('SECRET_KEY')
REGION = 'ap-northeast-2'
STREAM_NAME = 'de-ai-04-an2-kds-stock-input'
TICKERS = ['NVDA', 'GOOG', 'AAPL', 'TSLA', 'AMZN', 'MSFT']
  • load_dotenv() — 프로젝트 루트의 .env 파일을 읽어 환경변수로 등록
  • ACCESS_KEY / SECRET_KEY — AWS IAM 사용자의 인증 정보. 코드에 하드코딩하지 않고 .env로 분리하여 Git 유출 방지
  • REGION — ap-northeast-2는 서울 리전
  • STREAM_NAME — 전송 대상 KDS 스트림 이름
  • TICKERS — 6개 종목 리스트. 이후 random.choice()와 PartitionKey로 활용됨

3. Kinesis 클라이언트 생성 — get_client()

def get_client(service_name='kinesis', is_in_aws=True):
    if is_in_aws:
        return boto3.client(service_name, region_name=REGION)

    session = boto3.Session(
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
        region_name=REGION,
    )
    return session.client(service_name)
  • is_in_aws=True — AWS 내부 실행. IAM Role이 자동 연결되어 키 없이 인증 가능
  • is_in_aws=False — AWS 외부 실행 (로컬). ACCESS_KEY / SECRET_KEY를 명시적으로 전달
  • service_name='kinesis' — KDS 서비스 클라이언트 생성. 다른 서비스(firehose, s3)로 변경 가능

AWS 내부 vs 외부 인증 흐름:

항목 AWS 내부 (CloudShell, EC2 등) AWS 외부 (로컬 PC 등)

인증 방식 IAM Role 자동 ACCESS_KEY + SECRET_KEY
boto3 호출 boto3.client() boto3.Session().client()
키 필요 여부 불필요 필요

4. 가짜 거래 데이터 생성 — gen_stock_data()

def gen_stock_data():
    return {
        'event_time': datetime.now().isoformat(),
        'ticker': random.choice(TICKERS),
        'price': round(random.uniform(100, 1000), 2),
        'volume': random.randint(1, 100),
        'trade_id': random.randint(1_000_000, 9_999_999),
    }
  • event_time — ISO 8601 포맷 문자열 (예: 2026-04-16T16:24:18.922702)
  • ticker — 6개 종목 중 랜덤 선택. 이후 PartitionKey로도 사용됨
  • price — 100~1000 범위 실수, 소수점 2자리 반올림
  • volume — 1~100 범위 정수 거래량
  • trade_id — 7자리 정수 거래 ID. 숫자 구분자 _는 가독성용 문법

생성되는 데이터 예시:

{
    "event_time": "2026-04-16T16:24:18.922702",
    "ticker": "GOOG",
    "price": 883.03,
    "volume": 37,
    "trade_id": 2410348
}

5. KDS 전송 루프 — main()

def main():
    kinesis = get_client('kinesis', is_in_aws=False)
    print(f'거래 데이터 전송 시작... (stream: {STREAM_NAME})')

    try:
        while True:
            data = gen_stock_data()
            kinesis.put_record(
                StreamName=STREAM_NAME,
                Data=json.dumps(data),
                PartitionKey=data['ticker'],
            )
            print(f'전송: {data}')
            time.sleep(1)
    except KeyboardInterrupt:
        print('\n사용자 중단')
    except Exception as e:
        print(f'중단: {e}')
  • kinesis.put_record() — KDS 스트림에 단일 레코드 전송. 응답으로 ShardId와 SequenceNumber 반환
  • StreamName — 전송 대상 KDS 스트림 이름. 사전에 생성되어 있어야 함
  • Data — 실제 전송 데이터. 문자열/바이트 타입만 허용되므로 json.dumps()로 직렬화 필수
  • PartitionKey — 어떤 샤드로 보낼지 결정하는 키. 티커 해싱 기반으로 샤드 매핑됨
  • time.sleep(1) — 1초 간격 전송 제어 (초당 1건)
  • KeyboardInterrupt 분리 — Ctrl+C 정상 종료 처리
  • Exception — 네트워크/권한/스트림 부재 등 I/O 에러 포착

Part 2. Kinesis Data Streams 구성


1. KDS의 역할

프로듀서 → [KDS] → Firehose → S3
  • 실시간 데이터 임시 버퍼 — 초당 수천~수만 건 수준의 실시간 데이터를 받아내는 파이프 역할
  • 기본 보존 기간 — 24시간. 그 안에 소비자가 가져가지 않으면 사라짐 (최대 365일까지 연장 가능)
  • 샤드 단위 확장 — 샤드당 초당 1,000건, 1MB 쓰기 처리량 제공. 처리량 부족 시 샤드 개수를 늘려 확장
  • 소비자 동시 연결 — 하나의 스트림을 여러 소비자(Firehose, Lambda, Flink 등)가 동시에 구독 가능

2. 스트림 생성 (CLI 예시)

aws kinesis create-stream \
  --stream-name de-ai-04-an2-kds-stock-input \
  --shard-count 6 \
  --region ap-northeast-2
  • shard-count — 샤드 개수. 티커가 6개이므로 종목별로 독립된 샤드를 갖도록 6으로 설정
  • PartitionKey와의 관계 — 프로듀서가 티커를 PartitionKey로 넣으면, Kinesis가 해시하여 6개 샤드 중 하나로 분배

Part 3. Amazon Data Firehose — KDS 구독 및 S3 적재


1. Firehose의 역할

KDS (실시간 스트림) → [Firehose] → S3 (배치 파일)
  • 소비자 역할 — KDS에서 데이터를 자동으로 pull하여 소비함
  • 버퍼링 후 배치 전송 — 일정 시간/용량에 도달하면 S3에 파일 단위로 적재
  • 서버리스 관리형 서비스 — 코드 작성 불필요, 콘솔 설정만으로 동작
  • 자동 파티셔닝 — S3 경로에 yyyy/MM/dd/HH 형태로 자동 분할 저장

2. Firehose 스트림 설정

항목 
소스(Source) Amazon Kinesis Data Streams
소스 스트림 de-ai-04-an2-kds-stock-input
대상(Destination) Amazon S3
대상 버킷 s3://de-ai-30-827913617635-ap-northeast-2-an
이름 de-ai-30-an2-adf-stock-analysis
  • Source = KDS — Firehose가 Direct PUT 방식이 아닌 KDS 스트림을 구독하는 방식으로 구성됨
  • Destination = S3 — 최종 적재 대상. 버킷 이름에 계정 ID와 리전이 포함된 네이밍 규칙
  • 자동 구독 — Firehose가 생성되는 순간부터 KDS 스트림의 데이터를 자동으로 소비하기 시작함

3. 버퍼 동작 원리

Firehose는 KDS에서 읽은 데이터를 메모리에 버퍼링한 뒤, 다음 두 조건 중 먼저 도달하는 조건에 따라 S3에 flush함.

조건 기본값 의미

Buffer size 5 MB 누적 데이터 크기
Buffer interval 300초 (5분) 마지막 flush 이후 경과 시간
  • 짧은 간격 설정 — 실시간성이 중요하면 interval을 60초로 줄임
  • 긴 간격 설정 — 비용 최적화가 중요하면 용량 기준으로 더 큰 파일 생성
  • S3 PUT 요청 비용 — 너무 짧은 간격은 요청 수 증가로 비용 상승

4. S3 적재 결과

Firehose가 생성하는 S3 객체 경로 구조:

s3://de-ai-30-827913617635-ap-northeast-2-an/
    └── 2026/
        └── 04/
            └── 16/
                └── 16/
                    └── de-ai-30-an2-adf-stock-analysis-1-2026-04-16-16-30-12-xxxxx
  • 날짜 기반 프리픽스 — yyyy/MM/dd/HH 구조로 자동 분할됨
  • 파일 내용 — 버퍼링된 기간 동안 KDS로 들어온 모든 레코드가 줄바꿈 없이 연속 JSON으로 저장됨
  • 파일명 — Firehose 스트림 이름 + 타임스탬프 + 랜덤 UUID 조합

전체 파이프라인 요약

┌─────────────────────────┐
│ stock_log_generator.py  │
│  (로컬 프로듀서)          │
│                         │
│  gen_stock_data()       │
│       ↓                 │
│  json.dumps()           │
│       ↓                 │
│  kinesis.put_record()   │
└──────────┬──────────────┘
           │ 초당 1건
           ↓
┌─────────────────────────┐
│ KDS                     │
│ de-ai-04-an2-kds-       │
│  stock-input            │
│  (샤드 6개, 티커별 분산)   │
└──────────┬──────────────┘
           │ Firehose가 자동 구독
           ↓
┌─────────────────────────┐
│ Firehose                │
│ de-ai-30-an2-adf-       │
│  stock-analysis         │
│  (5MB 또는 5분 기준 버퍼) │
└──────────┬──────────────┘
           │ 배치 flush
           ↓
┌─────────────────────────┐
│ S3                      │
│ de-ai-30-827913617635-  │
│  ap-northeast-2-an      │
│  /yyyy/MM/dd/HH/...     │
└─────────────────────────┘

계층 리소스 역할 핵심 포인트

프로듀서 stock_log_generator.py 가짜 주가 데이터 생성 및 전송 boto3 + put_record
스트림 KDS (stock-input) 실시간 데이터 임시 버퍼 샤드 6개, PartitionKey=ticker
소비자 Firehose (adf-stock-analysis) KDS 구독 + S3 배치 적재 5MB / 5분 버퍼링
저장소 S3 (an 버킷) 최종 영구 저장소 날짜 기반 자동 파티셔닝

로컬 프로듀서가 KDS로 실시간 전송하고 → Firehose가 이를 구독하여 버퍼링 후 → S3에 날짜별로 배치 적재하는 서버리스 스트리밍 파이프라인 구조