Language/Python

[Python] Polars Lazy API로 효율적인 데이터 처리

홀쑥 2025. 4. 21. 23:23

Polars Lazy API

Polars의 Lazy API란 연산을 즉시 실행하지 않고, 연산 계획을 세운 후 실제 결과가 필요할 때 collect()를 통해 실행하는 방식이다.

이런 지연실행 방식은 불필요한 연산을 줄이고, 최적화된 쿼리 실행을 가능하게 하여 데이터 처리 속도를 크게 향상시킨다.

 

왜 Lazy API를 사용할까?

성능 최적화

Polars는 쿼리 계획을 생성한 뒤 최적화를 통해 불필요한 연산을 제거하거나 순서를 조정하여 성능을 높인다.(predicate pushdown)

메모리 효율성

Polars는 필요한 컬럼이나 행만 메모리에 올려, 메모리 사용량을 최소화한다.(projection pushdown)

대규모 데이터 처리

전체 데이터를 한 번에 로드하지 않고, 스트리밍 방식으로 데이터를 처리하여 메모리를 효율적으로 활용할 수 있다.(engine="streaming")

 

df = pl.read_csv("docs/assets/data/iris.csv")
df_small = df.filter(pl.col("sepal_length") > 5)
df_agg = df_small.group_by("species").agg(pl.col("sepal_width").mean())
print(df_agg)

 

이 예제에선 eager API를 사용한다. (1) 붓꽃 데이터를 읽고 (2) sepal_length로 데이터를 필터링하고, (3) species별 sepal_width의 평균을 계산한다. 이 때, 모든 데이터는 즉시 실행되어 필요없는 데이터를 읽을 수 있기에 낭비가 있다.

 

q = (
    pl.scan_csv("docs/assets/data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)

df = q.collect()

 

이 예제는 Lazy API를 사용하였다. 위와 똑같은 과정을 거치지만 Predicate pushdown을 통해 데이터를 읽으며 동시에 sepal_length가 5보다 큰 데이터를 필터링하고, Projection pushdown를 통해 필요한 데이터만 읽어 sepal_length, species, sepal_width 컬럼만 읽게 된다.

이런 Optimizations를 통해 메모리 및 CPU 부하를 줄여 더 큰 데이터들을 메모리에서 더 빠르게 처리할 수 있다.

 

실행계획 미리보기

Lazy API를 사용할 때 explain 함수를 이용하여 Polars가 실행될 쿼리의 계획을 미리 볼 수 있다.다음 예제는 https://www.kaggle.com/datasets/uciml/iris에서 Iris.csv를 다운받아 위 예제를 똑같이 만들어 실행계획을 확인해보았다.

import polars as pl
q = (
    pl.scan_csv("./data/Iris.csv")
    .filter(pl.col("SepalLengthCm") > 5)
    .group_by("Species")
    .agg(pl.col("SepalWidthCm").mean())
)

print(q.explain())

결과로 출력된 실행계획

  1. Csv SCAN [./data/Iris.csv] : 데이터의 원본에서 데이터를 읽기 시작
  2. PROJECT 3/6 COLUMNS : CSV에는 총 6개의 컬럼이 있고 그 중 3개를 읽음
  3. SELECTION: [(col(" SepalLengthCm")) > (5.0)] : 컬럼 SepalLengthCm가 5.0보다 큰 데이터만 필터링
  4. AGGREGATE [col(" SepalWidthCm").mean()] BY [col("Species")] FROM simple π 3/3 ["SepalWidthCm", "Species", ... 1 other column] : 컬럼 SepalWidthCm의 평균을 Species로 그룹화, simple π는 현재 남아있는 컬럼상태 표현, SepalWidthCm, Species, 기타 1개(조건에 사용된 SepalLengthCm)가 남아있음을 의미

Optimizations

이전 글에서도 한 번 다루긴 했지만 Polars는 다음과 같은 빈도로 쿼리를 최적화한다.

Predicate Pushdown

데이터를 읽기 전에 필터 적용하는 것이며 scan 단계에서 한 번 실행된다.

lf = (
    pl.scan_csv("data/Iris.csv")
      .filter(pl.col("SepalLengthCm") > 5.0)
)

 

Projection Pushdown

필요한 컬럼만 추출하는 것이며 scan 단계에서 한 번 실행된다.

lf = (
    pl.scan_csv("data/Iris.csv")
      .select(["SepalLengthCm", "SepalWidthCm", "Species"])
)

 

Slice Pushdown

필요한 slice(특정 행)만 load하는 것이며 scan 단계에서 한 번 실행된다.

lf = (
    pl.scan_csv("./data/Iris.csv")
      .slice(10, 5)  # 10번째 행부터 5개만
)

 

Common Subplan Elimination

중복 쿼리 제거로 여러 하위 트리에서 반복 사용되는 서브트리, 파일 scan을 캐싱하며 한 번 실행된다.

shared = pl.scan_csv("./data/Iris.csv").select(["SepalWidthCm"])

agg_sum  = shared.select(pl.col("SepalWidthCm").sum().alias("sum_w"))
agg_mean = shared.select(pl.col("SepalWidthCm").mean().alias("mean_w"))

combined = pl.concat([agg_sum, agg_mean])

print(combined.explain())

Simplify Expressions

표현식 단순화로 constant folding(미리 계산할 수 있는 상수값은 실행 전 미리 연산)이나 비싼 연산을 더 빠른 대안으로 대체하는 등의 여러 최적화를 수행하며 더 이상 바꿀 게 없을때까지 수행한다.

lf = (
    pl.scan_csv("./data/Iris.csv")
      .select([
          (pl.lit(5) + 3 - 8).alias("const_zero"),
          (pl.lit(2) * pl.lit(3) + pl.lit(4)).alias("const_calc"),
          (pl.col("SepalWidthCm") * 1).alias("w")
      ])
)
print(lf.explain())

Join Ordering

Join 순서 최적화 메모리 부담을 줄이기 위해 Join에서 어떤 부분을 먼저 실행할 지 예측하여 결정하며 한 번 실행된다.

Type Coercion

타입을 강제하는 것으로, 연산이 성공하고 최소한의 메모리만 사용할 수 있도록 타입을 강제하며 변경이 불가능할 때 까지 수행한다.

Cardinality Estimation

카디널리티 추정, 최적의 GROUP BY를 위해 카디널리티(고유 값 개수)를 추정하며 쿼리에 따라 실행되지 않거나 여러 번 수행될 수 있다.

 

참고자료