SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Firehose - 1 (LogGenerator로 도메인별 로그 생성)

dev-lee 2026. 4. 15. 17:14

LogGenerator 클래스 코드 분석 — 도메인별 로그 생성기

Faker와 random을 조합하여 6개 도메인의 가상 로그를 생성하는 클래스. random.choices의 가중치 확률로 현실적인 이벤트 분포를 시뮬레이션하는 구조


전체 구조

LogGenerator
├── get_interval_time()  — 로그 발생 간격 계산 (fixed / random)
├── finance()            — 금융 거래 로그
├── ecommerce()          — 이커머스 행동 로그
├── iot()                — IoT 센서 로그
├── ott()                — OTT 스트리밍 로그
├── factory()            — 스마트팩토리 생산 로그
└── lol_game()           — LOL 게임 이벤트 로그

각 메서드가 하나의 로그 레코드를 dict로 반환함. run.py에서 log_gen_map으로 메서드를 매핑하여 config 기반으로 호출하는 구조.


1. 모듈 임포트 및 Faker 설정

import time
import random
import json
from datetime import datetime
from faker import Faker

fake = Faker('ko_KR')
  • random — 확률 기반 값 생성. random.choice(균등 확률), random.choices(가중치 확률), random.uniform(실수 범위), random.randint(정수 범위)
  • Faker('ko_KR') — 한국어 로케일 설정. 한국식 이름, 주소, 이메일 등의 가상 데이터를 생성함. 모듈 레벨에서 생성되어 클래스 전체에서 공유됨
  • time, json — 클래스 내에서는 미사용. run.py에서 time.sleep(), json.dumps() 용도로 활용됨

2. 로그 발생 간격 계산 — get_interval_time

def get_interval_time(self, mode, interval):
    if mode == 'fixed':
        return interval
    return random.uniform(0.1, interval * 2)
  • mode='fixed' — 고정 주기 반환. 예: interval=1이면 매 1초마다 로그 발생
  • mode='random' — 최소 0.1초 ~ 최대 interval*2초 사이의 무작위 간격 반환
  • run.py에서 time.sleep()의 인자로 사용됨. 실제 로그 시스템의 불규칙한 발생 패턴을 시뮬레이션하는 용도

3. 도메인별 로그 생성 메서드

3-1. 금융 로그 — finance()

def finance(self):
    return {
        "timestamp": datetime.now().isoformat(),
        "industry": "FINANCE",
        "user_id": fake.uuid4()[:12],
        "transaction_type": random.choice(["이체", "출금", "입금", "결제"]),
        "amount": round(random.uniform(1000, 1000000), -2),
        "account_from": fake.iban(),
        "status": random.choices(["SUCCESS", "FAIL"], weights=[0.95, 0.05])[0]
    }
  • timestamp — datetime.now().isoformat()으로 ISO 8601 형식 기록. 예: 2026-04-15T10:30:00.123456
  • user_id — fake.uuid4()[:12]로 UUID 앞 12자리만 사용. 전체 UUID는 36자리(daef8c88-0544-4090-886c-4ae8532b34e1)이나 편의상 축약
  • transaction_type — random.choice()로 4개 중 균등 확률(각 25%)로 선택
  • amount — round(..., -2)로 100원 단위 반올림. 범위: 1,000원 ~ 1,000,000원
  • account_from — fake.iban()으로 가상 IBAN 생성. 구조: 국가코드 2자리 + 체크섬 2자리 + 은행식별자/계좌번호 최대 30자리
  • status — random.choices()로 가중치 확률 적용. SUCCESS 95%, FAIL 5%. [0]은 리스트에서 값을 추출하는 것

random.choice vs random.choices 차이:

항목  random.choice random.choices
반환 단일 값 리스트
확률 균등 가중치 지정 가능 (weights)
후처리 불필요 [0]으로 값 추출 필요

 


3-2. 이커머스 로그 — ecommerce()

