본문 바로가기
DEV/Airflow

DAG 예시 및 DAG 반영 참고

by 화트마 2023. 4. 17.

DAG 수정 사항 반영 방법

google composer 기준 가이드 참고한거라 실제 airflow랑 같을지 모르겠음.

  • DAG 추가 / 업데이트 후 Airflow가 DAG를 로드하고 업데이트 하는 데 시간이 걸림.
  • 클러스터일 경우 Airflow UI에 DAG가 업데이트 되었어도, 실제로 모든 worker에 DAG가 업데이트 되지 않았을 수도 있음. 충분한 대기 시간 필요.
  • DAG의 start_date나, schedule이 바뀌는 등 큰 수정이 있을 때 스케쥴이 제대로 동작하지 않을 수 있음. 또한, 기존 task를 삭제하거나 이름을 수정할 경우 실행 이력이 날라감.
    ⇒ 따라서 Airflow에서는 DAG 에 큰 수정이 있을경우 버전관리를 하는 것을 추천함. (DAG_v1, DAG_v2…)

 

작업 예시

  1. 현재 실행중인 DAG 업데이트
    • 현재 실행중인 TASK는 원래 DAG파일을 사용해 완료됨.
    • 예약되었지만 아직 실행되지 않는 모든 태스크는 업데이트된 DAG 파일을 사용함.
    • 업데이트 된 DAG 파일에 더 이상 없는 태스크는 삭제된 것으로 표시됨
  2. 빈번한 일정으로 실행되는 DAG 업데이트
    • Airflow UI 에서 DAG 일시중지
    • DAG 파일 업로드
    • Airflow UI에 업데이트 표시될 때까지 기다림.
    • (주의, UI에 업데이트 되어도 모든 worker에 업데이트된 DAG가 반영되었다고 볼 수는 없음)
💡 안전한 DAG 업데이트를 위해 2번 방식인 DAG 중지 후 작업 추천.

 

DAG 예시

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# 한국 시간 timezone 설정
local_tz = pendulum.timezone("Asia/Seoul")

default_args = {
    'owner': 'cuinv',
    'depends_on_past': False,
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 0,
  'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id='sample_dag_v0.0.1',
    default_args=default_args,
    schedule='1 * * * *',
    start_date=datetime(2023, 2, 15, tzinfo=local_tz),
    catchup=False
)

start_task = BashOperator(
    task_id='start_task',
    bash_command="echo 1",
    dag=dag
)

echo_task = BashOperator(
    task_id="echo_task",
    bash_command="echo 2",
	dag=dag
)

end_task = BashOperator(
    task_id='end_task',
    bash_command="echo 3",
    dag=dag
)

# 태스크 의존성 연결
start_task >> echo_task >> end_task

 

python 실행 테스트

python 명령으로 DAG 정의 파일을 실행해본다.

python test_dags.py

발생한 예외가 없으면 정상적인 상태이다.

 

DAG 인식

Airflow에서 DAG를 정상적으로 인식하는지 확인한다.

airflow dags list

 

Task 인식

Airflow에서 DAG의 Task를 정상적으로 인식하는지 확인한다.

airflow tasks list DAG_ID

 

DAG 필수 파라미터

dag_id DAG의 고유 식별자로 사용됩니다.

default_args DAG에 대한 기본 인수를 정의합니다. 예를 들어, DAG의 start_date, owner, retries 등을 설정할 수 있습니다.
schedule_interval DAG의 실행 스케줄을 정의합니다. 예를 들어, '@daily', '@hourly', '0 0 * * *' 등과 같이 설정할 수 있습니다.
start_date DAG의 시작 시간을 정의합니다.
catchup DAG가 이전 실행을 추적할지 여부를 결정합니다. True로 설정하면 이전 실행을 추적합니다.
description DAG의 간단한 설명을 제공합니다.

 

DAG 선택 파라미터

owner DAG를 생성한 사람이나 팀의 이름이나 식별자를 나타내는 파라미터. Airflow UI에서 DAG를 검색할 때 유용하게 사용됩니다. 또한, 이메일 알림 등에서 DAG를 식별할 때도 사용됩니다.
max_active_runs DAG에서 동시에 실행할 수 있는 최대 활성 실행 수를 제한합니다. 기본값은 16입니다.
concurrency DAG에서 동시에 실행할 수 있는 최대 태스크 수를 제한합니다. 기본값은 16입니다.
catchup_by_default 새 DAG 생성시, 기본값으로 catchup이 활성화될지 여부를 설정합니다. 기본값은 True입니다.
sla DAG에서 Service Level Agreement (SLA)를 정의합니다. 이를 통해 태스크의 실행 시간 제한 등을 설정할 수 있습니다.
on_failure_callback DAG 또는 태스크 실행 실패 시 호출되는 콜백 함수를 지정합니다.
on_success_callback DAG 또는 태스크 실행 성공 시 호출되는 콜백 함수를 지정합니다.
end_date DAG의 종료 시간을 정의합니다.
schedule_interval DAG의 실행 주기를 설정합니다. start_date와 함께 사용됩니다.
retries 실패한 태스크를 다시 시도하는 횟수를 설정합니다.
retry_delay 실패한 태스크를 다시 시도하기 전 대기 시간을 설정합니다.
email DAG와 관련된 이메일을 보내는데 사용됩니다. 예를 들어, DAG 실패 시 알림 이메일을 보낼 수 있습니다.
email_on_failure DAG 실패 시 이메일 알림을 보낼지 여부를 결정합니다.
email_on_retry 태스크 재시도 시 이메일 알림을 보낼지 여부를 결정합니다.
trigger_rule 다른 태스크가 완료될 때 실행할지, 실패할 때 실행할지 등 실행 시점을 결정합니다.
dagrun_timeout DAG 실행 시간 제한을 설정하는 데 사용됩니다
depends_on_past 이전 DAG 실행이 성공했는지 여부에 따라 현재 DAG 실행을 조건부로 설정합니다.
default_view DAG의 기본 보기 모드를 지정합니다. 'tree', 'graph', 'duration', 'gantt' 등의 값을 가질 수 있습니다.
orientation DAG 그래프를 그릴 때 노드와 엣지의 방향을 결정합니다. 'TB' (상향), 'LR' (좌우) 등의 값을 가질 수 있습니다.
params DAG 내에서 사용할 수 있는 일반적인 파라미터를 정의합니다. 이 파라미터는 Python 사전 형식으로 전달됩니다.
pool DAG에서 사용할 수 있는 실행자 풀을 지정합니다. 실행자 풀은 동시에 실행 가능한 작업자 수를 제한하는 데 사용됩니다.
priority_weight DAG의 실행 우선순위를 지정합니다. 높은 우선순위는 다른 DAG와 비교했을 때 먼저 실행되도록 보장합니다.
queue DAG에서 사용할 수 있는 실행 큐를 지정합니다. 실행 큐는 실행자 풀과 유사한 역할을 합니다.

 

'DEV > Airflow' 카테고리의 다른 글

기타 필수 개념  (0) 2023.04.17
Airflow 소개  (1) 2023.04.17

댓글