Post

Productionizing Airflow

프로그래머스 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트 with Python을 듣고 정리한 내용입니다.

Docker와 Kubernetes

  • Docker
    • Docker Image
      • 단순히 응용프로그램 뿐 아니라 그 프로그램이 필요로 하는 모든 다른 환경까지 포함한 소프트웨어 패키지
      • Docker Registry에 가면 다양한 Image들을 찾아볼 수 있음
    • Docker Container
      • Docker Image를 Docker Engine에서 실행한 것을 지칭
      • Docker Engine만 실행하면 마치 가상 컴퓨터처럼 동작을 하면서 그 위에 다양한 소프트웨어들을 충돌없이 실행 가능
  • Kubernetes (K8s)
    • 컨테이너 기반 서비스 배포/스케일/관리 자동화를 해주는 오픈소스 프레임웍
    • 가장 많이 사용되는 컨테이너 관리 (Orchestration) 시스템
    • 다수의 서버에 컨테이너 기반 프로그램을 실행하고 관리
      • 컨테이너 기반 프로그램 == Docker Container
      • 보통 Docker와 K8S는 같이 사용됨
      • Pod: 같은 디스크와 네트웍을 공유하는 1+ 컨테이너들의 집합
  • Airflow와 Kubernetes
    • DAG 수가 많아지면 Worker 노드에서 task들을 실행하는 것이 어려워짐
      • 다양한 환경 설정이 필요해지고 그들간의 충돌이 발생하게 됨
      • 또한 다수의 worker 노드들을 Airflow 전용으로 쓰는 것이 낭비가 될 수도 있음
    • 해결책: Airflow에서 K8S를 Worker 노드 대용으로 사용
      • KubernetesExecutor를 먼저 사용 (*지금까지는 LocalExecutor 썼음)
      • 먼저 Airflow task 코드를 Docker 이미지로 만듬
      • 이 이미지를 K8S 클러스터에서 실행하는데 2가지 방법이 존재
        1. KubernetesExecutor - DAG의 전체 태스크들이 하나의 Pod로 실행
        2. KubernetesPodOperator - DAG의 특정 태스크들이 각각 별개의 Pod로 실행됨

5주차 질문 리뷰

  • DBT 소개
    • Data Build Tool
      • ELT용 오픈소스
      • dbt Labs라는 회사가 상용화
      • Analytics Engineer라는 말을 만들어냄
    • 다양한 데이터 웨어하우스 지원
      • Redshift, Snowflake, Bigquery, Spark
    • 다수의 컴포넌트로 구성
      • 데이터 모델 (-> lineage 추적 가능 / 누가 사고쳤는지..), 입력 데이터, 데이터 체크, 스냅샷
    • 클라우드 버전도 존재 (dbtCloud)
  • Airflow의 Variable로 관리하는 것과 코드 내에서 관리하는 것의 장단점
    • 장점: 코드 푸시의 필요성이 없음
    • 단점: 관리나 테스트가 안되어서 사고로 이어질 가능성이 있음 (코드에 들어가는 순간 기록이 남으므로 확인 가능)
  • S3로 올린 파일은 어떻게 확인이 가능한가?
    • s3cmd를 통해 확인하거나 AWS의 S3 콘솔로 로그인해서 확인
    • s3cmd ls s3://grepp-data-engineering/
  • Airflow timezone 정리
    • start_date, end_date, schedule_interval
      • default_timezone에 지정된 타임존을 따름
    • execution_date와 로그 시간
      • UTC를 따름
      • 즉 execution_date를 사용할 때는 타임존을 고려해서 변환후 사용필요
    • 현재로 가장 좋은 방법은 UTC를 일관되게 사용하는 것으로 보임
  • Data Lakehouse
    • 데이터 웨어하우스와 데이터 레이크의 장점을 취한 하이브리드 방식
      • 데이터 레이크의 장점: 비용 효율성과 스토리지 확장성 제공
      • 데이터 웨어하우스의 장점
        • ACID (Atomicity, Consistency, Isolation, Durability) 지원
        • 스키마와 같은 데이터 관리기능 제공
    • BI와 ML의 장점을 모두 취하는 것을 목표로 함
    • Databricks에서 2020년에서 처음 발표
      • Databricks Data Lakehouse Platform
      • 다음 컴포넌트로 구성됨
        • Cloud기반 데이터 레이크
        • Delta Lake: 오픈소스. 소프트웨어 레이어
        • Unity Catalog

