멀티프로세싱으로 점포(store-01 ~ store-N)별 독립 프로세스를 생성하여, 각 점포의 매출 로그를 Kinesis Data Streams로 실시간 전송하는 데이터 생성기
아래 코드를 통해 키네시스로 데이터를 보낸 후 AWS 콘솔에서 Firehose와 연결하여 S3에 적재하는 과정을 거침.
전체 구조
main (부모 프로세스)
├─ Process-0: run_producer(store-01) → gen_data → send_to_kinesis → 무한루프
├─ Process-1: run_producer(store-02) → gen_data → send_to_kinesis → 무한루프
├─ Process-2: run_producer(store-03) → gen_data → send_to_kinesis → 무한루프
└─ Process-3: run_producer(store-04) → gen_data → send_to_kinesis → 무한루프
↓
Kinesis Data Stream
(de-ai-04-ap2-kdf-medallion-bronze-stream)
NUM_STORES(4개)만큼 자식 프로세스를 생성하여, 각 프로세스가 독립적으로 데이터를 생성하고 Kinesis로 전송하는 구조. Ctrl+C로 전체 종료.
1. 모듈 가져오기
import json
import os
import uuid
import random
import boto3
import time
from datetime import datetime, UTC
from faker import Faker
from botocore.exceptions import ClientError
from dotenv import load_dotenv
import multiprocessing
- uuid — 이벤트 ID를 중복 없이 생성 (UUID v4 해싱)
- boto3 — AWS SDK. Kinesis 클라이언트 생성에 사용
- Faker — 가짜 IP, 브라우저 정보 등 현실적인 더미 데이터 생성
- multiprocessing — 점포별 독립 프로세스 생성. threading과 달리 GIL 제약 없이 진정한 병렬 실행
- dotenv — .env 파일에서 AWS 키를 로드하여 코드에 하드코딩 방지
2. 환경 변수 및 AWS 연동
load_dotenv()
AWS_ACCESS_KEY = os.getenv('ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('SECRET_KEY')
AWS_REGION = 'ap-northeast-2'
KINESIS_DATA_STREAM_NAME = 'de-ai-04-ap2-kdf-medallion-bronze-stream'
fake = Faker()
session = boto3.Session(
aws_access_key_id=AWS_ACCESS_KEY,
aws_secret_access_key=AWS_SECRET_KEY,
region_name=AWS_REGION
)
kinesis_client = session.client('kinesis')
- boto3.Session — AWS 외부(로컬)에서 실행하므로 ACCESS_KEY/SECRET_KEY를 명시적으로 전달
- KINESIS_DATA_STREAM_NAME — 전송 대상 KDS 스트림. Medallion Architecture의 Bronze Layer 입구
3. 데이터 생성 — gen_data()
def gen_data(store_id):
items = [
{"item_id": "bread-001", "item_name": "우유식빵", "price": 5500},
{"item_id": "bread-002", "item_name": "천연발효버터치아바타", "price": 4800},
{"item_id": "coffee-01", "item_name": "아메리카노", "price": 4000},
{"item_id": "jam-01", "item_name": "수제 딸기잼", "price": 8500}
]
selected_item = random.choice(items)
qty = random.randint(1, 3)
current_utc_time = datetime.now(UTC).isoformat().replace("+00:00", "Z")
raw_log = {
"event_id" : str(uuid.uuid4()),
"event_time": current_utc_time,
"source_ip" : fake.ipv4(),
"user_agent": fake.user_agent(),
"data": json.dumps({
"user_id" : f"user_{random.randint(100, 999)}",
"item_id" : selected_item["item_id"],
"price" : selected_item["price"],
"qty" : qty,
"store_id": store_id
}),
"ingested_at": current_utc_time
}
return raw_log
- items — 4개 제품 마스터 데이터. random.choice로 랜덤 선택
- uuid.uuid4() — 전역적으로 고유한 이벤트 ID 생성. 중복 불가능
- fake.ipv4() / fake.user_agent() — Faker로 현실적인 클라이언트 접속 정보 시뮬레이션
- data 필드 — dict 안에 dict 중첩 구조. json.dumps로 객체 직렬화하여 문자열로 저장. Bronze Layer에서는 이 상태로 보관하고 Silver 단계에서 파싱/정제
- event_time / ingested_at — UTC 기준 ISO 8601 포맷. 현재는 동일 값이지만, 실무에서는 event_time(이벤트 발생)과 ingested_at(시스템 수집 시각)이 다를 수 있음
생성되는 데이터 예시:
{
"event_id": "a3f1c2d4-...-7e8f9a0b",
"event_time": "2026-04-20T06:30:01.123Z",
"source_ip": "192.168.45.12",
"user_agent": "Mozilla/5.0 (Windows NT 10.0; ...",
"data": "{\"user_id\": \"user_342\", \"item_id\": \"coffee-01\", \"price\": 4000, \"qty\": 2, \"store_id\": \"store-01\"}",
"ingested_at": "2026-04-20T06:30:01.123Z"
}
4. Kinesis 전송 — send_to_kinesis()
def send_to_kinesis(log_entry):
try:
kinesis_client.put_record(
StreamName = KINESIS_DATA_STREAM_NAME,
Data = json.dumps(log_entry),
PartitionKey= log_entry['event_id']
)
return True
except Exception as e:
print('aws 전송 에러', e)
return False
- put_record — KDS 스트림에 단일 레코드 전송
- Data — dict를 json.dumps로 직렬화. Kinesis는 문자열/바이트만 허용
- PartitionKey — event_id를 파티션 키로 사용. Kinesis가 이 값을 해싱 → 수치화 → 구간화 → 해당 구간의 샤드에 배치. event_id는 UUID이므로 해시값이 균등 분포되어 샤드 간 부하가 골고루 분산됨
PartitionKey와 샤드의 관계:
- 1개의 샤드에 여러 event_id가 배치됨 (분산 구조)
- 적정 샤드 수를 모르면 온디맨드로 테스트하여 적정 샤드 수를 산출한 뒤 프로비저닝 시 지정
5. 프로듀서 실행 — run_producer()
def run_producer(i, store_id):
try:
print(f'프로세스-{i} 가동')
while True:
log_entry = gen_data(store_id)
if send_to_kinesis(log_entry):
print(f'{log_entry["event_time"]} 전송 성공 {store_id}')
time.sleep(random.uniform(0.5, 1.5))
except KeyboardInterrupt:
print('발생 중단')
print(f'프로세스-{i} 종료')
- while True — 무한루프로 지속적인 데이터 생성 및 전송
- time.sleep(random.uniform(0.5, 1.5)) — 0.5~1.5초 랜덤 간격으로 전송. 일정하지 않은 간격이 현실적인 매출 발생 패턴을 시뮬레이션
- KeyboardInterrupt — Ctrl+C로 개별 프로세스 정상 종료 처리
6. 메인 실행 — 멀티프로세스 생성
if __name__ == '__main__':
NUM_STORES = 4
processes = list()
print(f'{NUM_STORES}개의 프로세스가 데이터를 발생시켜서 Kinesis에 전송중')
for i in range(NUM_STORES):
store_id = f"store-{str(i+1).zfill(2)}"
p = multiprocessing.Process(target=run_producer, args=(i, store_id))
processes.append(p)
p.start()
try:
for p in processes:
p.join()
except Exception:
print('종료')
- NUM_STORES = 4 — 4개 점포 = 4개 독립 프로세스 동시 실행
- str(i+1).zfill(2) — 점포 ID를 store-01, store-02 형태로 제로패딩
- multiprocessing.Process — 각 점포마다 독립 프로세스 생성. threading과 달리 Python GIL 제약 없이 CPU 코어를 각각 활용 가능
- p.start() — 프로세스 시작. run_producer 함수가 자식 프로세스에서 실행됨
- p.join() — 부모 프로세스가 자식 프로세스 종료를 대기. 모든 자식이 끝나야 메인도 종료
- if __name__ == '__main__' — 멀티프로세싱에서 필수. 이걸 안 쓰면 자식 프로세스가 생성될 때 모듈이 재임포트되면서 무한 프로세스 생성이 발생할 수 있음
요약
| 구성 요소 | 역할 | 핵심 포인트 |
| gen_data() | 점포별 매출 로그 생성 | UUID 이벤트 ID + Faker 더미 데이터 + dict 중첩(Bronze용) |
| send_to_kinesis() | KDS로 단일 레코드 전송 | PartitionKey=event_id로 샤드 균등 분산 |
| run_producer() | 무한루프 데이터 생성+전송 | 0.5~1.5초 랜덤 간격으로 현실적 패턴 시뮬레이션 |
| main | 멀티프로세스 4개 생성 | 점포별 독립 프로세스, GIL 제약 없는 병렬 실행 |
멀티프로세싱으로 점포별 독립 프로세스를 생성하여 매출 로그를 Kinesis로 실시간 전송하고, Firehose를 거쳐 S3 Bronze Layer에 원본 데이터를 적재하는 데이터 생성기
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Medallion Architecture - 4 — Silver DAG 코드 구현 (0) | 2026.04.22 |
|---|---|
| Medallion Architecture - 3 — Silver Layer (0) | 2026.04.22 |
| Medallion Architecture - 1 — Bronze (0) | 2026.04.21 |
| Athena 정리 — 개념, SQL 실습, Airflow 연동 (0) | 2026.04.20 |
| Athena - 2 (Athena 기반 일일 리포트 생성 DAG) (0) | 2026.04.20 |