Airflow 소개
프로그래머스 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트 with Python을 듣고 정리한 내용입니다.
Spark / Athena 사용 시나리오
- 비구조화된 데이터 처리
- 매우 큰 비구조화된 데이터 → S3 → Spark, Athena를 통해 정제하고 크기를 줄이고 → Redshift
- Redshift 비싸기 때문에 비구조화된 데이터를 바로 올릴 이유가 없음
- ML 모델의 입력으로 들어가는 feature를 배치로 미리 계산하는 경우
- S3나 Redshift에 있는 데이터를 Spark로 처리 → NoSQL, MongoDB 등에 저장 → Reco API → ML 모델
데이터 파이프라인이란?
- ETL
- Extract, Transform, Load
- = DAG (Directed Acyclic Graph) in Airflow
- ETL: 데이터 웨어하우스 외부에서 내부로 가져오는 프로세스
- ELT: 데이터 웨어하우스 내부 데이터를 조작해서 (좀더 추상화되고 요약된) 데이터를 만드는 것
- 데이터 레이크를 사용하기도 함
- 프로세스 전용 기술들이 있음 - DBT
- 데이터 레이크
- 싸고 큰 데이터를 부담없이 저장
- structured + unstructured
- historical data storage
- 데이터 웨어하우스
- 보다 정제되고 구조화된 데이터 + retention policy
- BI tools (Tableau, Superset 등) 과 연결
데이터 소스 → 데이터 레이크(S3) → 데이터 변형(Spark, Athena) → 데이터 웨어하우스 / 데이터마트
Data Pipeline의 정의
- 데이터를 소스로부터 목적지로 복사하는 작업
- 대부분의 경우 목적지는 데이터 웨어하우스가 됨 / 또는 캐시 시스템, 프로덕션 데이터베이스, NoSQL, S3, …
- 데이터 소스: click stream, call data, transactions, metadata,
- Raw data ETL
- 외부와 내부 데이터 소스에서 데이터를 읽어다가(많은 경우 API)
- 적당한 데이터 포맷 변환 후 (크기가 커지면 Spark 등이 필요)
- 데이터 웨어하우스 로드 *이 작업은 보통 데이터 엔지니어가 함
- Summary/Report Jobs
- DW or DL로부터 데이터를 읽어 다시 DW에 쓰는 ETL
- Raw Data를 읽어서 일종의 리포트 형태나 써머리 형태의 테이블을 다시 만드는 용도
- 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
- 요약 테이블의 경우 SQL(CTAS를 통해)만으로 만들고 이는 데이터분석가가 하는 것이 맞음. 데이터 엔지니어 관점에서는 어떻게 분석가들이 편하게 할 수 있는 환경을 만들어 주느냐가 관건
- 요즘은 DBT를 많이 씀 (Analytics Engineer)
- Production Data Jobs
- DW로부터 데이터를 읽어 다른 Storage(많은 경우 프로덕션 환경)로 쓰는 ETL
- 써머리 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
- 혹은 머신러닝 모델에서 필요한 피쳐를 미리 계산해두는 경우
- 이 경우 흔한 타겟 스토리지
- NoSQL(HBase/DynamoDB/Cassandra)
- MySQL과 같은 관계형 데이터베이스(OLTP)
- 캐시(Redis/Memcache)
- 검색엔진(Elastic Search)
- DW로부터 데이터를 읽어 다른 Storage(많은 경우 프로덕션 환경)로 쓰는 ETL
간단한 ETL 작성해보기
- S3에서 읽어온 데이터를 Reshift에 저장해보자
1
2
3
4
5
6
7
%sql postgresql://keeyong:****@learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev
DROP TABLE IF EXISTS keeyong.name_gender;
CREATE TABLE keeyong.name_gender (
name varchar(32),
gender varchar(8)
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import psycopg2
# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
redshift_user = "keeyong"
redshift_pass = "Keeyong1!"
port = 5439
dbname = "dev"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
- ETL 함수를 하나씩 정의
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import requests
#읽어오기(s3)
def extract(url):
f = requests.get(url)
return (f.text)
#리스트로 나누기
def transform(text):
lines = text.split("\n")
return lines
#읽어온 데이터를 한줄씩 좀전에 만든 테이블에 insert
def load(lines):
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
# BEGIN;DELETE FROM (본인의스키마).name_gender;INSERT INTO TABLE VALUES ('KEEYONG', 'MALE');....;END;
cur = get_Redshift_connection()
for r in lines:
if r != '':
(name, gender) = r.split(",")
print(name, "-", gender)
sql = "INSERT INTO keeyong.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
print(sql)
cur.execute(sql)
- ETL 함수를 순서대로 실행
1
2
3
4
5
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
lines = transform(data)
load(lines)
Airflow 소개
- 데이터 파이프라인 프레임워크(파이썬) + 스케줄링 with Web UI
- 데이터 파이프라인 = DAG
- Task = Operator로 구성됨
- 1개 이상의 서버 = 워커, 스케줄러
- 스케줄러가 워커에서 태스크를 분배
- DAG와 스케줄링 정보는 DB에 저장 (SQLite이 default)
- 20년에 2.0이 나왔고 지금은 대부분 2.0을 씀
- 오픈소스이므로 가장 최신버전을 쓴다면 안정성에서 위험부담이 있음 → 구글 클라우드가 뭘 쓰는지 확인 (클라우드 회사는 아무거나 안 씀.. 2.4 쓴다면 안전할 것이다)
Airflow 구성
- 총 5개
- Web Server
- Scheduler
- Worker
- Database (위 정보를 저장하는 metadata DB)
- Queue (멀티노드 구성인 경우)
- 이 경우 Executor가 달라짐(CeleryExecuter, KubernetesExecutor)
- 서버 한대로 부족할 경우 옵션: 스케일 업(더 좋은 사양의 서버 사용) or 스케일 아웃 (서버 추가)
- 장단점
- 장점
- 데이터 파이프라인을 세밀하게 제어
- 다양한 데이터소스와 데이터 웨어하우스 지원
- 백필이 쉬움
- 단점
- 배우기 쉽지 않음
- 상대적으로 개발환경 구성이 쉽지 않음
- 직접 운영이 쉽지 않음, 클라우드 버전 사용이 선호됨
- GCP - Cloud Composer
- AWS - Managed Workflows for Apache Airflow
- 장점
- DAG란
- Airflow에서 ETL을 부르는 명칭
- Task로 구성됨 ex. 3개의 태스크로 구성된다면 Extract, Transform, Load
- 태스크는 Airflow의 오퍼레이터로 만들어짐
- 이미 다양한 종류의 오퍼레이터를 제공하므로 경우에 맞게 사용하거나 필요하다면 직접 개발
- ex. Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script
- 모든 태스크에 필요한 기본 정보
- default_args = 딕셔너리
- owner, start_date, end_date, email(에러가 날 경우 수신), retries(실패할 경우 몇번까지 시도), retry_delay(재시도 할때 기다리는 시간)
test_dag = DAG(“dag_v1”, schedule_interval = “0 9 * * *”, tags: [‘tag’], default_args = default_args)
- schedule_interval은 크론탭 문법을 따름
- 순서대로
분, 시간, 일, 월, 주
- 0 * * * * : 매시 0분에 시행됨
- 0 12 * * * : 매일 12시 0분에 시행됨
- 순서대로
- default_args = 딕셔너리
데이터 파이프라인을 만들 때 고려할 점
- 데이터 파이프라인은 많은 이유로 실패함 (이상과 현실 간의 괴리)
- 버그
- 데이터 소스 상의 이슈
- 파이프라인들 간의 의존도에 대한 이해도 부족
- 데이터 파이프라인 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남
- 데이터 소스 간의 의존도가 생기면서 더 복잡해짐
- 중요한 정보가 업데이트가 안된다면 관련 다른 모든 정보들이 갱신되지 않는다든지
- best case를 가정하고 파이프라인을 만들면 안 됨 - 11시에는 a가 다 잘 돌 거니까 그게 필요한 b도 잘 실행 될 거야 → 안됨…
- 더 많은 테이블들이 관리가 되어야 함
- 데이터 소스 간의 의존도가 생기면서 더 복잡해짐
- Best Practices
- 가능하면 데이터가 작을 경우 매번 통째로 복사해서 테이블 만들기 (Full Refresh)
- Incremental update만이 가능하다면, 대신 데이터소스가 갖춰야 할 몇 가지 조건이 있음
- 데이터소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 반드시 필요
- created, modified, deleted
- 데이터소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야 함
- 데이터소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 반드시 필요
- Incremental update만이 가능하다면, 대신 데이터소스가 갖춰야 할 몇 가지 조건이 있음
- 멱등성(Idempotency)을 보장하는 것이 중요
- 동일한 입력 데이터로 데이터 파이프라인을 다수 반복해도 최종 결과물이 달라지지 않아야 함
- 예를 들면 중복 데이터가 생기면 안 됨
- 실패한 데이터파이프라인을 재실행하는 것과 과거 데이터를 다시 채우는 과정(backfill)이 쉬워야 함
- Airflow는 이런 부분(특히 backfill)에 강점을 가지고 있음
- DAG의 catchup 파라미터가 True여야 하고, start_date, end_date이 적절하게 설정되어야 함
- 대상 테이블이 incremental update가 되는 경우만 의미가 있음
- execution_date 파라미터를 사용해서 업데이트되는 날짜 혹은 시간을 알아내게 코드를 작성해야 함
- 현재 시간을 기준으로 업데이트 대상을 선택하는 것은 안티 패턴
- Airflow는 이런 부분(특히 backfill)에 강점을 가지고 있음
- 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
- 데이터 디스커버리 문제!
- 주기적으로 쓸모없는 데이터들을 삭제 (규모가 커질수록 불필요한 데이터 파이프라인으로 생기는 이슈가 많음. 다 돈이 됨)
- 데이터 파이프라인 사고시마다 리포트(post-mortem) 쓰기
- 중요 데이터 파이프라인의 입력과 출력을 체크하기
- 입력/출력 레코드 수 체크, 중복 레코드 체크, Primary key uniqueness 체크
- 가능하면 데이터가 작을 경우 매번 통째로 복사해서 테이블 만들기 (Full Refresh)
Airflow의 Backfill 방식 설명
- Daily incremental update를 구현한다면?
- 예를 들어 20/11/7 데이터부터 매일매일 하루치 데이터를 읽어온다고 가정해보자.
- 이 경우 언제부터 해당 ETL이 동작해야 하나? → 20/11/8
- 20/11/8에 동작하고, 20/11/7에 데이터를 읽어오는 것으로 시작해서 매일매일 업데이트
- 그렇다면 Airflow의 start_date은 11/7임! 처음 DAG가 실행되는 날짜가 아님
- execution_date라는 시스템 변수에 start_date을 넘겨서 11/8부터 실행되도록 함
- Incremental하게 1년치 데이터를 backfill 해야 한다면? 어떻게 ETL을 구현해야 할까?
- 해결방법1
- 기존 ETL 코드(지금 시간 기준으로 날짜를 정하도록)를 조금 수정해서 지난 1년치 데이터에 대해 돌린다
- 실수하기 쉽고 수정하는데 시간이 걸림
- 해결방법2
- 시스템적으로 이걸 쉽게 해주는 방법을 구현한다
- 읽어와야 하는 데이터의 날짜를 계산하지 않고 시스템이 지정해준 날짜에 해당하는 데이터를 다시 읽어온다
- Airflow의 접근방식
- 모든 DAG 실행에는 execution_date이 지정되어 있음
- execution_date으로 채워야하는 날짜와 시간이 넘어오고 이를 바탕으로 데이터를 갱신하도록 코드를 작성해야함 → 잇점: backfill이 쉬워짐
- 해결방법1
- 만불짜리 눈물의 쿼리
- catchup 파라미터 → 8/6인 start_date인 DAG을 14일에 뒤늦게 enable했을 때 catchup = True인 경우 놓쳤던 것들을 8번 실행하게 됨
- full refresh로 할 경우 catchup은 무조건 false
- incremental update로 할 경우 복잡도가 올라갈 수밖에 없기 때문에 할 수 있을 때까지는 full refresh 하는 게 좋음
간단한 Airflow 잡 실행하기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'my_first_dag',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule_interval = '0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
How to Trigger a DAG - 터미널에서 실행
- 먼저 SSH로 Airflow 서버에 로그인하고 airflow 사용자로 변경
- airflow dags list
- airflow tasks list DAG이름
- airflow tasks test DAG이름 Task이름 날짜 # test vs. run
- 날짜는 YYYY-MM-DD
- start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우 실행 안됨
- 이게 바로 execution_date의 값이 됨
- start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우 실행 안됨
- 날짜는 YYYY-MM-DD
Airflow Operators, Variables and Connections
- task level에서 적용되는 파라미터 = default_args 내 값들
- on_failure_callback : 실패시 이 함수 불러라
- retires : 몇번 재시도 할건지, retry_delay
- DAG Object creation parameter
- 이름, 태그
- 스케줄 안하고 앞의 게 실행되면 하는 경우 스케줄이 아닌 None, @once
- DAG 파라미터
- catchup
- max_active_runs
- max_active_tasks
- 한번에 몇개씩
- DAG 파라미터와 task 파라미터 이해
- DAG 파라미터는 DAG 객체를 만들 때 지정해줘야 함
- default_args로 지정해주면 에러는 안 나지만 적용이 안 됨
- Python Operator는 매우 자유분방하게 내부에 python_callable을 통해 원하는 대로 파이썬 함수를 넣어줄 수 있고 Airflow에서 넘겨주는 execution_date를 함수에서 받아서 사용할 수 있음
Name Gender DAG 개선하기
- Airflow Variable에 key value 저장
- 민감한 정보들을 코드 밖으로 빼내기
- Variable이라는 모듈 import해서 사용
- xcom → 앞의 태스크의 리턴값을 받아오는것 / return_value라는 키로 저장
- Connection
- 인증정보 e.g. Redshift Connection
This post is licensed under CC BY 4.0 by the author.