5주차 숙제 리뷰: Build Summary 개선 (환경 설정 중심의 접근 방식)

  • 일별 NPS를 SQL로 계산
1
2
3
4
5
6
7
SELECT LEFT(created_at, 10) AS date, 
  ROUND(SUM(CASE
    WHEN score >= 9 THEN 1
    WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2) 
FROM keeyong.nps
GROUP BY 1
ORDER BY 1;

Airflow Configuration for Production Usage

Things to Change

  • airflow.cfg is in /var/lib/airflow/airflow.cfg
    • Any changes here will be reflected when you restart the webserver and scheduler
      • 재시작 안 하면 바꾼 거 변경 안 됨
    • [core] 섹션의 dags_folder가 DAG들이 있는 디렉토리가 되어야함
      • /var/lib/airflow/dags
    • dag_dir_list_interval: dags_folder를 Airflow가 얼마나 자주 스캔하는지 명시 (초 단위)
      • 300초 = 5분
      • 새로 폴더 만들어도 웹UI에 바로 뜨지 않고 이 주기만큼 기다려야 할 수 있음
  • Airflow Database upgrade (Airflow 설치때 설명)
    • Sqlite -> Postgres or MySQL (이 DB는 주기적으로 백업되어야함)
    • sql_alchemy_conn in Core section of airflow.cfg
  • LocalExecutor 사용 (Airflow 설치때 설명)
    • Executor in Core section of airflow.cfg
    • Single Server: from SequentialExecutor to LocalExecutor
      • Cluster: from SequentialExecutor to CeleryExecutor or KubernetesExecutor
  • Enable Authentication & use a strong password (보안!)
    • In Airflow 2.0, authentication is ON by default
  • Large disk volume for logs and local data
    • 많이 쓰다보면 디스크가 부족해지기 시작함 - 잘 삭제하든지 큰 스토리지로 옮기든지
    • Logs -> /dev/airflow/logs in (Core section of airflow.cfg)
      • base_log_folder
      • child_process_log_directory
    • Local data -> /dev/airflow/data
  • Periodic Log data cleanup
    • The above folders need to be cleaned up periodically
    • You can write a shell Operator based DAG for this purpose
  • From Scale Up to Scale Out
    • Go for Cloud Airflow options
      • GCP Cloud Composer
      • Amazon MWAA
  • Backup Airflow metadata database
    • Backup variables and connections (command lines or APIs)
      • airflow variables export variables.json
      • airflow connections export connections.json
  • Add health-check monitoring

API & Airflow 모니터링

  • Airflow API 활성화
    • airflow.cfg의 api 섹션에서 auth_backend의 값을 변경
      • [api]
      • auth_backend = airflow.api.auth.backend.basic_auth
    • airflow 스케줄러 재실행
      • sudo systemctl restart airflow-webserver
    • basic_auth의 ID/Password 설정
      • Airflow 서버에서 airflow 사용자로 변경 sudo su airflow
      • airflow config get-value api auth_backend
    • Airflow Web UI에서 새로운 사용자 추가
      • Security -> List Users -> +
      • 이후 화면에서 새 사용자 정보 추가 (monitor:MonitorUser1)
  • Health API 호출
    • curl -X GET --user "monitor:MonitorUser1" https://[AirflowServer]:8080/health
    • 정상 경우 응답:
1
2
3
4
5
6
{
"metadatabase": { "status": "healthy"
}, "scheduler": {
"status": "healthy",
"latest_scheduler_heartbeat": "2022-03-12T06:02:38.067178+00:00" }
}

Google Spreadsheet 연동하기

Airflow 로그 파일 삭제하기

  • Airflow 로그 위치
  • 두 군데에 별도의 로그가 기록됨. 이를 주기적으로 삭제하거나 백업 (s3) 필요
1
2
3
4
5
[logging]
# The folder where airflow should store its log files # This path must be absolute
base_log_folder = /var/lib/airflow/logs
[scheduler]
child_process_log_directory = /var/lib/airflow/logs/scheduler

Airflow 메타데이터 백업하기

  • Airflow 메타데이터의 주기적인 백업
    • 이 데이터베이스가 외부에 있다면 (특히 AWS RDS라면)
      • 거기에 바로 주기적인 백업 셋업
    • Airflow와 같은 서버에 메타 데이터 DB가 있다면 (예를 들어 PostgreSQL)
      • 그러면 DAG등을 이용해 주기 백업 실행 (S3로 저장)

DAG간의 의존성 만들기

  • Two ways to do this
    • Explicit trigger
      • TriggerDagOperator
      • DAG A triggers DAG B
        • A가 끝나면 B가 시작한다는 걸 A에서 아는 경우
    • Reactive trigger
      • ExternalTaskSensor
      • DAG B waits for a task of DAG A to complete in some fashion
        • A는 모르는데 B가 알아서 A 끝나면 시작하는 경우

TriggerDagOperator

  • How to create dependencies between DAGs?
  • DAG A의 태스크를 TriggerDagRunOperator로 구현
1
2
3
4
5
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

	trigger_B = TriggerDagRunOperator( task_id="trigger_B", 
                                      trigger_dag_id="트리커하려는 DAG이름"
                                     )
1
2
3
4
5
6
7
8
9
10
11
12
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
  task_id="trigger_B",
  trigger_dag_id="트리커하려는 DAG이름",
  # DAG B에 넘기고 싶은 정보. DAG B에서는 Jinja 템플릿(dag_run.conf["path"])으로 접근 가능. 
  # DAG B PythonOperator(**context)에서라면 kwargs['dag_run'].conf.get('conf')
  conf={ 'path': '/opt/ml/conf' },
  # Jinja 템플릿을 통해 DAG A의 execution_date을 패스
  execution_date="",
  reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라는 다시 재실행 
  wait_for_completion=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False
  )

