- [Airflow] Airflow 3로 업그레이드2025년 07월 20일
- 홀쑥
- 작성자
- 2025.07.20.:12
Airflow 3
Airflow3이 2025년 4월 22일에 출시되었다.(https://airflow.apache.org/blog/airflow-three-point-oh-is-here/)
Apache Airflow® 3 is Generally Available!
We're proud to announce that Apache Airflow 3.0.0 has been released.
airflow.apache.org
Airflow2가 나온 2020년 12월 17일 이후로 거의 약 4년 8개월만에 메이저 버전이 올라갔다. 현재 3.0.3(2025-07-14)까지 존재한다.
Airflow 2의 아키텍쳐(https://airflow.apache.org/docs/apache-airflow/stable/installation/upgrading_to_airflow3.html#understanding-airflow-3-x-architecture-changes) - 기존엔 모든 컴포넌트가 airflow metadata database에 직접 접근
- 모든 컴포넌트가 같은 네트워크에 있다고 가정하고 설계(task code와 task 실행 code가 같은 프로세스에서 실행)
- 워커가 Airflow database에 직접 접근하고, 모든 사용자의 코드를 실행
- 사용자가 session을 통해 Airflow metadta database 조작 가능
- 데이터베이스 connection이 너무 많았음
Airflow 3의 아키텍쳐(https://airflow.apache.org/docs/apache-airflow/stable/installation/upgrading_to_airflow3.html#understanding-airflow-3-x-architecture-changes) - API Server가 metadata database에 접근할 수 있는 유일한 포인트
- API Server 서버가 UI, Rest API, Java Script를 제공하는 UI용 내부 API, Task Execution Interface를 통한 워커의 TI 실행에 관여
- 워커가 API Server를 통해 데이터를 주고 받음
- DAG Processor 와 Triggerer 가 connection이나 variable이 필요할 때 task execution 매커니즘을 활용
주요 변경점
UX 개선
DAG Versioning
DAG에 버전관리 기능이 추가되었다.
DAG이 실행되는 동안은 DAG Version이 올라갔어도 실행되던 시점 버전의 DAG이 실행된다.
UI에서는 DAG의 버전 별로 Task 구조나 코드, 로그 등등을 확인할 수 있다.
Backfills improvement
드디어 backfill을 웹 또는 API로 동작하고 UI에서 모니터링 할 수 있게 되었다. 또한 지금까진 Scheduler가 backfill을 실행시키지 않았지만 이젠 Scheduler가 실행을 하게 된다. 이로 인하여 backfill을 cli로 실행하던 것과 backfill type의 dagrun들을 따로 관리하던 수고를 덜 수 있게 되었다.
어디든, 언제든, 어떤 언어든 실행할 수 있다.
Run anywhere, in any language
Airflow 3의 기초 목표는 어느 환경, 어느 언어든 실행할 수 있게 하는 것이였다고 한다. Airflow3에선 이것을 위해 Server-Client 아키텍쳐로 변경되었다.
Task execution interface 입력을 제공하는 API Server가 변경 요소 중 하나이다. 다중 클라우드, 다중 언어를 지원하는 것을 Task execution interface를 통해 가능하게 되었다. Airflow3은 기존 존재했던 DAG들도 Python Task SDK를 통해 호환하는 것도 포함되어 있으며 GoLang을 시작으로 다른 추가적인 언어에 대한 TaskSDK를 몇 달 이내에 제공할 예정이라고 한다.
또한 중앙 데이터센터 또는 클라우드의 밖에있는 edge device에서 데이터 파이프라인을 실행시킬 수 있도록 하기 위해 Edge Executor를 provider package로 사용 가능하다.(task execution interface 위에서 동작한다)
Event-driven scheduling and Data Assets
Airflow3은 Data Assets가 생성되거나 외부 시스템에 의해 업데이터 되는 것을 포함해 Airflow 밖의 이벤트 발생에 따른 반응이 가능해졌다고 한다. Data Assets는 Dataset의 진화이고, 데코레이터를 사용해 Asset을 정의가능하며 Watcher를 통해 외부 이벤트 기반 스케줄링이 가능하다고 한다.
Inference execution and hyper-parameter tuning
ML/ AI 엔지니어들이 추론 실행에 사용하기 편하도록 비-데이터-간격 DAG(DagRun에서 ExecutionDate 제약 제거)을 지원하여 실시간 또는 요청 기반 추론 workflow를 구성할 수 있도록 한다.
보안 및 사용성 개선
UI Modernization
Webserver의 UI가 기존 투박했던 것에 비해 매우 변경되었다.
https://airflow.apache.org/docs/apache-airflow/stable/ui.html에서 해당 URL에서 변경된 디자인을 확인할 수 있다.
그리고 Flask App builder를 사용하던 것에서 React 및 FastAPI로 변경되었다고 하며 여러 사용성 개선이 이루어졌다고 한다. 하지만 그렇다고해서 Flask App builder가 완전히 사라진 것이 아니라 핵심 종속성에서 제거되고, provider 패키지로써 제공된다고 한다.
Security
Task Execution Interface와 API Server는 Task를 격리시킬 수 있는 장점이 있다고 한다. Airflow 배포가 여러 팀에서 공유될 때 보안적인 측면이 강화됬다고 한다.
또한 CLI가 로컬 및 하위 호환을 위한 한 부분과, 원격 API 호출을 통한 부분(airflowctl이라고 칭하며, 선택적으로 설치 가능)으로 분리되었다고 한다.
Airflow 3으로 업그레이드
https://airflow.apache.org/docs/apache-airflow/stable/installation/upgrading_to_airflow3.html
Step1. 업그레이드 전 확인
- 기존 Airflow 2.7 이상의 버전 사용을 권장하며, 아닐 경우 2.x대로 먼저 업그레이드 후 진행
- Python 3.9 이상의 버전 사용
- Airflow 3에서 사라진 함수나 기능을 사용하는지 확인(https://airflow.apache.org/docs/apache-airflow/stable/installation/upgrading_to_airflow3.html#breaking-changes)
- SubDAGs
- Sequentail Executor
- CeleryKubernetesExecutor, LocalKubernetersExecutor
- SLAs
- Subdir(CLI 명령어에서)
- REST API(/api/v1이 /api/v2로 변경됨)
- 몇몇 Airflow Content variables
- execution_date 관련된 일부 값들이 변경됨
- catchup_by_default (False로 기본설정)
- create_cron_data_internals
- Simple Auth가 기본 auth_manager로 설정됨
- FAB(Flask App Builder)의 Auth Manager를 사용하고 싶다면 airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager를 auth_manager로 설정
Step2. 운영중이던 Airflow Instance를 백업 및 정리
- 꼭 백업을 한다. 특히 Airflow Metadata database를 꼭 백업하라고 한다.
- 만약 Hot Backup(운영중인 상태에서 Backup)이 불가능하다면, Airflow instance를 종료 후 Backup해야 일관성이 보장된다.
- 만약 Backup을 하지 않고, migration에 실패할 경우, 반쯤만 성공한 migration 상태가 될 수 있다.
- migration 단계에서 스키마 변경이 발생하고, 데이터가 많을 경우 오랜 시간이 소요되기에, 빠르고 안전한 migration을 위해선 오래된 XCOM과 같은 불필요한 데이터들을 제거가 권장된다. airflow cli 중 airflow db clean도 사용할 수 있다.
- dag processing에 오류가 없는지 확인한다.
- airflow cli로 airflow dags reserialize를 오류없이 실행할 수 있어야 한다.
- dag processing에 오류를 해소하고 이게 적용되어 dag processing이 정상화되고 모든 에러가 사라진 후 migration을 진행한다.
Step3. DAG들의 호환성 확인
- Dag의 Airflow 3 호환성을 편리하게 하는 ruff 유틸리티가 만들어져있다.
- ruff는 0.11.6 이상을 사용해야 하며 호환되지 않는 DAG의 수정방법은 다음과 같다.
수정이 필요한 DAG 확인
ruff check dag/ --select AIR301 --preview
수정할 부분 확인
ruff check dag/ --select AIR301 --show-fixes --preview
일부 자동수정 가능한 부분 자동 수정ruff check dag/ --select AIR301 --fix --preview
Step4. apache-airflow-providers-standard 설치
- airflow-core 패키지에 내장되어있던 일부 번들들(예를 들어 BashOperator, PythonOperator)이 apache-airflow-providers-standard 패키지로 분리되었다고 한다.
- 이 패키지는 Airflow 2.x 에서도 설치가 가능하여 airflow-core에서 사용하던 대신 해당 패키지의 오퍼레이터들을 사용할 수 있다.
Step5. 기존 Custom Operator들이 직접 Metadata Database에 접근하던 것을 수정
- Airflow 3에선 오퍼레이터들이 Metadata Database에 직접 접근하는 것이 불가능하고, API Server를 통해야 한다.
- 따라서 기존에 Custom하여 사용하던 오퍼레이터들이 Metadata Database에 직접 접근하는지 확인하고, 수정해야 한다.
- sqlalchemy에서 sesion객체를 직접 생성하거나, airflow.utils.session에서 NEW_SESSION, create_session, provide_session등을 사용하여 session에 쿼리하던 것들을 수정해야 한다.
- https://github.com/apache/airflow/issues/49187 에서 예제들을 확인 가능하다.
""" https://github.com/apache/airflow/blob/main/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py#L44 """ from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS ... if AIRFLOW_V_3_0_PLUS: from airflow.sdk.execution_time.xcom import XCom else: from airflow.models import XCom """ https://github.com/apache/airflow/blob/main/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py#L95 """ if AIRFLOW_V_3_0_PLUS: from airflow.sdk import Asset else: # dataset is renamed to asset since Airflow 3.0 from airflow.datasets import Dataset as Asset
Step6. Airflow 업그레이드
- configuration의 업그레이드를 쉽게 하기 위해 유틸리티를 만들어 놨다.
Airflow config를 업그레이드 해야할 항목 확인
airflow config update
--fix옵션을 준다면 자동적으로 Airflow 3에 호환 가능하도록 자동으로 업데이터 해준다.
airflow config update --fix
업그레이드 중 가장 큰 부분은 Metadata Database Migration인데 Airflow 2.7 이후의 버전과 같이 다음 명령어로 업그레이드 가능하다.
airflow db migrate
만약 FAB(flask app builder)의 view, menu items, blueprints를 사용하고 있다면, FastAPI로 변경하거나 FAB provider를 설치해야 한다.
Step7. 기동 스크립트 변경
Airflow에선 api-server가 생기고 scheduler에 있던 dag-processor가 별도의 컴포넌트로 분리되었기에 각각 다음 명령어들로 컴포넌트를 동작시켜줘야 한다.
airflow api-server
airflow dag-processor
docker-compose.yml(https://airflow.apache.org/docs/apache-airflow/3.0.3/docker-compose.yaml)파일도 수정되었지만 특히 api-server와 dag-processor 서비스가 추가된 것이 보이며 AIRFLOW__CORE__EXECUTION_API_SERVER_URL로 EXECUTION API SERVER의 URL을 지정해주는 환경변수가 보인다.
... environment: AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/' ... services: ... airflow-apiserver: <<: *airflow-common command: api-server ports: - "8080:8080" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: airflow-dag-processor: <<: *airflow-common command: dag-processor healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"'] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully
다음글이전글이전 글이 없습니다.댓글