SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Medallion Architecture - 2 — Bronze 코드 구현

dev-lee 2026. 4. 21. 16:31

멀티프로세싱으로 점포(store-01 ~ store-N)별 독립 프로세스를 생성하여, 각 점포의 매출 로그를 Kinesis Data Streams로 실시간 전송하는 데이터 생성기


아래 코드를 통해 키네시스로 데이터를 보낸 후 AWS 콘솔에서 Firehose와 연결하여 S3에 적재하는 과정을 거침.

전체 구조

main (부모 프로세스)
  ├─ Process-0: run_producer(store-01) → gen_data → send_to_kinesis → 무한루프
  ├─ Process-1: run_producer(store-02) → gen_data → send_to_kinesis → 무한루프
  ├─ Process-2: run_producer(store-03) → gen_data → send_to_kinesis → 무한루프
  └─ Process-3: run_producer(store-04) → gen_data → send_to_kinesis → 무한루프
                                                          ↓
                                                    Kinesis Data Stream
                                                (de-ai-04-ap2-kdf-medallion-bronze-stream)

NUM_STORES(4개)만큼 자식 프로세스를 생성하여, 각 프로세스가 독립적으로 데이터를 생성하고 Kinesis로 전송하는 구조. Ctrl+C로 전체 종료.


1. 모듈 가져오기

import json
import os
import uuid
import random
import boto3
import time
from datetime import datetime, UTC
from faker import Faker
from botocore.exceptions import ClientError
from dotenv import load_dotenv
import multiprocessing
  • uuid — 이벤트 ID를 중복 없이 생성 (UUID v4 해싱)
  • boto3 — AWS SDK. Kinesis 클라이언트 생성에 사용
  • Faker — 가짜 IP, 브라우저 정보 등 현실적인 더미 데이터 생성
  • multiprocessing — 점포별 독립 프로세스 생성. threading과 달리 GIL 제약 없이 진정한 병렬 실행
  • dotenv — .env 파일에서 AWS 키를 로드하여 코드에 하드코딩 방지

2. 환경 변수 및 AWS 연동

load_dotenv()

AWS_ACCESS_KEY = os.getenv('ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('SECRET_KEY')
AWS_REGION = 'ap-northeast-2'
KINESIS_DATA_STREAM_NAME = 'de-ai-04-ap2-kdf-medallion-bronze-stream'

fake = Faker()

session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION
)
kinesis_client = session.client('kinesis')
  • boto3.Session — AWS 외부(로컬)에서 실행하므로 ACCESS_KEY/SECRET_KEY를 명시적으로 전달
  • KINESIS_DATA_STREAM_NAME — 전송 대상 KDS 스트림. Medallion Architecture의 Bronze Layer 입구

3. 데이터 생성 — gen_data()

def gen_data(store_id):
    items = [
        {"item_id": "bread-001", "item_name": "우유식빵", "price": 5500},
        {"item_id": "bread-002", "item_name": "천연발효버터치아바타", "price": 4800},
        {"item_id": "coffee-01", "item_name": "아메리카노", "price": 4000},
        {"item_id": "jam-01", "item_name": "수제 딸기잼", "price": 8500}
    ]
    selected_item = random.choice(items)
    qty = random.randint(1, 3)
    current_utc_time = datetime.now(UTC).isoformat().replace("+00:00", "Z")

    raw_log = {
        "event_id"  : str(uuid.uuid4()),
        "event_time": current_utc_time,
        "source_ip" : fake.ipv4(),
        "user_agent": fake.user_agent(),
        "data": json.dumps({
            "user_id" : f"user_{random.randint(100, 999)}",
            "item_id" : selected_item["item_id"],
            "price"   : selected_item["price"],
            "qty"     : qty,
            "store_id": store_id
        }),
        "ingested_at": current_utc_time
    }
    return raw_log
  • items — 4개 제품 마스터 데이터. random.choice로 랜덤 선택
  • uuid.uuid4() — 전역적으로 고유한 이벤트 ID 생성. 중복 불가능
  • fake.ipv4() / fake.user_agent() — Faker로 현실적인 클라이언트 접속 정보 시뮬레이션
  • data 필드 — dict 안에 dict 중첩 구조. json.dumps로 객체 직렬화하여 문자열로 저장. Bronze Layer에서는 이 상태로 보관하고 Silver 단계에서 파싱/정제
  • event_time / ingested_at — UTC 기준 ISO 8601 포맷. 현재는 동일 값이지만, 실무에서는 event_time(이벤트 발생)과 ingested_at(시스템 수집 시각)이 다를 수 있음

생성되는 데이터 예시:

