• 티스토리 홈
  • 프로필사진
    홀쑥
  • 방명록
  • 공지사항
  • 태그
  • 블로그 관리
  • 글 작성
홀쑥
  • 프로필사진
    홀쑥
    • 분류 전체보기 (59)
      • Language & Framework (14)
        • Java (1)
        • Python (13)
      • DataBase (4)
        • NoSQL (1)
        • RDBMS (3)
      • Big Data & Ecosystem (9)
        • Hadoop (5)
        • Hive (2)
        • Sqoop (1)
        • Zeppelin (1)
      • Data Engineering (1)
        • Airflow (1)
      • Cloud & DevOps (1)
        • AWS (0)
        • GCP (1)
      • Monitoring & Logging (2)
        • ElasticSearch (2)
      • Infrastructure (12)
        • OS (12)
        • Docker (0)
        • Kubernetes (0)
      • Algorithm & CS (7)
        • 백준 알고리즘 (6)
      • Troubleshooting (5)
        • 오류 모음 (5)
      • 취미 (1)
        • 홈서버 (1)
  • 방문자 수
    • 전체:
    • 오늘:
    • 어제:
  • 최근 댓글
      등록된 댓글이 없습니다.
    • 최근 공지
        등록된 공지가 없습니다.
      # Home
      # 공지사항
      #
      # 태그
      # 검색결과
      # 방명록
      • PySpark에서 MinIO와 Iceberg를 연동해 DataLake 구축하기
        2025년 11월 07일
        • 홀쑥
        • 작성자
        • 2025.11.07.:50

        목표

        spark 및 iceberg 사용 경험을 위해 pyspark로 HadoopCatalog를 사용하는 MinIO에 iceberg 테이블을 만들고, 샘플 데이터를 삽입 후 쿼리해서 결과를 얻어보려고 한다.

         

        사전 설치

        1. 접근 가능한 Spark Cluster(pyspark)

        2. UV(파이썬 패키지 및 프로젝트 매니저)

        3. Spark에 설치된 pyspark와 같은 버전의 python

        4. MinIO(또는 S3)

        테스트 준비

        프로젝트 생성

        uv로 프로젝트를 생성한다

        uv init spark_test

         

        샘플데이터 준비

        테이블에 넣기 위한 데이터를 준비한다

        faker 라이브러리로 데이터 생성하는 스크립트 생성하기 위해 라이브러리 설치한다. 샘플이기에 pyproject에 추가하지 않는다

        uv pip install Faker tqdm

        입력한 arg 숫자에 따른 데이터를 생성하고 csv로 저장하는 간단한 스크립트를 만든다

        ko-KR로 한글 샘플 생성하게 한다.

        """sample_maker.py"""
        import sys
        import os
        import csv
        
        from faker import Faker
        from tqdm import tqdm
        
        line_no = int(sys.argv[1])
        
        with open('sample.csv', 'w', encoding='utf-8') as f:
            cf = csv.writer(f)
            cf.writerow([
                'name',
                'address',
                'phone_number',
                'post_code',
                'email',
                'job',
                'company',
                'country'
            ])
            for i in tqdm(range(0, line_no)):
                fk = Faker('ko-KR')
        
                line= [
                        fk.name(),
                        fk.address(),
                        fk.phone_number(),
                        fk.postcode(),
                        fk.email(),
                        fk.job(),
                        fk.company(),
                        fk.country()
                        ]
                cf.writerow(line)

         

        1000줄의 샘플 데이터 생성을 해본다

        python3 sample_maker.py 1000

        샘플 생성 결과

        샘플 데이터 업로드

        만든 샘플 데이터를 minio에 업로드 한다

        여기에선 develop bucket에 sample/sample.csv로 저장했다.

        develop 버킷의 sample/sample.csv 의 모습

        bucket 생성

        iceberg warehouse를 만들 bucket을 생성한다

        사용할 버킷을 미리 생성한다

         

        구현

        패키지 설치

        Spark의 버전 확인 후 같은 버전으로 pyspark를 설치한다. 여기에서 사용하는 spark는 3.5.7를 사용했다.

        uv add "numpy<2.0.0" pyspark==3.5.7 pandas pyarrow

        numpy2.0에서 제거된 심볼들이 pyspark(3.5.7)에서 사용 중이여서 버전을 내렸다

        코드 작성

        세션 생성

        from pyspark.sql import SparkSession
        
        ICEBERG_VERSION = "1.10.0"
        
        builder = (
            SparkSession.builder
            # Spark Application 이름 (Spark UI에서 표시됨)
            .appName("ThinmugTest")
        
            # Spark Master URL (예: spark://spark-master:7077)
            .master(master_url)
        
            # Driver의 바인드 주소 (Executor가 접근 가능해야 함)
            # Docker/K8s 환경에서는 0.0.0.0을 사용하는 것이 일반적
            .config("spark.driver.bindAddress", "0.0.0.0")
        
            # Driver가 외부에 노출될 IP 또는 호스트 이름
            # Docker/K8s 환경에서는 명시적으로 IP를 지정해야 Executor가 연결 가능
            .config("spark.driver.host", driver_host)
        
            # Executor JVM이 사용할 메모리 크기
            .config("spark.executor.memory", '2g')
        
            # Driver JVM이 사용할 메모리 크기
            .config("spark.driver.memory", '1g')
        
            # 셔플 연산(groupBy, join 등) 시 병렬 처리 단위로 생성할 파티션 수
            .config("spark.sql.shuffle.partitions", str(shuffle_partitions))
        
            # 콘솔 진행률 표시 (테스트 시 로그 단순화 목적)
            .config("spark.ui.showConsoleProgress", "false")
        
            # 필요한 라이브러리를 자동 다운로드하도록 지정
            # Iceberg 및 S3 연동에 필요한 JAR 패키지 목록
            .config(
                "spark.jars.packages",
                ",".join([
                    f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION}",  # Spark 3.5 + Scala 2.12 호환 Iceberg 런타임
                    f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}",              # S3FileIO, Glue 등 AWS 관련 기능 포함
                    "org.apache.hadoop:hadoop-aws:3.3.4",                                   # S3A 파일시스템 지원
                ])
            )
            
            # S3a 설정(MinIO 파일 읽기용)
            .config(
                "spark.hadoop.fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
            ) # Spark 인증 객체 설정
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.hadoop.fs.s3a.endpoint", endpoint)                 # ex) http://10.10.10.10:9000
            .config("spark.hadoop.fs.s3a.path.style.access", "true")          # MinIO 필수
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")    # HTTP이면 false
            .config("spark.hadoop.fs.s3a.access.key", access_key)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key)
        
            # Iceberg Catalog 
            ## catalog 설정
            .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
            ## Catalog 구현체: HadoopCatalog (메타데이터를 파일로 관리)
            .config("spark.sql.catalog.local.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog")
        
            ## Iceberg warehouse 경로 (데이터가 저장될 루트 S3 경로)
            .config("spark.sql.catalog.local.warehouse", "s3a://iceberg/warehouse/")
        
            ## S3 입출력 구현체 지정 (AWS S3 또는 MinIO 호환)
            .config("spark.sql.catalog.local.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
            
            ## S3(MinIO) 접속 정보
            .config("spark.sql.catalog.local.s3.endpoint", endpoint)                # MinIO endpoint (예: http://10.10.10.10:9000)
            .config("spark.sql.catalog.local.s3.path-style-access", "true")         # MinIO는 path-style 접근 필요
            .config("spark.sql.catalog.local.s3.region", region)                    # 기본적으로 us-east-1
            .config("spark.sql.catalog.local.s3.access-key-id", access_key)         # Access key
            .config("spark.sql.catalog.local.s3.secret-access-key", secret_key)     # Secret key
            .config("spark.sql.catalog.local.client.region", region)
        )
        
        spark = builder.getOrCreate()

         

        파일 읽기

        df = spark.read.csv(
            's3a://develop/sample/sample.csv',
            header=True,
            inferSchema=True,
            sep=",",
        )
        
        df.show()

         

        show로 보인 데이터프레임 일부

        테이블 생성

        session에서 설정한 local 카탈로그에 people.contacts 테이블을 생성한다.

        여기에선 format-version 2를 사용했다.

        spark.sql("CREATE NAMESPACE IF NOT EXISTS local.people")
        spark.sql("""
            CREATE TABLE IF NOT EXISTS local.people.contacts (
            name STRING,
            address STRING,
            phone_number STRING,
            post_code STRING,
            email STRING,
            job STRING,
            company STRING,
            country STRING
        )
        USING iceberg
        TBLPROPERTIES ('format-version' = '2')
        """)

         

        데이터 삽입

        데이터프레임에 있는 데이터를 테이블에 추가한다

        df.writeTo("local.people.contacts").append()

         

        테이블 확인

        read.table로 테이블을 읽어 데이터프레임으로 받고, show로 데이터를 확인한다

        result = spark.read.table("local.people.contacts")
        result.show(5, truncate=False)

        세션 종료

        spark.stop()

        결과 확인

        MinIO의 데이터 확인

        만들어놓은 bucket에서 파일들을 확인한다.

        metadata들
        데이터 파일

         

        다음글
        다음 글이 없습니다.
        이전글
        이전 글이 없습니다.
        댓글
      조회된 결과가 없습니다.
      스킨 업데이트 안내
      현재 이용하고 계신 스킨의 버전보다 더 높은 최신 버전이 감지 되었습니다. 최신버전 스킨 파일을 다운로드 받을 수 있는 페이지로 이동하시겠습니까?
      ("아니오" 를 선택할 시 30일 동안 최신 버전이 감지되어도 모달 창이 표시되지 않습니다.)
      목차
      표시할 목차가 없습니다.
        • 안녕하세요
        • 감사해요
        • 잘있어요

        티스토리툴바