run.py + adf_direct_data_put.py 코드 분석 — 로그 생성부터 Firehose 전송까지
LogGenerator로 로그를 생성하고, Amazon Data Firehose를 통해 S3로 스트리밍하는 파이프라인 구조
전체 아키텍처
LogGenerator (log_generator.py)
↓
run.py ←── make_log() : 로컬 테스트용 (N건 반복 생성 + 콘솔 출력)
│
└──── make_one_log() : 1건 생성 + JSON 문자열 반환
↓
adf_direct_data_put.py
↓
Firehose (put_record)
↓
S3 버킷
log_generator.py → run.py → adf_direct_data_put.py 순서로 import 체인이 이어짐. run.py는 로컬 테스트와 Firehose 전송 양쪽 모두에서 사용되는 중간 계층 역할.
Part 1. run.py — 로그 생성 실행기
1. 모듈 임포트
import json
import time
from log_generator import LogGenerator
- json.dumps() — dict를 JSON 문자열로 직렬화(객체직렬화). ensure_ascii=False로 한글이 이스케이프 없이 그대로 출력됨
- time.sleep() — 로그 발생 간격을 시뮬레이션하는 대기 용도
- LogGenerator — log_generator.py에서 임포트. 도메인별 로그 생성 메서드를 제공하는 클래스
2. make_log() — 로컬 테스트용 다건 로그 생성
def make_log(config):
log_gen = LogGenerator()
log_gen_map = {
"finance": log_gen.finance,
"factory": log_gen.factory,
}
print(f'{config["target_industry"]} 로그 생성 시작')
print('-' * 50)
for i in range(config['total_count']):
cur_func = log_gen_map.get(config['target_industry'])
log = cur_func()
log_json = json.dumps(log, ensure_ascii=False)
print(f'[Log-{i+1}] {log_json}')
time.sleep(log_gen.get_interval_time(config['mode'], config['interval']))
print('-' * 50)
- log_gen_map — 문자열 키와 메서드 참조를 매핑하는 딕셔너리. config['target_industry'] 값으로 실행할 메서드를 동적으로 결정함. 함수 주소값(참조)을 저장하므로 ()없이 세팅하고, 호출 시 cur_func()로 실행
- config['total_count'] — 생성할 로그 건수만큼 반복
- json.dumps(log, ensure_ascii=False) — dict → str 직렬화. ensure_ascii=False를 지정하지 않으면 한글이 \uXXXX 형태로 이스케이프됨
- time.sleep() — get_interval_time()의 반환값만큼 대기. fixed 모드면 고정 간격, random 모드면 무작위 간격
- log_gen_map에 현재 finance, factory만 등록됨. 나머지 도메인은 추후 추가 예정
3. make_one_log() — Firehose 전송용 단건 로그 생성
log_gen_g = LogGenerator()
def make_one_log():
return json.dumps(log_gen_g.finance(), ensure_ascii=False)
- log_gen_g — 모듈 레벨에서 생성된 전역 인스턴스. make_one_log()이 호출될 때마다 새 인스턴스를 만들지 않고 재사용함
- make_log()와의 차이: make_log()는 N건을 반복 생성하며 콘솔 출력하는 로컬 테스트용이고, make_one_log()는 1건을 JSON 문자열로 반환하여 외부 시스템(Firehose)에 전달하는 용도
- 현재 finance() 고정 호출. config 기반으로 도메인을 선택하려면 log_gen_map 패턴을 적용하는 확장이 필요
4. __main__ 블록 — 로컬 실행
if __name__ == '__main__':
config = {
"target_industry": "finance",
"mode": "random",
"interval": 1,
"total_count": 50,
"loop": False
}
make_log(config)
- target_industry — 생성할 도메인 지정. log_gen_map의 키와 매칭됨
- mode — "random" 또는 "fixed". 로그 발생 간격 방식
- interval — 초 단위 기본 간격. random 모드에서는 0.1 ~ interval*2 범위의 무작위 간격이 됨
- total_count — 생성할 로그 총 건수
- loop — 무한 생성 옵션. 현재 미구현, 추후 수정 예정
- if __name__ == '__main__': — 직접 실행 시에만 make_log()가 호출됨. adf_direct_data_put.py에서 from run import make_one_log로 임포트할 때는 이 블록이 실행되지 않음
Part 2. adf_direct_data_put.py — Amazon Data Firehose 전송
1. 모듈 임포트
import boto3
import json
import time
- boto3 — AWS SDK for Python. AWS 서비스와 프로그래밍 방식으로 통신하기 위한 라이브러리. Airflow 환경에서 apache-airflow-providers-amazon 설치 시 자동으로 함께 설치됨
2. 환경변수
ACCESS_KEY = ''
SECRET_KEY = ''
REGION = 'ap-northeast-2'
- ACCESS_KEY / SECRET_KEY — AWS 외부에서 접속할 때 사용하는 IAM 인증 정보. 현재 비어있음 → AWS 내부(CloudShell 등)에서 실행하므로 불필요 -> 이걸 만약 로컬에서 저장해서 Git에 잘못 올리면 큰일나기 때문에 저장하지 않는 편이 좋음.
- REGION — ap-northeast-2는 서울 리전
3. Firehose 클라이언트 생성 — get_client()
def get_client(service_name='firehose', is_in_aws=True):
if not is_in_aws:
session = boto3.Session(
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
region_name=REGION
)
return session.client(service_name)
return boto3.client(service_name, region_name=REGION)
firehose = get_client()
- is_in_aws=True (기본값) — AWS 내부 실행. IAM 역할(Role)이 자동으로 인증을 처리하므로 키가 불필요. boto3.client()만으로 접속 가능
- is_in_aws=False — AWS 외부 실행. boto3.Session()에 ACCESS_KEY/SECRET_KEY를 명시적으로 전달하여 인증함
- service_name='firehose' — Amazon Data Firehose 서비스 클라이언트를 생성. 다른 서비스(s3, kinesis 등)로 변경 가능
AWS 내부 vs 외부 인증 흐름:
| 항목 | AWS 내부 (CloudShell, EC2 등) | AWS 외부 (로컬 PC 등) |
| 인증 방식 | IAM 역할 자동 | ACCESS_KEY + SECRET_KEY |
| boto3 호출 | boto3.client() | boto3.Session().client() |
| 키 필요 여부 | 불필요 | 필요 |
4. 로그 생성 함수 임포트
from run import make_one_log
- run.py의 make_one_log() 함수를 임포트. 1건의 금융 로그를 JSON 문자열로 반환하는 함수
- 임포트 시 run.py의 모듈 레벨 코드(log_gen_g = LogGenerator())가 실행되어 전역 인스턴스가 생성됨
- if __name__ == '__main__': 블록 안의 make_log(config)는 실행되지 않음
5. Firehose 전송 함수 — send_log()
def send_log():
response = firehose.put_record(
DeliveryStreamName='de-ai-04-an2-kdf-log-to-s3',
Record={
'Data': make_one_log() + "\n"
}
)
print(f'전송 결과 : {response}')
- firehose.put_record() — Firehose 스트림에 단일 레코드를 전송하는 API
- DeliveryStreamName — 전송 대상 Firehose 스트림 이름. 네이밍: {팀}-{리전약어}-{서비스약어}-{용도}
- Record.Data — 전송할 데이터. make_one_log()의 JSON 문자열 + "\n". 개행 문자를 붙여서 S3에 적재될 때 레코드 단위로 구분됨
- response — API 응답. HTTP 상태코드 200이면 전송 성공
Firehose put_record 흐름:
make_one_log() → JSON 문자열 + "\n" → put_record() → Firehose 스트림 → S3 버킷
6. 실행부 — 10건 전송
for i in range(10):
send_log()
- 10회 반복하여 로그 1건씩 Firehose에 전송
- 대기 시간(sleep) 없이 연속 전송. Firehose가 내부적으로 버퍼링하여 S3에 배치 적재함
- Firehose의 기본 버퍼 설정(간격 60초 또는 용량 5MB 중 먼저 도달하는 조건)에 따라 S3에 실제로 파일이 생성되는 시점이 결정됨
전체 흐름 요약 — 3개 파일의 관계
log_generator.py run.py adf_direct_data_put.py
┌──────────────┐ ┌─────────────────┐ ┌──────────────────────┐
│ LogGenerator │◄───│ make_log() │ │ get_client() │
│ .finance() │ │ (로컬 N건 테스트) │ │ → firehose 클라이언트 │
│ .factory() │ │ │ │ │
│ .iot() │ │ make_one_log() │◄───│ send_log() │
│ .ott() │ │ (1건 JSON 반환) │ │ → put_record() │
│ .lol_game() │ │ │ │ → S3 적재 │
└──────────────┘ └─────────────────┘ └──────────────────────┘
| 파일 | 역할 | 핵심 함수 | 실행 환경 |
| log_generator.py | 도메인별 로그 데이터 생성 |
finance(), factory() 등 6개 메서드 | 공통 |
| run.py | 로그 생성 실행 + 외부 인터페이스 제공 |
make_log() (로컬), make_one_log() (Firehose용) | 로컬 / CloudShell |
| adf_direct_data_put.py | AWS Firehose로 로그 스트리밍 전송 |
send_log() → put_record() | CloudShell (AWS 내부) |
LogGenerator가 로그를 생성하고, run.py가 로컬 테스트와 Firehose 전송 두 가지 인터페이스를 제공하며, adf_direct_data_put.py가 boto3를 통해 Firehose에 put_record로 전송하여 최종적으로 S3에 적재하는 3계층 파이프라인 구조
'SK플래닛 ai활용 데이터엔지니어 과정 2기 > Airflow' 카테고리의 다른 글
| Kinesis - 2(KDS+Flink+Firehose+S3) (0) | 2026.04.16 |
|---|---|
| Kinesis - 1 (KDS+Firehose+S3) (0) | 2026.04.16 |
| Firehose - 1 (LogGenerator로 도메인별 로그 생성) (1) | 2026.04.15 |
| Airflow - 9 (이벤트 기반 파이프라인) (0) | 2026.04.15 |
| Airflow - 8 (S3와 Airflow의 연계) (0) | 2026.04.14 |