- PySpark에서 MinIO와 Iceberg를 연동해 DataLake 구축하기2025년 11월 07일
- 홀쑥
- 작성자
- 2025.11.07.:50
목표
spark 및 iceberg 사용 경험을 위해 pyspark로 HadoopCatalog를 사용하는 MinIO에 iceberg 테이블을 만들고, 샘플 데이터를 삽입 후 쿼리해서 결과를 얻어보려고 한다.
사전 설치
1. 접근 가능한 Spark Cluster(pyspark)
2. UV(파이썬 패키지 및 프로젝트 매니저)
3. Spark에 설치된 pyspark와 같은 버전의 python
4. MinIO(또는 S3)
테스트 준비
프로젝트 생성
uv로 프로젝트를 생성한다
uv init spark_test샘플데이터 준비
테이블에 넣기 위한 데이터를 준비한다
faker 라이브러리로 데이터 생성하는 스크립트 생성하기 위해 라이브러리 설치한다. 샘플이기에 pyproject에 추가하지 않는다
uv pip install Faker tqdm입력한 arg 숫자에 따른 데이터를 생성하고 csv로 저장하는 간단한 스크립트를 만든다
ko-KR로 한글 샘플 생성하게 한다.
"""sample_maker.py""" import sys import os import csv from faker import Faker from tqdm import tqdm line_no = int(sys.argv[1]) with open('sample.csv', 'w', encoding='utf-8') as f: cf = csv.writer(f) cf.writerow([ 'name', 'address', 'phone_number', 'post_code', 'email', 'job', 'company', 'country' ]) for i in tqdm(range(0, line_no)): fk = Faker('ko-KR') line= [ fk.name(), fk.address(), fk.phone_number(), fk.postcode(), fk.email(), fk.job(), fk.company(), fk.country() ] cf.writerow(line)1000줄의 샘플 데이터 생성을 해본다
python3 sample_maker.py 1000

샘플 생성 결과 샘플 데이터 업로드
만든 샘플 데이터를 minio에 업로드 한다
여기에선 develop bucket에 sample/sample.csv로 저장했다.

develop 버킷의 sample/sample.csv 의 모습 bucket 생성
iceberg warehouse를 만들 bucket을 생성한다

사용할 버킷을 미리 생성한다 구현
패키지 설치
Spark의 버전 확인 후 같은 버전으로 pyspark를 설치한다. 여기에서 사용하는 spark는 3.5.7를 사용했다.
uv add "numpy<2.0.0" pyspark==3.5.7 pandas pyarrownumpy2.0에서 제거된 심볼들이 pyspark(3.5.7)에서 사용 중이여서 버전을 내렸다
코드 작성
세션 생성
from pyspark.sql import SparkSession ICEBERG_VERSION = "1.10.0" builder = ( SparkSession.builder # Spark Application 이름 (Spark UI에서 표시됨) .appName("ThinmugTest") # Spark Master URL (예: spark://spark-master:7077) .master(master_url) # Driver의 바인드 주소 (Executor가 접근 가능해야 함) # Docker/K8s 환경에서는 0.0.0.0을 사용하는 것이 일반적 .config("spark.driver.bindAddress", "0.0.0.0") # Driver가 외부에 노출될 IP 또는 호스트 이름 # Docker/K8s 환경에서는 명시적으로 IP를 지정해야 Executor가 연결 가능 .config("spark.driver.host", driver_host) # Executor JVM이 사용할 메모리 크기 .config("spark.executor.memory", '2g') # Driver JVM이 사용할 메모리 크기 .config("spark.driver.memory", '1g') # 셔플 연산(groupBy, join 등) 시 병렬 처리 단위로 생성할 파티션 수 .config("spark.sql.shuffle.partitions", str(shuffle_partitions)) # 콘솔 진행률 표시 (테스트 시 로그 단순화 목적) .config("spark.ui.showConsoleProgress", "false") # 필요한 라이브러리를 자동 다운로드하도록 지정 # Iceberg 및 S3 연동에 필요한 JAR 패키지 목록 .config( "spark.jars.packages", ",".join([ f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION}", # Spark 3.5 + Scala 2.12 호환 Iceberg 런타임 f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}", # S3FileIO, Glue 등 AWS 관련 기능 포함 "org.apache.hadoop:hadoop-aws:3.3.4", # S3A 파일시스템 지원 ]) ) # S3a 설정(MinIO 파일 읽기용) .config( "spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", ) # Spark 인증 객체 설정 .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .config("spark.hadoop.fs.s3a.endpoint", endpoint) # ex) http://10.10.10.10:9000 .config("spark.hadoop.fs.s3a.path.style.access", "true") # MinIO 필수 .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") # HTTP이면 false .config("spark.hadoop.fs.s3a.access.key", access_key) .config("spark.hadoop.fs.s3a.secret.key", secret_key) # Iceberg Catalog ## catalog 설정 .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") ## Catalog 구현체: HadoopCatalog (메타데이터를 파일로 관리) .config("spark.sql.catalog.local.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog") ## Iceberg warehouse 경로 (데이터가 저장될 루트 S3 경로) .config("spark.sql.catalog.local.warehouse", "s3a://iceberg/warehouse/") ## S3 입출력 구현체 지정 (AWS S3 또는 MinIO 호환) .config("spark.sql.catalog.local.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") ## S3(MinIO) 접속 정보 .config("spark.sql.catalog.local.s3.endpoint", endpoint) # MinIO endpoint (예: http://10.10.10.10:9000) .config("spark.sql.catalog.local.s3.path-style-access", "true") # MinIO는 path-style 접근 필요 .config("spark.sql.catalog.local.s3.region", region) # 기본적으로 us-east-1 .config("spark.sql.catalog.local.s3.access-key-id", access_key) # Access key .config("spark.sql.catalog.local.s3.secret-access-key", secret_key) # Secret key .config("spark.sql.catalog.local.client.region", region) ) spark = builder.getOrCreate()파일 읽기
df = spark.read.csv( 's3a://develop/sample/sample.csv', header=True, inferSchema=True, sep=",", ) df.show()
show로 보인 데이터프레임 일부 테이블 생성
session에서 설정한 local 카탈로그에 people.contacts 테이블을 생성한다.
여기에선 format-version 2를 사용했다.
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.people") spark.sql(""" CREATE TABLE IF NOT EXISTS local.people.contacts ( name STRING, address STRING, phone_number STRING, post_code STRING, email STRING, job STRING, company STRING, country STRING ) USING iceberg TBLPROPERTIES ('format-version' = '2') """)데이터 삽입
데이터프레임에 있는 데이터를 테이블에 추가한다
df.writeTo("local.people.contacts").append()테이블 확인
read.table로 테이블을 읽어 데이터프레임으로 받고, show로 데이터를 확인한다
result = spark.read.table("local.people.contacts") result.show(5, truncate=False)
세션 종료
spark.stop()결과 확인
MinIO의 데이터 확인
만들어놓은 bucket에서 파일들을 확인한다.

metadata들 
데이터 파일 다음글이전글이전 글이 없습니다.댓글