概要

S3 Tablesを筆頭に、マネージドなOTFサービスが出てきたので、Apache Icebergを触り理解を深めていきます。

Open In Colab

環境準備

最低限データレイクハウスに必要な環境を構築します。以下の4つの観点で環境をまとめていきます。

  • ストレージ
  • コンピューティング
  • ネットワーク
  • データ

データレイクハウスでは、一般的なRDBMSと違いストレージとコンピューティングを分けて考えることができます。

ストレージ

ストレージにはAWS S3互換のオブジェクトストレージを無料の範囲で使います。AWS S3等で謳われているようなイレブンナイン(対故障率99.99999999999%)の耐久性を個人で実現するのは現実的ではないので、AWS S3を使いたいところですが、お財布に一抹の不安があるので、今回はOracleCloudInfrastructure(OCI) の Object Storageサービスを利用します。OCIではFreeTierで2024年時点で最大20GBの容量が無料で利用可能ですのでこれを使って、datalakehouseという名前空間をもったバケットを作成し、その中にiceberg階層、hudi階層、deltalake階層を作成します。なお、このバケットはパブリックに公開しないため、データのアクセスのためのAPIキーを取得済みであり、S3互換APIでバケットが操作可能になっている前提であるものとします。用語は以下で統一します。

  • 顧客秘密キー: SecretKey
  • 顧客秘密キーアクセスキー: AccessKey
  • リージョン: region
  • S3互換API エンドポイント: ApiEndpoint フォーマット: https://{object-storage-namespace}.compat.objectstorage.{region}.oraclecloud.com

コンピューティング

今回はストレージにアクセス可能な適当なPCを使います。OSは特に問いませんが、今回はLinuxを使います。メモリは8GBあればApacheSparkのJVMが動かせますが、度々発生するNullPointerExceptionに嫌気が差さなければ3GBのメモリのマシンでも動かせます。なお、ディスクは40GBの空きがあれば十分です。

ネットワーク

今回は日本国のネットワークから通信するので、OracleCloudInfrastructureでは日本国の大阪regionであるap-osaka-1のアカウントを使います。FreeTierではアウトバウンド・データ転送: 1か月あたり10TBの制限がありますのでこの制限に注意が必要です。ネットワーク伝送帯域幅はデータカタログ設計にもよりますが一般的なFTTHでOKです。

データ

無料の20GBの容量があるので、それに見合うデータをデータレイクハウスに用意する必要があります。今回は NSRPJP のデータをダウンロードして使用します。

NSRLJPの概要

NSRLJPはNSRLの日本ソフトウェア版です。本家NSRLに比べてデータが少なめなので、これを使います。本記事執筆時点で未圧縮で1GB程度のテキストファイルです。以下にテキストファイルの中身を紹介します。

  • NSRLFile.txt

      "SHA-1","MD5","CRC32","FileName","FileSize","ProductCode","OpSystemCode","SpecialCode"
    

    をカラムに持つCSVファイルです。NSRPJPのデータ本体であり、おおよそ550万レコードでおおよそ1GBのファイルです。主キーはSHA-1MD5による複合キーですが、衝突可能性的にはSHA-1を主キーにし、MD5はインデックスを張るだけにした方が良いです。ProductCodeOpSystemCodeは外部キーとして使用します。

  • NSRLProd.txt

      "ProductCode","ProductName","ProductVersion","OpSystemCode","MfgCode","Language","ApplicationType"
    

    をカラムに持つCSVファイルです。ProductCodeを主キーとするソフトウェアマスタです。

  • NSRLMfg.txt

      "MfgCode","MfgName"
    

    をカラムに持つCSVファイルです。MfgCodeを主キーとするOSメーカーマスタです。

  • NSRLOS.txt

      "OpSystemCode","OpSystemName","OpSystemVersion","MfgCode"
    

    をカラムに持つCSVファイルです。OpSystemCodeを主キーとするOSマスタです。

NSRLJPをduckdbでローカル確認

duckdbを使って、NSRLJPのデータを確認してみます。duckdbのCSV自動認識機能を使うため、まずはファイルの拡張子を置換します。