ExternalTaskSensor

  • 앞서와는 반대로 DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
    • 먼저 동일한 schedule_interval을 사용
    • 이 경우 두 태스크들의 Execution Date이 동일해야함. 아니면 매칭이 안됨!
1
2
3
4
5
6
7
8
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor( 
  task_id='waiting_for_end_of_dag_a', 
  external_dag_id='DAG이름', 
  external_task_id='end',
  timeout=5*60,
  mode='reschedule' 
  )
  • 만일 DAG A와 DAG B가 서로 다른 schedule interval을 갖는다면 ?
  • 예를 들어 DAG A가 DAG B보다 5분 먼저 실행된다면?
    • execution_delta를 사용
    • execution_date_fn을 사용하면 조금더 복잡하게 컨트롤 가능
  • 만일 두개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExternalTaskSensor는 사용불가
1
2
3
4
5
6
7
8
9
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor( 
  task_id='waiting_for_end_of_dag_a', 
  external_dag_id='DAG이름', 
  external_task_id='end',
  timeout=5*60,
  mode='reschedule', 
  execution_delta=timedelta(minutes=5)
)

BranchPythonOperator

  • 상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터
  • TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음
1
2
3
4
5
6
7
8
9
10
11
12
13
from airflow.operators.python import BranchPythonOperator

# 상황에 따라 뒤에 실행되어야 하는 태스크를 리턴 def skip_or_cont_trigger():
if Variable.get("mode", "dev") == "dev": 
  return []
else:
  return ["trigger_b"]

  # "mode"라는 Variable의 값이 "dev"이면 trigger_b 태스크를 스킵 
branching = BranchPythonOperator(
  task_id='branching',
  python_callable=skip_or_cont_trigger, 
)
This post is licensed under CC BY 4.0 by the author.