{
    "event_id": "a3f1c2d4-...-7e8f9a0b",
    "event_time": "2026-04-20T06:30:01.123Z",
    "source_ip": "192.168.45.12",
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; ...",
    "data": "{\"user_id\": \"user_342\", \"item_id\": \"coffee-01\", \"price\": 4000, \"qty\": 2, \"store_id\": \"store-01\"}",
    "ingested_at": "2026-04-20T06:30:01.123Z"
}

4. Kinesis 전송 — send_to_kinesis()

def send_to_kinesis(log_entry):
    try:
        kinesis_client.put_record(
            StreamName  = KINESIS_DATA_STREAM_NAME,
            Data        = json.dumps(log_entry),
            PartitionKey= log_entry['event_id']
        )
        return True
    except Exception as e:
        print('aws 전송 에러', e)
        return False
  • put_record — KDS 스트림에 단일 레코드 전송
  • Data — dict를 json.dumps로 직렬화. Kinesis는 문자열/바이트만 허용
  • PartitionKey — event_id를 파티션 키로 사용. Kinesis가 이 값을 해싱 → 수치화 → 구간화 → 해당 구간의 샤드에 배치. event_id는 UUID이므로 해시값이 균등 분포되어 샤드 간 부하가 골고루 분산됨

PartitionKey와 샤드의 관계:

  • 1개의 샤드에 여러 event_id가 배치됨 (분산 구조)
  • 적정 샤드 수를 모르면 온디맨드로 테스트하여 적정 샤드 수를 산출한 뒤 프로비저닝 시 지정

5. 프로듀서 실행 — run_producer()

def run_producer(i, store_id):
    try:
        print(f'프로세스-{i} 가동')
        while True:
            log_entry = gen_data(store_id)
            if send_to_kinesis(log_entry):
                print(f'{log_entry["event_time"]} 전송 성공 {store_id}')
            time.sleep(random.uniform(0.5, 1.5))
    except KeyboardInterrupt:
        print('발생 중단')
    print(f'프로세스-{i} 종료')
  • while True — 무한루프로 지속적인 데이터 생성 및 전송
  • time.sleep(random.uniform(0.5, 1.5)) — 0.5~1.5초 랜덤 간격으로 전송. 일정하지 않은 간격이 현실적인 매출 발생 패턴을 시뮬레이션
  • KeyboardInterrupt — Ctrl+C로 개별 프로세스 정상 종료 처리

6. 메인 실행 — 멀티프로세스 생성

if __name__ == '__main__':
    NUM_STORES = 4
    processes = list()
    print(f'{NUM_STORES}개의 프로세스가 데이터를 발생시켜서 Kinesis에 전송중')

    for i in range(NUM_STORES):
        store_id = f"store-{str(i+1).zfill(2)}"
        p = multiprocessing.Process(target=run_producer, args=(i, store_id))
        processes.append(p)
        p.start()

    try:
        for p in processes:
            p.join()
    except Exception:
        print('종료')
  • NUM_STORES = 4 — 4개 점포 = 4개 독립 프로세스 동시 실행
  • str(i+1).zfill(2) — 점포 ID를 store-01, store-02 형태로 제로패딩
  • multiprocessing.Process — 각 점포마다 독립 프로세스 생성. threading과 달리 Python GIL 제약 없이 CPU 코어를 각각 활용 가능
  • p.start() — 프로세스 시작. run_producer 함수가 자식 프로세스에서 실행됨
  • p.join() — 부모 프로세스가 자식 프로세스 종료를 대기. 모든 자식이 끝나야 메인도 종료
  • if __name__ == '__main__' — 멀티프로세싱에서 필수. 이걸 안 쓰면 자식 프로세스가 생성될 때 모듈이 재임포트되면서 무한 프로세스 생성이 발생할 수 있음

요약

구성 요소  역할  핵심 포인트
gen_data() 점포별 매출 로그 생성 UUID 이벤트 ID + Faker 더미 데이터 + dict 중첩(Bronze용)
send_to_kinesis() KDS로 단일 레코드 전송 PartitionKey=event_id로 샤드 균등 분산
run_producer() 무한루프 데이터 생성+전송 0.5~1.5초 랜덤 간격으로 현실적 패턴 시뮬레이션
main 멀티프로세스 4개 생성 점포별 독립 프로세스, GIL 제약 없는 병렬 실행

멀티프로세싱으로 점포별 독립 프로세스를 생성하여 매출 로그를 Kinesis로 실시간 전송하고, Firehose를 거쳐 S3 Bronze Layer에 원본 데이터를 적재하는 데이터 생성기