mv NSRLFile{.txt,.csv}
mv NSRLMfg{.txt,.csv}
mv NSRLOS{.txt,.csv}
mv NSRLProd{.txt,.csv}

duckdb NSRLJP.db でデータベースを作成しスキーマを入力、データを読み込みます。

-- 1. メーカーマスタテーブルの作成
CREATE TABLE NSRLMfg (
    MfgCode TEXT PRIMARY KEY,
    MfgName TEXT NOT NULL
);

-- 2. OSマスタテーブルの作成
CREATE TABLE NSRLOS (
    OpSystemCode TEXT PRIMARY KEY,
    OpSystemName TEXT NOT NULL,
    OpSystemVersion TEXT,
    MfgCode TEXT,
);

-- 3. 製品マスタテーブルの作成
CREATE TABLE NSRLProd (
    ProductCode TEXT PRIMARY KEY,
    ProductName TEXT NOT NULL,
    ProductVersion TEXT,
    OpSystemCode TEXT,
    MfgCode TEXT,
    Language TEXT,
    ApplicationType TEXT,
);

-- 4. NSRLファイル情報テーブルの作成
CREATE TABLE NSRLFile (
    SHA1 TEXT PRIMARY KEY,
    MD5 TEXT NOT NULL,
    CRC32 TEXT NOT NULL,
    FileName TEXT NOT NULL,
    FileSize TEXT NOT NULL,
    ProductCode TEXT,
    OpSystemCode TEXT,
    SpecialCode TEXT,
);

-- 5. インデックスの作成
CREATE INDEX idx_md5 ON NSRLFile (MD5);

-- 6. データのインポート
COPY NSRLMfg FROM 'NSRLMfg.csv';
COPY NSRLProd FROM 'NSRLProd.csv';
COPY NSRLOS FROM 'NSRLOS.csv';
COPY NSRLFile FROM 'NSRLFile.csv';

003f1b9a60927b2c429412cb4698907e0a0c68ceのハッシュ値に関してduckdb NSRLJP.dbをクエリしてみます。

SELECT
    *
FROM
    NSRLFile
LEFT JOIN
    NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
    NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
    NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
    SHA1 = '003f1b9a60927b2c429412cb4698907e0a0c68ce'

このクエリは、指定されたSHA1ハッシュ値003f1b9a60927b2c429412cb4698907e0a0c68ceを持つファイルに関する情報を取得します。

SHA1 MD5 CRC32 FileName FileSize ProductCode OpSystemCode SpecialCode ProductCode ProductName ProductVersion OpSystemCode MfgCode Language ApplicationType OpSystemCode OpSystemName OpSystemVersion MfgCode MfgCode MfgName
003f1b9a60927b2c429412cb4698907e0a0c68ce 220a7215a63faa737f49f2d5e6a6f8ca 5E9769BF Desktop.ini 798 50047 1010   50047 “Windows 10 x86” “2015-“ “1010” “5001” “Japanese” “Operating System” 1010 Windows 10 10 5001 5001 Microsoft

ツール

今回、ミドルウェアにApacheSparkPySparkで使います。ApacheSparkのJVMやPySparkのPythonインタプリタを使うことで統一的なインターフェースでデータを操作できます。

Warning

試すだけであればApacheSparkの利用を推奨しません。それぞれの公式がサポートしているiceberg-python https://github.com/apache/iceberg-pythonを使ってください。

  • 関連ソフトウェアのインストール

    • uv, Python==3.11

        curl -LsSf https://astral.sh/uv/install.sh | sh
        echo 'source $HOME/.local/bin/env' >> ~/.bashrc
        . $HOME/.bashrc
      
    • OpenJDK==17
      • ApacheSpark 3.5.5の依存関係であるJava17を https://jdk.java.net/archive/ からDL,インストールし、JAVA_HOME環境変数に設定します。
        # Linuxの場合
        wget https://download.java.net/java/GA/jdk17/0d483333a00540d886896bac774ff48b/35/GPL/openjdk-17_linux-x64_bin.tar.gz
        tar xzvf openjdk-17_linux-x64_bin.tar.gz
        export JAVA_HOME=`pwd`/jdk-17
      
    • PySpark==3.5.5
      • PythonでApacheSparkを使うライブラリをuvの仮想環境にインストールします。以後、この仮想環境で作業を進めていきます。
        mkdir datalakehouse
        cd datalakehouse
        uv init --native-tls
        uv python --native-tls install 3.11
        uv python --native-tls pin 3.11
        echo "pyspark[sql]==3.5.5" > requirements.txt
        uv add --native-tls -r requirements.txt
      

