SK플래닛 ai활용 데이터엔지니어 과정 2기/Airflow

Firehose - 2 (생성한 로그 Firehose 전송)

dev-lee 2026. 4. 15. 17:31

run.py + adf_direct_data_put.py 코드 분석 — 로그 생성부터 Firehose 전송까지

LogGenerator로 로그를 생성하고, Amazon Data Firehose를 통해 S3로 스트리밍하는 파이프라인 구조


전체 아키텍처

LogGenerator (log_generator.py)
       ↓
    run.py  ←── make_log()     : 로컬 테스트용 (N건 반복 생성 + 콘솔 출력)
       │
       └──── make_one_log()    : 1건 생성 + JSON 문자열 반환
                  ↓
        adf_direct_data_put.py
                  ↓
        Firehose (put_record)
                  ↓
              S3 버킷

log_generator.py → run.py → adf_direct_data_put.py 순서로 import 체인이 이어짐. run.py는 로컬 테스트와 Firehose 전송 양쪽 모두에서 사용되는 중간 계층 역할.


Part 1. run.py — 로그 생성 실행기


1. 모듈 임포트

import json
import time
from log_generator import LogGenerator
  • json.dumps() — dict를 JSON 문자열로 직렬화(객체직렬화). ensure_ascii=False로 한글이 이스케이프 없이 그대로 출력됨
  • time.sleep() — 로그 발생 간격을 시뮬레이션하는 대기 용도
  • LogGenerator — log_generator.py에서 임포트. 도메인별 로그 생성 메서드를 제공하는 클래스

2. make_log() — 로컬 테스트용 다건 로그 생성

def make_log(config):
    log_gen = LogGenerator()
    log_gen_map = {
        "finance": log_gen.finance,
        "factory": log_gen.factory,
    }

    print(f'{config["target_industry"]} 로그 생성 시작')
    print('-' * 50)
    for i in range(config['total_count']):
        cur_func = log_gen_map.get(config['target_industry'])
        log = cur_func()
        log_json = json.dumps(log, ensure_ascii=False)
        print(f'[Log-{i+1}] {log_json}')
        time.sleep(log_gen.get_interval_time(config['mode'], config['interval']))
    print('-' * 50)
  • log_gen_map — 문자열 키와 메서드 참조를 매핑하는 딕셔너리. config['target_industry'] 값으로 실행할 메서드를 동적으로 결정함. 함수 주소값(참조)을 저장하므로 ()없이 세팅하고, 호출 시 cur_func()로 실행
  • config['total_count'] — 생성할 로그 건수만큼 반복
  • json.dumps(log, ensure_ascii=False) — dict → str 직렬화. ensure_ascii=False를 지정하지 않으면 한글이 \uXXXX 형태로 이스케이프됨
  • time.sleep() — get_interval_time()의 반환값만큼 대기. fixed 모드면 고정 간격, random 모드면 무작위 간격
  • log_gen_map에 현재 finance, factory만 등록됨. 나머지 도메인은 추후 추가 예정

3. make_one_log() — Firehose 전송용 단건 로그 생성

log_gen_g = LogGenerator()

def make_one_log():
    return json.dumps(log_gen_g.finance(), ensure_ascii=False)
  • log_gen_g — 모듈 레벨에서 생성된 전역 인스턴스. make_one_log()이 호출될 때마다 새 인스턴스를 만들지 않고 재사용함
  • make_log()와의 차이: make_log()는 N건을 반복 생성하며 콘솔 출력하는 로컬 테스트용이고, make_one_log()는 1건을 JSON 문자열로 반환하여 외부 시스템(Firehose)에 전달하는 용도
  • 현재 finance() 고정 호출. config 기반으로 도메인을 선택하려면 log_gen_map 패턴을 적용하는 확장이 필요

4. __main__ 블록 — 로컬 실행

if __name__ == '__main__':
    config = {
        "target_industry": "finance",
        "mode": "random",
        "interval": 1,
        "total_count": 50,
        "loop": False
    }
    make_log(config)
  • target_industry — 생성할 도메인 지정. log_gen_map의 키와 매칭됨
  • mode — "random" 또는 "fixed". 로그 발생 간격 방식
  • interval — 초 단위 기본 간격. random 모드에서는 0.1 ~ interval*2 범위의 무작위 간격이 됨
  • total_count — 생성할 로그 총 건수
  • loop — 무한 생성 옵션. 현재 미구현, 추후 수정 예정
  • if __name__ == '__main__': — 직접 실행 시에만 make_log()가 호출됨. adf_direct_data_put.py에서 from run import make_one_log로 임포트할 때는 이 블록이 실행되지 않음

Part 2. adf_direct_data_put.py — Amazon Data Firehose 전송


1. 모듈 임포트

import boto3
import json
import time
  • boto3 — AWS SDK for Python. AWS 서비스와 프로그래밍 방식으로 통신하기 위한 라이브러리. Airflow 환경에서 apache-airflow-providers-amazon 설치 시 자동으로 함께 설치됨

2. 환경변수

