Dagster 알아보기

Dagster란

Dagster는 데이터 파이프라인의 생명주기에서 데이터 간 관계 정의와 모니터링, 선언적 프로그래밍 모델, 테스트 가능성 향상을 위해 설계된 클라우드 네이티브 파이프라인 관리 도구이다.

테이블이나 데이터셋, 머신러닝 모델이나 보고서들과 같은 Data Asset들로 개발 및 관리되도록 설계되었다.

Python 함수로 Data Asset을 정의하면 Dagster는 그 함수를 정확한 시간에 실행하고 Asset을 최신 상태로 지켜준다.

import dagster as dg


@dg.asset
def hello(context: dg.AssetExecutionContext):
    context.log.info("Hello!")


@dg.asset(deps=[hello])
def world(context: dg.AssetExecutionContext):
    context.log.info("World!")

https://docs.dagster.io/


공식 문서에 있는 코드와 이미지인데, 위의 코드처럼 @asset로 Asset을 선언하고, parameters에 의존하는 asset 함수들의 함수명과 return type을 입력했을 때 Asset들 간의 관계를 정의할 수 있다. 이 기능은 함수의 signature를 분석하여 Asset 간의 연결을 찾아내는 것인데, Software-Defined Assets(SDA)는 이를 통해 실행 위주의 관리에서 데이터 중심의 선언적 파이프라인 관리를 가능하게 해준다.

 

Concepts

Dagster는 데이터 파이프라인을 관리하고 구축하기 위한 다양한 추상화들을 제공을 통하여 데이터 엔지니어링에 모듈링식 선언적인 접근을 가능하게 하여, 모니터링 실행, 데이터 품질 향상, 의존성 만들기를 더 쉽게 할 수 있게 해 준다.

Asset

테이블, 데이터셋, ML 모델 등 논리적인 데이터 단위를 의미하며, 파이프라인의 결과물이자 관리의 핵심

Asset Check

데이터의 품질, 신선도, 완성도 등을 검증하는 로직으로, Asset 함수 실행 시 함께 작동하여 데이터 신뢰성을 보장

Asset Spec

실행 로직 없이 자산의 고유 키(Key), 태그 등 정체성과 메타데이터만을 정의하는 객체

Code Location

정의된 엔티티들이 배포되는 독립적인 환경으로, 의존성 격리와 관리를 쉽게 함

Resource

데이터베이스나 API 연결과 같은 외부 시스템과의 통신 수단을 정의하고 공유

IO Manager 

Asset 간에 데이터를 어떻게 저장하고 불러올지(예: S3, Local 등)에 대한 물리적 방식을 정의

Config

Asset이나 Job 실행 시 필요한 파라미터를 정의

Definitions

프로젝트 내 모든 Asset, Resource, job 등을 하나로 묶어 Dagster UI에 노출하고 배포하는 최상위 객체

Job

실행 가능한 Asset들의 집합으로, Dagster에서 실제 작업을 수행하는 기본 단위

Schedule

정해진 시간 간격에 맞춰 Asset이나 Job을 자동으로 실행하는 도구

Sensor

외부 이벤트(파일 업로드, DB 변경 등)를 감지하여 즉각적으로 작업을 트리거하는 도구

Partition

데이터를 시간이나 지역 등 논리적 단위로 나누어 관리함으로써 증분 처리와 효율적인 재작업을 가능하게 함

Component

반복되는 패턴을 템플릿화하여 자산, 스케줄 등을 코드나 YAML 설정으로 빠르게 생성

 

Dagster 설치

Dagster를 사용하는 방법엔 Serverless, Hybrid, OSS 방식이 있다

사용 방법

Serverless

유료 서비스 Dagster+의 완전 관리형 버전으로 개발 및 테스트를 Local에서 하고, 인프라를 관리하지 않아도 된다.

Hybrid

유료 서비스 Dagster+에서 UI나 스케줄링쪽은 Dagster Labs에 위임하고, 인프라를 직접 운영한다.

OSS

오픈소스 소프트웨어 버전으로 모든 인프라를 운영한다.

 

Dagster OSS