ツールの依存関係については細心の注意が必要です!

ツール バージョン 詳細情報
Apache Spark 3.5.5 最新情報
  Java (JVM) 最新のJavaランタイム: Java 17
  Python (PySpark) 最新のPythonインタプリタ: Python 3.11
  S3互換API & hadoop-aws hadoop-awsを使用し、s3a://プロトコルを実施。最新JAR: hadoop-aws==3.3.4
  Hadoop Libraries 使用中のHadoopライブラリ: 3.3.4
Apache Iceberg Iceberg Implementation Icebergの実装はiceberg-spark-runtimeでパッケージ化。
  Iceberg Spark Runtime 最新バージョン: iceberg-spark-runtime-3.5_2.12==1.8.1
  Scala Version Sparkバージョン: 3.5、Scalaバージョン: 2.12。
  ライフサイクル表 その他バージョンの組み合わせはCurrent Engine Version Lifecycle Statusに従ってください。

環境構築

  • バケット構成

      datalakehouse <- bucket
      ├── iceberg/ <- warehouse
         └── NSRLJP/ <- database
             └── NSRLMfg/ <- table
             └── NSRLProd/
             └── NSRLOS/
             └── NSRLFile/
      ├── .../
         └── ...
      └── .../
          └── ...
    

Warning

今回は単一のマシンでシングルスレッドでSQLを発行しますのでHadoopカタログというACIDトランザクションが担保されないカタログを使用しています。本番運用の場合Hadoopカタログは推奨されません。本来、ディレクトリとSQL(DML)の名前空間と階層構造をリンクさせる必要はありません。クラウド側のIAM/バケットポリシー設計次第でバケット構成はより複雑になるケースが殆どです。

ApacheIceberg

SparkSessionの作成

ApacheSparkに伝える為の変数sparkを作成します。 環境変数は以下のような設定となります。

  • JAVA_HOME: ツールのインストールの際に設定済
  • S3A_REGION: ap-osaka-1
  • S3A_ENDPOINT: https://{object-storage-namespace}.compat.objectstorage.ap-osaka-1.oraclecloud.com
  • S3A_ACCESS_KEY: 顧客秘密キー
  • S3A_SECRET_KEY: 顧客秘密キーアクセスキー
#!/usr/bin/env python
# encoding: utf-8

import os
from pyspark.sql import SparkSession

CATALOG_NAME = "catalog_iceberg"
WAREHOUSE_URL = "s3a://datalakehouse/iceberg"
S3A_ENDPOINT = os.getenv('S3A_ENDPOINT')
S3A_ACCESS_KEY = os.getenv('S3A_ACCESS_KEY')
S3A_SECRET_KEY = os.getenv('S3A_SECRET_KEY')
S3A_REGION = os.getenv('S3A_REGION')

spark: SparkSession = (
    SparkSession.builder.master("local[*,3]")
    .appName("app_nsrljp")
    # .config("spark.driver.memory", "2g")
    # .config("spark.executor.memory", "2g")
    # .config("spark.log.level", "DEBUG")
    .config(
        "spark.jars.packages",
        f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1"
        f",org.apache.hadoop:hadoop-aws:3.3.4",
    )
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog"
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop"
    )  # https://iceberg.apache.org/docs/1.5.0/spark-configuration/
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", f"{WAREHOUSE_URL}")
    .config(f"spark.sql.defaultCatalog", f"{CATALOG_NAME}")
    .config("spark.hadoop.fs.s3a.endpoint.region", S3A_REGION or "")
    .config("spark.hadoop.fs.s3a.endpoint", S3A_ENDPOINT or "")
    .config("spark.hadoop.fs.s3a.access.key", S3A_ACCESS_KEY or "")
    .config("spark.hadoop.fs.s3a.secret.key", S3A_SECRET_KEY or "")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .getOrCreate()
)
DATABASE_NAME = "NSRLJP"