ACCESS_KEY = ''
SECRET_KEY = ''
REGION     = 'ap-northeast-2'
  • ACCESS_KEY / SECRET_KEY — AWS 외부에서 접속할 때 사용하는 IAM 인증 정보. 현재 비어있음 → AWS 내부(CloudShell 등)에서 실행하므로 불필요 -> 이걸 만약 로컬에서 저장해서 Git에 잘못 올리면 큰일나기 때문에 저장하지 않는 편이 좋음.
  • REGION — ap-northeast-2는 서울 리전

3. Firehose 클라이언트 생성 — get_client()

def get_client(service_name='firehose', is_in_aws=True):
    if not is_in_aws:
        session = boto3.Session(
            aws_access_key_id=ACCESS_KEY,
            aws_secret_access_key=SECRET_KEY,
            region_name=REGION
        )
        return session.client(service_name)
    return boto3.client(service_name, region_name=REGION)

firehose = get_client()
  • is_in_aws=True (기본값) — AWS 내부 실행. IAM 역할(Role)이 자동으로 인증을 처리하므로 키가 불필요. boto3.client()만으로 접속 가능
  • is_in_aws=False — AWS 외부 실행. boto3.Session()에 ACCESS_KEY/SECRET_KEY를 명시적으로 전달하여 인증함
  • service_name='firehose' — Amazon Data Firehose 서비스 클라이언트를 생성. 다른 서비스(s3, kinesis 등)로 변경 가능

AWS 내부 vs 외부 인증 흐름:

항목  AWS 내부 (CloudShell, EC2 등) AWS 외부 (로컬 PC 등)
인증 방식 IAM 역할 자동 ACCESS_KEY + SECRET_KEY
boto3 호출 boto3.client() boto3.Session().client()
키 필요 여부 불필요 필요

4. 로그 생성 함수 임포트

from run import make_one_log
  • run.py의 make_one_log() 함수를 임포트. 1건의 금융 로그를 JSON 문자열로 반환하는 함수
  • 임포트 시 run.py의 모듈 레벨 코드(log_gen_g = LogGenerator())가 실행되어 전역 인스턴스가 생성됨
  • if __name__ == '__main__': 블록 안의 make_log(config)는 실행되지 않음

5. Firehose 전송 함수 — send_log()

def send_log():
    response = firehose.put_record(
        DeliveryStreamName='de-ai-04-an2-kdf-log-to-s3',
        Record={
            'Data': make_one_log() + "\n"
        }
    )
    print(f'전송 결과 : {response}')
  • firehose.put_record() — Firehose 스트림에 단일 레코드를 전송하는 API
  • DeliveryStreamName — 전송 대상 Firehose 스트림 이름. 네이밍: {팀}-{리전약어}-{서비스약어}-{용도}
  • Record.Data — 전송할 데이터. make_one_log()의 JSON 문자열 + "\n". 개행 문자를 붙여서 S3에 적재될 때 레코드 단위로 구분됨
  • response — API 응답. HTTP 상태코드 200이면 전송 성공

Firehose put_record 흐름:

make_one_log() → JSON 문자열 + "\n" → put_record() → Firehose 스트림 → S3 버킷

6. 실행부 — 10건 전송

for i in range(10):
    send_log()
  • 10회 반복하여 로그 1건씩 Firehose에 전송
  • 대기 시간(sleep) 없이 연속 전송. Firehose가 내부적으로 버퍼링하여 S3에 배치 적재
  • Firehose의 기본 버퍼 설정(간격 60초 또는 용량 5MB 중 먼저 도달하는 조건)에 따라 S3에 실제로 파일이 생성되는 시점이 결정

전체 흐름 요약 — 3개 파일의 관계

log_generator.py          run.py                adf_direct_data_put.py
┌──────────────┐    ┌─────────────────┐    ┌──────────────────────┐
│ LogGenerator │◄───│ make_log()      │    │ get_client()         │
│  .finance()  │    │  (로컬 N건 테스트) │    │  → firehose 클라이언트 │
│  .factory()  │    │                 │    │                      │
│  .iot()      │    │ make_one_log()  │◄───│ send_log()           │
│  .ott()      │    │  (1건 JSON 반환)  │    │  → put_record()     │
│  .lol_game() │    │                 │    │  → S3 적재           │
└──────────────┘    └─────────────────┘    └──────────────────────┘

 

파일  역할  핵심 함수 실행 환경
log_generator.py 도메인별 로그
데이터 생성
finance(), factory() 등 6개 메서드 공통
run.py 로그 생성 실행 +
외부 인터페이스 제공
make_log() (로컬), make_one_log() (Firehose용) 로컬 / CloudShell
adf_direct_data_put.py AWS Firehose로
로그 스트리밍 전송
send_log() → put_record() CloudShell (AWS 내부)

LogGenerator가 로그를 생성하고, run.py가 로컬 테스트와 Firehose 전송 두 가지 인터페이스를 제공하며, adf_direct_data_put.py가 boto3를 통해 Firehose에 put_record로 전송하여 최종적으로 S3에 적재하는 3계층 파이프라인 구조