DAG 수정 사항 반영 방법
google composer 기준 가이드 참고한거라 실제 airflow랑 같을지 모르겠음.
- DAG 추가 / 업데이트 후 Airflow가 DAG를 로드하고 업데이트 하는 데 시간이 걸림.
- 클러스터일 경우 Airflow UI에 DAG가 업데이트 되었어도, 실제로 모든 worker에 DAG가 업데이트 되지 않았을 수도 있음. 충분한 대기 시간 필요.
- DAG의 start_date나, schedule이 바뀌는 등 큰 수정이 있을 때 스케쥴이 제대로 동작하지 않을 수 있음. 또한, 기존 task를 삭제하거나 이름을 수정할 경우 실행 이력이 날라감.
⇒ 따라서 Airflow에서는 DAG 에 큰 수정이 있을경우 버전관리를 하는 것을 추천함. (DAG_v1, DAG_v2…)
작업 예시
- 현재 실행중인 DAG 업데이트
- 현재 실행중인 TASK는 원래 DAG파일을 사용해 완료됨.
- 예약되었지만 아직 실행되지 않는 모든 태스크는 업데이트된 DAG 파일을 사용함.
- 업데이트 된 DAG 파일에 더 이상 없는 태스크는 삭제된 것으로 표시됨
- 빈번한 일정으로 실행되는 DAG 업데이트
- Airflow UI 에서 DAG 일시중지
- DAG 파일 업로드
- Airflow UI에 업데이트 표시될 때까지 기다림.
- (주의, UI에 업데이트 되어도 모든 worker에 업데이트된 DAG가 반영되었다고 볼 수는 없음)
💡 안전한 DAG 업데이트를 위해 2번 방식인 DAG 중지 후 작업 추천.
DAG 예시
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# 한국 시간 timezone 설정
local_tz = pendulum.timezone("Asia/Seoul")
default_args = {
'owner': 'cuinv',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
dag_id='sample_dag_v0.0.1',
default_args=default_args,
schedule='1 * * * *',
start_date=datetime(2023, 2, 15, tzinfo=local_tz),
catchup=False
)
start_task = BashOperator(
task_id='start_task',
bash_command="echo 1",
dag=dag
)
echo_task = BashOperator(
task_id="echo_task",
bash_command="echo 2",
dag=dag
)
end_task = BashOperator(
task_id='end_task',
bash_command="echo 3",
dag=dag
)
# 태스크 의존성 연결
start_task >> echo_task >> end_task
python 실행 테스트
python 명령으로 DAG 정의 파일을 실행해본다.
python test_dags.py
발생한 예외가 없으면 정상적인 상태이다.
DAG 인식
Airflow에서 DAG를 정상적으로 인식하는지 확인한다.
airflow dags list
Task 인식
Airflow에서 DAG의 Task를 정상적으로 인식하는지 확인한다.
airflow tasks list DAG_ID
DAG 필수 파라미터
dag_id DAG의 고유 식별자로 사용됩니다.
default_args | DAG에 대한 기본 인수를 정의합니다. 예를 들어, DAG의 start_date, owner, retries 등을 설정할 수 있습니다. |
schedule_interval | DAG의 실행 스케줄을 정의합니다. 예를 들어, '@daily', '@hourly', '0 0 * * *' 등과 같이 설정할 수 있습니다. |
start_date | DAG의 시작 시간을 정의합니다. |
catchup | DAG가 이전 실행을 추적할지 여부를 결정합니다. True로 설정하면 이전 실행을 추적합니다. |
description | DAG의 간단한 설명을 제공합니다. |
DAG 선택 파라미터
owner | DAG를 생성한 사람이나 팀의 이름이나 식별자를 나타내는 파라미터. Airflow UI에서 DAG를 검색할 때 유용하게 사용됩니다. 또한, 이메일 알림 등에서 DAG를 식별할 때도 사용됩니다. |
max_active_runs | DAG에서 동시에 실행할 수 있는 최대 활성 실행 수를 제한합니다. 기본값은 16입니다. |
concurrency | DAG에서 동시에 실행할 수 있는 최대 태스크 수를 제한합니다. 기본값은 16입니다. |
catchup_by_default | 새 DAG 생성시, 기본값으로 catchup이 활성화될지 여부를 설정합니다. 기본값은 True입니다. |
sla | DAG에서 Service Level Agreement (SLA)를 정의합니다. 이를 통해 태스크의 실행 시간 제한 등을 설정할 수 있습니다. |
on_failure_callback | DAG 또는 태스크 실행 실패 시 호출되는 콜백 함수를 지정합니다. |
on_success_callback | DAG 또는 태스크 실행 성공 시 호출되는 콜백 함수를 지정합니다. |
end_date | DAG의 종료 시간을 정의합니다. |
schedule_interval | DAG의 실행 주기를 설정합니다. start_date와 함께 사용됩니다. |
retries | 실패한 태스크를 다시 시도하는 횟수를 설정합니다. |
retry_delay | 실패한 태스크를 다시 시도하기 전 대기 시간을 설정합니다. |
DAG와 관련된 이메일을 보내는데 사용됩니다. 예를 들어, DAG 실패 시 알림 이메일을 보낼 수 있습니다. | |
email_on_failure | DAG 실패 시 이메일 알림을 보낼지 여부를 결정합니다. |
email_on_retry | 태스크 재시도 시 이메일 알림을 보낼지 여부를 결정합니다. |
trigger_rule | 다른 태스크가 완료될 때 실행할지, 실패할 때 실행할지 등 실행 시점을 결정합니다. |
dagrun_timeout | DAG 실행 시간 제한을 설정하는 데 사용됩니다 |
depends_on_past | 이전 DAG 실행이 성공했는지 여부에 따라 현재 DAG 실행을 조건부로 설정합니다. |
default_view | DAG의 기본 보기 모드를 지정합니다. 'tree', 'graph', 'duration', 'gantt' 등의 값을 가질 수 있습니다. |
orientation | DAG 그래프를 그릴 때 노드와 엣지의 방향을 결정합니다. 'TB' (상향), 'LR' (좌우) 등의 값을 가질 수 있습니다. |
params | DAG 내에서 사용할 수 있는 일반적인 파라미터를 정의합니다. 이 파라미터는 Python 사전 형식으로 전달됩니다. |
pool | DAG에서 사용할 수 있는 실행자 풀을 지정합니다. 실행자 풀은 동시에 실행 가능한 작업자 수를 제한하는 데 사용됩니다. |
priority_weight | DAG의 실행 우선순위를 지정합니다. 높은 우선순위는 다른 DAG와 비교했을 때 먼저 실행되도록 보장합니다. |
queue | DAG에서 사용할 수 있는 실행 큐를 지정합니다. 실행 큐는 실행자 풀과 유사한 역할을 합니다. |
'DEV > Airflow' 카테고리의 다른 글
기타 필수 개념 (0) | 2023.04.17 |
---|---|
Airflow 소개 (1) | 2023.04.17 |
댓글