データの書き込み

NSRLJPのCSVファイルをインポートします。サイズが大きなCSVファイルは自動で複数のparquetファイルに分割されて保存されます。

spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg PURGE""")

spark.sql(
    f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg (
    MfgCode STRING,
    MfgName STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLMfg/'
"""
)

spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS PURGE""")

spark.sql(
    f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS (
    OpSystemCode STRING,
    OpSystemName STRING,
    OpSystemVersion STRING,
    MfgCode STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLOS/'
"""
)

spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd PURGE""")

spark.sql(
    f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd (
    ProductCode STRING,
    ProductName STRING,
    ProductVersion STRING,
    OpSystemCode STRING,
    MfgCode STRING,
    Language STRING,
    ApplicationType STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLProd/'
"""
)

spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile PURGE""")

spark.sql(
    f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile (
    `SHA-1` STRING,
    MD5 STRING,
    CRC32 STRING,
    FileName STRING,
    FileSize STRING,
    ProductCode STRING,
    OpSystemCode STRING,
    SpecialCode STRING
) USING ICEBERG
PARTITIONED BY (OpSystemCode)
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLFile/'
"""
)

spark.read.option("inferSchema", True).option("header", True).csv(
    "NSRLMfg.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg").overwritePartitions()

spark.read.option("inferSchema", True).option("header", True).csv("NSRLOS.csv").writeTo(
    f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLOS"
).overwritePartitions()

spark.read.option("inferSchema", True).option("header", True).csv(
    "NSRLProd.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLProd").overwritePartitions()

spark.read.option("inferSchema", True).option("header", True).csv(
    "NSRLFile.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile").overwritePartitions()

データの読み込み

# 通常のクエリ
spark.sql(f"""
SELECT
    *
FROM
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.ProductCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd.ProductCode
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.OpSystemCode
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.MfgCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg.MfgCode
WHERE
    `SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
"""
).show()
# パーティション(フォルダ)による最適化が走るクエリ
spark.sql(
    f"""
SELECT
    *
FROM
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.ProductCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd.ProductCode
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.OpSystemCode
LEFT JOIN
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.MfgCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg.MfgCode
WHERE
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.`SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
AND
    {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = '1010'
"""
).show()

データの読み込み(DuckDB)

# Extensionsをロード
import duckdb
duckdb.sql("""
INSTALL httpfs;
INSTALL iceberg;
LOAD iceberg;
LOAD httpfs;
""")

# S3用クレデンシャルを作成

duckdb.sql(f"""
DROP SECRET IF EXISTS oci_storage;
CREATE SECRET IF NOT EXISTS  oci_storage (
    TYPE s3,
    REGION '{S3A_REGION}',
    ENDPOINT '{S3A_ENDPOINT}',
    KEY_ID '{S3A_ACCESS_KEY}',
    SECRET '{S3A_SECRET_KEY}',
    URL_STYLE 'path'
);
""")

# 通常のクエリ
duckdb.sql("""
WITH NSRLFile AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLFile',
            allow_moved_paths = true
        )
), NSRLProd AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLProd',
            allow_moved_paths = true
        )
), NSRLOS AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLOS',
            allow_moved_paths = true
        )
), NSRLMfg AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLMfg',
            allow_moved_paths = true
        )
)
SELECT
    *
FROM
    NSRLFile
LEFT JOIN
    NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
    NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
    NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
    NSRLFile."SHA-1" = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
""")

# パーティション(フォルダ)による最適化が走るクエリ
duckdb.sql("""
WITH NSRLFile AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLFile',
            allow_moved_paths = true
        )
), NSRLProd AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLProd',
            allow_moved_paths = true
        )
), NSRLOS AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLOS',
            allow_moved_paths = true
        )
), NSRLMfg AS (
    SELECT
        *
    FROM
        iceberg_scan(
            's3://datalakehouse/iceberg/NSRLJP/NSRLMfg',
            allow_moved_paths = true
        )
)
SELECT
    *
FROM
    NSRLFile
LEFT JOIN
    NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
    NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
    NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
    NSRLFile."SHA-1" = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
AND
    NSRLFile.OpSystemCode = '1010'
""")