def ecommerce(self):
    return {
        "timestamp": datetime.now().isoformat(),
        "industry": "ECOMMERCE",
        "session_id": fake.ascii_free_email(),
        "action": random.choice(["VIEW", "CLICK", "ADD_TO_CART", "PURCHASE"]),
        "item_id": f"ITEM-{fake.random_int(100, 999)}",
        "price": random.randint(10, 500) * 1000,
        "device": random.choice(["iOS", "Android", "Web"])
    }
  • session_id — fake.ascii_free_email()로 ASCII 문자만 사용하는 무료 이메일 주소를 생성. 세션 식별 용도
  • action — 사용자 행동 퍼널: 조회(VIEW) → 클릭(CLICK) → 장바구니(ADD_TO_CART) → 구매(PURCHASE)
  • item_id — ITEM- 접두사 + 3자리 랜덤 정수. 상품 식별자
  • price — random.randint(10, 500) * 1000으로 10,000원 ~ 500,000원 범위. 1,000원 단위로 깔끔하게 생성됨
  • device — iOS, Android, Web 중 택 1. 사용자 접속 환경 분석에 활용

3-3. IoT 센서 로그 — iot()

def iot(self):
    return {
        "timestamp": datetime.now().isoformat(),
        "industry": "IOT",
        "device_id": f"SMART-H-{fake.random_int(1000, 9999)}",
        "sensor": random.choice(["TEMP", "HUMID", "CO2", "MOTION"]),
        "value": round(random.uniform(15.0, 45.0), 2),
        "unit": "standard_unit",
        "battery": f"{random.randint(5, 100)}%"
    }
  • device_id — SMART-H- 접두사 + 4자리 랜덤 정수. 스마트홈 디바이스 식별자
  • sensor — 온도(TEMP), 습도(HUMID), 이산화탄소(CO2), 동작감지(MOTION) 중 택 1
  • value — 15.0~45.0 범위의 실수값. 소수점 2자리
  • unit — "standard_unit"으로 고정. 센서 타입별 단위 매핑은 확장 시 구현 가능
  • battery — 5~100% 범위. 10% 이하 시 이상탐지/유지보수 트리거로 활용 가능

3-4. OTT 스트리밍 로그 — ott()

def ott(self):
    return {
        "timestamp": datetime.now().isoformat(),
        "industry": "OTT",
        "user_name": fake.user_name(),
        "content_title": fake.catch_phrase(),
        "event": random.choice(["START", "PAUSE", "RESUME", "END"]),
        "bitrate_kbps": random.randint(2000, 8000),
        "buffering_count": random.randint(0, 3)
    }
  • content_title — fake.catch_phrase()로 가상 콘텐츠 제목 생성. 한국어 로케일이므로 한국어 문구가 생성됨
  • event — 재생(START), 일시정지(PAUSE), 다시재생(RESUME), 종료(END)의 스트리밍 이벤트
  • bitrate_kbps — 2,000~8,000 kbps 범위. 비트레이트는 1초당 처리되는 데이터의 양으로, 영상 품질과 용량을 결정하는 핵심 지표
  • buffering_count — 0~3회. 버퍼링이 많으면 네트워크 문제나 서버 부하를 의미

3-5. 스마트팩토리 로그 — factory()

def factory(self):
    return {
        "timestamp": datetime.now().isoformat(),
        "industry": "FACTORY",
        "line_no": f"LINE-{random.randint(1, 3)}",
        "machine_status": random.choices(
            ["OPERATING", "IDLE", "ERROR"], weights=[0.8, 0.15, 0.05]
        )[0],
        "temp_celsius": round(random.uniform(50.0, 85.0), 1),
        "vibration": round(random.uniform(0.1, 5.0), 3),
        "production_count": random.randint(100, 500)
    }
  • line_no — LINE-1 ~ LINE-3까지 3개 생산 라인
  • machine_status — 가중치 확률 적용. 가동(OPERATING) 80%, 대기(IDLE) 15%, 에러(ERROR) 5%
  • temp_celsius — 섭씨 50.0~85.0°C 범위. 기계 작동 온도 시뮬레이션
  • vibration — 0.1~5.0 범위의 진동값. 소수점 3자리. 이상 진동 탐지에 활용 가능
  • production_count — 100~500 범위의 양품 생산 수량. 시간당 생산량(UPH) 모니터링에 활용됨