이 글에선 무료 버전인 OSS를 docker-compose로 배포해 볼 생각이다.

https://docs.dagster.io/deployment/oss/deployment-options 에서 다양한 개발환경 옵션들에 대한 배포 방법을 설명하고 있으니 다른 방법이 필요하다면 참고하여 배포하면 될 것 같다.

요구사항

Docker와 Docker compose에 대한 설치 및 이해

dagster.yaml 파일에 대한 이해(https://docs.dagster.io/deployment/oss/dagster-yaml)

dagster.yaml 파일은 Dagster 인스턴스 전체의 전역 설정을 담당하며 Storage(실행기록 및 로그, 스케줄 상태 등을 보관하는 저장소), Run Launcher(파이프라인 실행요청이 들어왔을 때 어디에서 실행할지), Scheduler & Sensors(스케줄러와 센서의 동작방식), Run Queueing(동시 실행 가능한 작업 수등을 제한하는 큐 설정) 같은 주요 설정이 있다. 

workspace.yaml 파일에 대한 이해(https://docs.dagster.io/guides/build/projects/workspaces/workspace-yaml)

workspace.yaml 파일은 Dagster UI나 데몬이 사용자의 파이프라인코드를 어디서 가져갈지 정의하는 곳으로 Python file, Python Module, gRPC Server 등의 메소드가 사용 가능하다.

Docker Image 생성

이 글에서 사용한 예제는 https://github.com/pluralsh/dagster-example를 참고하여 작성했다.

먼저 dagster에 대한 설정을 위해 dagster 디렉토리를 만들고 그 안에 dagster.yaml 파일을 작성한다.

scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
    network: dagster_network
    container_kwargs:
      volumes:
        - /var/run/docker.sock:/var/run/docker.sock
        - dagster_storage:/opt/dagster/dagster_home/storage

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      hostname: postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      hostname: postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      hostname: postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

 

이 tutorial에선 code-server를 운영할 것이기에 workspace.yaml파일을 gRPC Server 형식으로 작성한다.

load_from:
  - grpc_server:
      host: dagster_usercode
      port: 4000
      location_name: "user_code"

Dockerfile은 다음과 같이 생성한다.

FROM python:3.10-slim

COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

RUN apt-get update && apt-get install -y \
    ca-certificates \
    curl \
    gnupg \
    && install -m 0755 -d /etc/apt/keyrings \
    && curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg \
    && chmod a+r /etc/apt/keyrings/docker.gpg \
    && echo "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/debian \
       "$(. /etc/os-release && echo "$VERSION_CODENAME")" stable" | \
       tee /etc/apt/sources.list.d/docker.list > /dev/null \
    && apt-get update && apt-get install -y docker-ce-cli \
    && rm -rf /var/lib/apt/lists/*

ENV DAGSTER_HOME=/opt/dagster/dagster_home
RUN mkdir -p $DAGSTER_HOME

RUN uv pip install --system \
    dagster \
    dagster-graphql \
    dagster-webserver \
    dagster-postgres \
    dagster-docker

COPY dagster.yaml workspace.yaml $DAGSTER_HOME

WORKDIR $DAGSTER_HOME

 

다음으로 usercode 서버를 위해 usercode 디렉토리를 만들고 Dockerfile을 작성한다.

- app 디렉토리는 추후 생성할 예정

FROM python:3.10-slim AS builder
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

WORKDIR /opt/dagster/app

COPY ./app /opt/dagster/app
RUN uv sync --frozen --no-dev

FROM python:3.10-slim
WORKDIR /opt/dagster/app

COPY --from=builder /opt/dagster/app/.venv /opt/dagster/app/.venv
ENV PATH="/opt/dagster/app/.venv/bin:$PATH"

COPY ./app /opt/dagster/app

EXPOSE 4000

HEALTHCHECK --timeout=1s --start-period=3s --interval=3s --retries=20 \
    CMD ["dagster", "api", "grpc-health-check", "-p", "4000"]

CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-m", "app.definitions"]

Docker compose 파일 작성

루트 디렉토리에 docker-compose.yml 파일을 작성한다.

version: '3.7'

services:
  postgresql:
    image: postgres:11
    container_name: postgresql
    environment:
      POSTGRES_USER: 'postgres_user'
      POSTGRES_PASSWORD: 'postgres_password'
      POSTGRES_DB: 'postgres_db'
    networks:
      - dagster_network
    volumes:
      - ./postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ['CMD-SHELL', 'pg_isready -U postgres_user -d postgres_db']
      interval: 10s
      timeout: 8s
      retries: 5

  dagster_usercode:
    build:
      context: ./usercode
    container_name: dagster_usercode
    image: dagster_usercode
    restart: always
    environment:
      DAGSTER_POSTGRES_USER: 'postgres_user'
      DAGSTER_POSTGRES_PASSWORD: 'postgres_password'
      DAGSTER_POSTGRES_DB: 'postgres_db'
      DAGSTER_CURRENT_IMAGE: 'dagster_usercode'
    networks:
      - dagster_network

  dagster_webserver:
    build:
      context: ./dagster/
    entrypoint:
      - dagster-webserver
      - -h
      - '0.0.0.0'
      - -p
      - '3000'
      - -w
      - workspace.yaml
    container_name: dagster_webserver
    expose:
      - '3000'
    ports:
      - '3000:3000'
    environment:
      DAGSTER_POSTGRES_USER: 'postgres_user'
      DAGSTER_POSTGRES_PASSWORD: 'postgres_password'
      DAGSTER_POSTGRES_DB: 'postgres_db'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - dagster_storage:/opt/dagster/dagster_home/storage
    networks:
      - dagster_network
    depends_on:
      postgresql:
        condition: service_healthy
      dagster_usercode:
        condition: service_healthy

  dagster_daemon:
    build:
      context: ./dagster/
    entrypoint:
      - dagster-daemon
      - run
    container_name: dagster_daemon
    restart: on-failure
    environment:
      DAGSTER_POSTGRES_USER: 'postgres_user'
      DAGSTER_POSTGRES_PASSWORD: 'postgres_password'
      DAGSTER_POSTGRES_DB: 'postgres_db'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - dagster_storage:/opt/dagster/dagster_home/storage
    networks:
      - dagster_network
    depends_on:
      postgresql:
        condition: service_healthy
      dagster_usercode:
        condition: service_started
        
volumes:
  dagster_storage:
    name: dagster_storage
    
networks:
  dagster_network:
    driver: bridge
    name: dagster_network

전체 디렉토리 구조는 다음과 같다.

Dagster Code 프로젝트 생성

요구사항

Python 3.10 이상의 버전이 필요하고, 패키지 매니저로 uv를 사용할 것이다.

코드 작성

위에서 생성한 usercode 디렉토리에서 uvx로 dagster에서 제공하는 프로젝트 템플릿을 생성한다.

uvx create-dagster@latest project app

dagster-tutorial 프로젝트를 가져오는 과정이고 y를 입력하면 uv sync가 실행되고, uv를 사용하지 않을 것이면 n을 누르면 된다.

uv를 사용할 것이기에 y를 입력한다.

위에 기입되어 있는 59개 패키지가 설치된 가상환경이 생성된다.

 

app 디렉토리는 다음과 같이 구성된다.

생성한 .venv가 docker build 시에 copy되지 않도록 .dockerignore파일을 usercode 디렉토리에 추가해 준다.

# ./usercode/.dockerignore
**/app/.venv
**/app/__pycache__
**/.git

app 디렉토리에 접근 후 가상환경을 활성화해준다.

cd ./app
source .venv/bin/activate

 

이후, Asset을 Pandas Dataframe으로 정의하기 위해 pandas와 numpy(2 버전 이하)를 패키지에 추가한다.

uv add dagster-postgres dagster-docker
uv add "numpy<2"
uv add pandas

 

그리고 샘플 데이터를 만들고, 간단한 asset 하나를 정의한다.

mkdir -p src/app/defs/data/

cat <<EOF > src/app/defs/data/sample_data.csv
id,name,age,city
1,Alice,28,New York
2,Bob,35,San Francisco
3,Charlie,42,Chicago
4,Diana,31,Los Angeles
EOF

 

# src/app/defs/assets.py
import pandas as pd

import dagster as dg

sample_data_file = "src/app/defs/data/sample_data.csv"
processed_data_file = "src/app/defs/data/processed_data.csv"


@dg.asset
def processed_data():
    ## Read data from the CSV
    df = pd.read_csv(sample_data_file)

    ## Add an age_group column based on the value of age
    df["age_group"] = pd.cut(
        df["age"], bins=[0, 30, 40, 100], labels=["Young", "Middle", "Senior"]
    )

    ## Save processed data
    df.to_csv(processed_data_file, index=False)
    return "Data loaded successfully"

 

이제 dg list defs로 정의된 asset이 등록되었는지 확인하고, dg check defs로 정의된 asset이 유효한지 확인한다.

 

최종적으로 완성된 프로젝트 구조는 다음과 같다

docker compose 배포

docker compose로 서비스들을 올려본다.

docker compose up --build -d

 

이미지들이 빌드되고 서비스들이 구동된다.

# docker compose ps
NAME                IMAGE                              COMMAND                  SERVICE             CREATED       STATUS                 PORTS
dagster_daemon      dagster-deploy-dagster_daemon      "dagster-daemon run"     dagster_daemon      2 hours ago   Up 2 hours
dagster_usercode    dagster_usercode                   "dagster code-server…"   dagster_usercode    2 hours ago   Up 2 hours (healthy)   4000/tcp
dagster_webserver   dagster-deploy-dagster_webserver   "dagster-webserver -…"   dagster_webserver   2 hours ago   Up 2 hours             0.0.0.0:3000->3000/tcp, [::]:3000->3000/tcp
postgresql          postgres:11                        "docker-entrypoint.s…"   postgresql          2 hours ago   Up 2 hours (healthy)   5432/tcp

 

이제 3000번 포트로 dagster webserver에 접속하여 lineage에 들어가 보면 아까 정의한 processed_data asset을 확인할 수 있다.

 

이렇게 간단한 asset 추가 및 dagster 설치를 해보았다.

 

파이프라인 만들기

이제 종속성이 있는 asset 관계를 정의하고, job을 만들고, scheduler로 스케줄을 만들어 매일 9시마다 실행되도록 만드려고 한다.

종속성이 있는 Asset 정의

위에서 작성했던 assets.py 파일을 다음과 같이 수정한다.

# src/app/defs/assets.py
import random

import pandas as pd
import dagster as dg

sample_data_file = "src/app/defs/data/sample_data.csv"
processed_data_file = "src/app/defs/data/processed_data.csv"

@dg.asset
def processed_data():
    ## Read data from the CSV
    df = pd.read_csv(sample_data_file)

    ## Add an age_group column based on the value of age
    df["age_group"] = pd.cut(
        df["age"], bins=[0, 30, 40, 100], labels=["Young", "Middle", "Senior"]
    )

    ## Save processed data
    df.to_csv(processed_data_file, index=False)
    return "Data loaded successfully"

@dg.asset
def origin_data(context: dg.AssetExecutionContext):
    """사용자 데이터"""
    df = pd.read_csv(sample_data_file)
    context.add_output_metadata({
        "columns": list(df.columns)
    })

    return pd.read_csv(sample_data_file)

@dg.asset
def sales_data(context: dg.AssetExecutionContext):
    """판매 데이터"""
    data = [
        {
            "user_id": random.randint(1, 4),
            "amount": round(random.uniform(10, 100), 2),
            "timestamp": pd.Timestamp.now()
        }
        for _ in range(10) # 10건 생성
    ]

    df = pd.DataFrame(data)
    context.add_output_metadata({
        "columns": list(df.columns)
    })
    return df

@dg.asset()
def joined_users_sales(context: dg.AssetExecutionContext, origin_data: pd.DataFrame, sales_data: pd.DataFrame):
    """유저 정보(CSV)와 판매 데이터를 user_id 기준으로 결합"""

    df = sales_data.merge(origin_data, left_on="user_id", right_on="id")

    context.add_output_metadata({
        "row_count": len(df),
        "columns": list(df.columns),
        "preview": dg.MetadataValue.md(df.head().to_markdown())
    })

    return df

@dg.asset()
def city_sales_stats(context: dg.AssetExecutionContext, joined_users_sales: pd.DataFrame):
    """도시별 전체 매출액과 평균 연령 계산"""

    stats_df = joined_users_sales.groupby("city").agg({
        "amount": "sum",
        "age": "mean"
    }).reset_index()

    stats_df.columns = ["city", "total_revenue", "avg_age"]

    context.add_output_metadata({
        "top_city": stats_df.sort_values("total_revenue", ascending=False).iloc[0]["city"]
    })

    return stats_df

@dg.asset()
def final_sales_report(context: dg.AssetExecutionContext, city_sales_stats: pd.DataFrame):
    """최종 분석 결과를 바탕으로 인사이트 도출 및 파일 저장 시뮬레이션"""

    total_revenue = city_sales_stats["total_revenue"].sum()

    context.log.info(f"Final Analysis Complete! Total Revenue: ${total_revenue:.2f}")

    return {"status": "success", "revenue": total_revenue}

 

이후 dg list defs 하면 다음과 같은 asset들이 인식된다.

 

asset을 정의한 함수마다 docstring을 작성하면 description으로 표현해 준다.

usercode 쪽만 다시 빌드 후 배포한다면 다음과 같은 asset 관계를 볼 수 있다.

docker compose up dagster_usercode --build -d

Job 등록

만든 Asset들을 묶어 Job으로 등록하려고 한다. 이때 define_asset_job method를 사용하여 등록할 수 있다.

이전에 만든 assets.py의 마지막에 다음 코드를 추가한다.

# src/app/defs/assets.py
all_sample_job = dg.define_asset_job(
    name="all_sample_job", selection=['origin_data',
                                      'sales_data',
                                      'joined_users_sales',
                                      'city_sales_stats',
                                      'final_sales_report']
)

이후 usercode를 빌드 후 재배포해보면 등록한 job을 확인할 수 있다.

만든 job에서 materialize all을 클릭하여 모든 asset을 실체화시켜본다.

dagster.yaml에서 run_launcher를 DockerRunLauncher로 정의했기에 docker 컨테이너가 생성되어 해당 작업을 실행한다.

모든 asset들이 실체화된 것을 확인할 수 있다.

또한 asset에서 정의한 metadata들이 성공적으로 등록되어 시각화되고 있는 것을 확인할 수 있다.

Schedule 등록

이제 위에서 등록한 job이 매일 9시에 실행되도록 Schedule를 정의할 차례이다.

이번엔 dg scaffold로 템플릿을 가져온 후 수정해서 사용한다.

defs 폴더에서 다음 명령어를 실행하고 파일을 수정한다.

dg scaffold defs dagster.schedule schedule.py

# src/app/defs/schedule.py
from typing import Union

import dagster as dg

from .assets import all_sample_job


@dg.schedule(cron_schedule="0 9 * * *", target=all_sample_job)
def schedule(context: dg.ScheduleEvaluationContext) -> Union[dg.RunRequest, dg.SkipReason]:
    return dg.RunRequest(
        run_key=None,
        tags={"source": "scheduled_run"}
    )

이후 빌드 후 재기동을 하고 Automation을 보면 정상적으로 schedule이 등록된 걸 볼 수 있다. 오른쪽의 버튼을 오른쪽으로 옮기는 환성화 상태가 된다.

timezone 설정을 하지 않아 UTC로 설정됌

 

이렇게 docker compose로 dagster OSS를 구축해 보고 간단한 Asset, Job, Schedule을 만들어 보았다.

직접 구축해보니 Asset 중심의 선언적 설계, code  gRPC서버를 통한 코드 격리, 쉬운 테스트가 Airflow와 다르게 매우 편리한 부분이라고 생각되었다.

 

다음엔 dagster-dbt와 dagster-duckdb를 사용하여 SQL 기반의 데이터 변환을 Asset으로 통합하고, Duckdb로 분석 파이프라인을 구축해 볼 것이다.