[Python] Polars Lazy API로 효율적인 데이터 처리
Polars Lazy API
Polars의 Lazy API란 연산을 즉시 실행하지 않고, 연산 계획을 세운 후 실제 결과가 필요할 때 collect()를 통해 실행하는 방식이다.
이런 지연실행 방식은 불필요한 연산을 줄이고, 최적화된 쿼리 실행을 가능하게 하여 데이터 처리 속도를 크게 향상시킨다.
왜 Lazy API를 사용할까?
성능 최적화
Polars는 쿼리 계획을 생성한 뒤 최적화를 통해 불필요한 연산을 제거하거나 순서를 조정하여 성능을 높인다.(predicate pushdown)
메모리 효율성
Polars는 필요한 컬럼이나 행만 메모리에 올려, 메모리 사용량을 최소화한다.(projection pushdown)
대규모 데이터 처리
전체 데이터를 한 번에 로드하지 않고, 스트리밍 방식으로 데이터를 처리하여 메모리를 효율적으로 활용할 수 있다.(engine="streaming")
df = pl.read_csv("docs/assets/data/iris.csv")
df_small = df.filter(pl.col("sepal_length") > 5)
df_agg = df_small.group_by("species").agg(pl.col("sepal_width").mean())
print(df_agg)
이 예제에선 eager API를 사용한다. (1) 붓꽃 데이터를 읽고 (2) sepal_length로 데이터를 필터링하고, (3) species별 sepal_width의 평균을 계산한다. 이 때, 모든 데이터는 즉시 실행되어 필요없는 데이터를 읽을 수 있기에 낭비가 있다.
q = (
pl.scan_csv("docs/assets/data/iris.csv")
.filter(pl.col("sepal_length") > 5)
.group_by("species")
.agg(pl.col("sepal_width").mean())
)
df = q.collect()
이 예제는 Lazy API를 사용하였다. 위와 똑같은 과정을 거치지만 Predicate pushdown을 통해 데이터를 읽으며 동시에 sepal_length가 5보다 큰 데이터를 필터링하고, Projection pushdown를 통해 필요한 데이터만 읽어 sepal_length, species, sepal_width 컬럼만 읽게 된다.
이런 Optimizations를 통해 메모리 및 CPU 부하를 줄여 더 큰 데이터들을 메모리에서 더 빠르게 처리할 수 있다.
실행계획 미리보기
Lazy API를 사용할 때 explain 함수를 이용하여 Polars가 실행될 쿼리의 계획을 미리 볼 수 있다.다음 예제는 https://www.kaggle.com/datasets/uciml/iris에서 Iris.csv를 다운받아 위 예제를 똑같이 만들어 실행계획을 확인해보았다.
import polars as pl
q = (
pl.scan_csv("./data/Iris.csv")
.filter(pl.col("SepalLengthCm") > 5)
.group_by("Species")
.agg(pl.col("SepalWidthCm").mean())
)
print(q.explain())
- Csv SCAN [./data/Iris.csv] : 데이터의 원본에서 데이터를 읽기 시작
- PROJECT 3/6 COLUMNS : CSV에는 총 6개의 컬럼이 있고 그 중 3개를 읽음
- SELECTION: [(col(" SepalLengthCm")) > (5.0)] : 컬럼 SepalLengthCm가 5.0보다 큰 데이터만 필터링
- AGGREGATE [col(" SepalWidthCm").mean()] BY [col("Species")] FROM simple π 3/3 ["SepalWidthCm", "Species", ... 1 other column] : 컬럼 SepalWidthCm의 평균을 Species로 그룹화, simple π는 현재 남아있는 컬럼상태 표현, SepalWidthCm, Species, 기타 1개(조건에 사용된 SepalLengthCm)가 남아있음을 의미
Optimizations
이전 글에서도 한 번 다루긴 했지만 Polars는 다음과 같은 빈도로 쿼리를 최적화한다.
Predicate Pushdown
데이터를 읽기 전에 필터 적용하는 것이며 scan 단계에서 한 번 실행된다.
lf = (
pl.scan_csv("data/Iris.csv")
.filter(pl.col("SepalLengthCm") > 5.0)
)
Projection Pushdown
필요한 컬럼만 추출하는 것이며 scan 단계에서 한 번 실행된다.
lf = (
pl.scan_csv("data/Iris.csv")
.select(["SepalLengthCm", "SepalWidthCm", "Species"])
)
Slice Pushdown
필요한 slice(특정 행)만 load하는 것이며 scan 단계에서 한 번 실행된다.
lf = (
pl.scan_csv("./data/Iris.csv")
.slice(10, 5) # 10번째 행부터 5개만
)
Common Subplan Elimination
중복 쿼리 제거로 여러 하위 트리에서 반복 사용되는 서브트리, 파일 scan을 캐싱하며 한 번 실행된다.
shared = pl.scan_csv("./data/Iris.csv").select(["SepalWidthCm"])
agg_sum = shared.select(pl.col("SepalWidthCm").sum().alias("sum_w"))
agg_mean = shared.select(pl.col("SepalWidthCm").mean().alias("mean_w"))
combined = pl.concat([agg_sum, agg_mean])
print(combined.explain())
Simplify Expressions
표현식 단순화로 constant folding(미리 계산할 수 있는 상수값은 실행 전 미리 연산)이나 비싼 연산을 더 빠른 대안으로 대체하는 등의 여러 최적화를 수행하며 더 이상 바꿀 게 없을때까지 수행한다.
lf = (
pl.scan_csv("./data/Iris.csv")
.select([
(pl.lit(5) + 3 - 8).alias("const_zero"),
(pl.lit(2) * pl.lit(3) + pl.lit(4)).alias("const_calc"),
(pl.col("SepalWidthCm") * 1).alias("w")
])
)
print(lf.explain())
Join Ordering
Join 순서 최적화 메모리 부담을 줄이기 위해 Join에서 어떤 부분을 먼저 실행할 지 예측하여 결정하며 한 번 실행된다.
Type Coercion
타입을 강제하는 것으로, 연산이 성공하고 최소한의 메모리만 사용할 수 있도록 타입을 강제하며 변경이 불가능할 때 까지 수행한다.
Cardinality Estimation
카디널리티 추정, 최적의 GROUP BY를 위해 카디널리티(고유 값 개수)를 추정하며 쿼리에 따라 실행되지 않거나 여러 번 수행될 수 있다.
참고자료
- https://docs.pola.rs/user-guide/concepts/lazy-api/
- https://docs.pola.rs/user-guide/lazy/optimizations/
- https://www.kaggle.com/datasets/uciml/iris?resource=download