3-6. LOL 게임 로그 — lol_game()

def lol_game(self):
    events = ["KILL", "DEATH", "ASSIST", "WARD_PLACED", "MINION_KILLED", "OBJECTIVE_TAKEN"]
    champions = ["Garen", "Lux", "Lee Sin", "Yasuo", "Kai'Sa", "Thresh", "Ahri", "Zed"]
    return {
        "timestamp": datetime.now().isoformat(),
        "industry": "GAMING_LOL",
        "match_id": f"KR-{fake.random_number(digits=10)}",
        "player_tag": f"{fake.user_name()}#{fake.random_int(1000, 9999)}",
        "champion": random.choice(champions),
        "event": random.choices(
            events, weights=[0.1, 0.05, 0.15, 0.2, 0.4, 0.1]
        )[0],
        "current_gold": random.randint(0, 15000),
        "kda": f"{random.randint(0, 10)}/{random.randint(0, 10)}/{random.randint(0, 20)}",
        "ping_ms": random.randint(8, 45),
        "is_in_combat": random.choice([True, False])
    }
  • match_id — KR- 접두사 + 10자리 랜덤 숫자. 한국 서버 매치 ID 형식
  • player_tag — 사용자명#1234 형식. 라이엇 태그라인 구조를 시뮬레이션
  • champion — 8개 챔피언 중 균등 확률로 선택
  • event — 가중치 확률 적용. 실제 게임에서 미니언 처치가 킬보다 훨씬 자주 발생하는 것을 반영
  • kda — 킬/데스/어시스트 문자열 형식. 각각 독립적으로 랜덤 생성
  • ping_ms — 8~45ms 범위. 한국 서버 기준의 현실적인 핑 범위
  • is_in_combat — 전투 중 여부. boolean 값

이벤트 확률 분포:

 

이벤트  확률  설명
MINION_KILLED 40% 미니언 처치 (가장 빈번)
WARD_PLACED 20% 와드 설치
ASSIST 15% 어시스트
KILL 10%
OBJECTIVE_TAKEN 10% 오브젝트 획득
DEATH 5% 데스 (가장 드묾)

4. 테스트 코드

if __name__ == '__main__':
    obj = LogGenerator()
    obj.get_interval_time('random', 2)
    obj.finance()
    obj.ecommerce()
    obj.iot()
    obj.ott()
  • 직접 실행 시 각 메서드를 호출하여 정상 동작 여부를 확인하는 용도
  • 반환값을 출력하지 않으므로 테스트 시 print(obj.finance()) 등으로 확인 필요

요약 — 도메인별 로그 필드 비교

도메인  식별자  핵심 필드 확률 모델
Ecommerce session_id (이메일) action, item_id, price, device 균등 확률
IoT device_id (SMART-H-XXXX) sensor, value, battery 균등 확률
OTT user_name event, bitrate_kbps, buffering_count 균등 확률
Factory line_no (LINE-1~3) machine_status, temp_celsius, vibration status: 80/15/5
LOL Game player_tag (이름#태그) champion, event, kda, ping_ms event: 40/20/15/10/10/5
Finance user_id (UUID 12자리) transaction_type, amount, account_from status: 95/5

Faker와 random을 조합하여 6개 도메인의 가상 로그를 생성하는 클래스로, random.choices의 가중치 확률을 통해 현실적인 이벤트 분포를 시뮬레이션하는 구조