OpenTableFormatを使ってデータレイクハウスを使ってみる
概要
S3 Tablesを筆頭に、マネージドなOTFサービスが出てきたので、Apache Icebergを触り理解を深めていきます。
環境準備
最低限データレイクハウスに必要な環境を構築します。以下の4つの観点で環境をまとめていきます。
- ストレージ
- コンピューティング
- ネットワーク
- データ
データレイクハウスでは、一般的なRDBMSと違いストレージとコンピューティングを分けて考えることができます。
ストレージ
ストレージにはAWS S3互換のオブジェクトストレージを無料の範囲で使います。AWS S3等で謳われているようなイレブンナイン(対故障率99.99999999999%)の耐久性を個人で実現するのは現実的ではないので、AWS S3を使いたいところですが、お財布に一抹の不安があるので、今回はOracleCloudInfrastructure(OCI) の Object Storageサービスを利用します。OCIではFreeTierで2024年時点で最大20GBの容量が無料で利用可能ですのでこれを使って、datalakehouseという名前空間をもったバケットを作成し、その中にiceberg階層、hudi階層、deltalake階層を作成します。なお、このバケットはパブリックに公開しないため、データのアクセスのためのAPIキーを取得済みであり、S3互換APIでバケットが操作可能になっている前提であるものとします。用語は以下で統一します。
- 顧客秘密キー:
SecretKey
- 顧客秘密キーアクセスキー:
AccessKey
- リージョン:
region
- S3互換API エンドポイント:
ApiEndpoint
フォーマット:https://{object-storage-namespace}.compat.objectstorage.{region}.oraclecloud.com
コンピューティング
今回はストレージにアクセス可能な適当なPCを使います。OSは特に問いませんが、今回はLinuxを使います。メモリは8GB
あればApacheSparkのJVMが動かせますが、度々発生するNullPointerExceptionに嫌気が差さなければ3GB
のメモリのマシンでも動かせます。なお、ディスクは40GB
の空きがあれば十分です。
ネットワーク
今回は日本国のネットワークから通信するので、OracleCloudInfrastructureでは日本国の大阪regionであるap-osaka-1
のアカウントを使います。FreeTierではアウトバウンド・データ転送: 1か月あたり10TB
の制限がありますのでこの制限に注意が必要です。ネットワーク伝送帯域幅はデータカタログ設計にもよりますが一般的なFTTHでOKです。
データ
無料の20GBの容量があるので、それに見合うデータをデータレイクハウスに用意する必要があります。今回は NSRPJP のデータをダウンロードして使用します。
NSRLJPの概要
NSRLJPはNSRLの日本ソフトウェア版です。本家NSRLに比べてデータが少なめなので、これを使います。本記事執筆時点で未圧縮で1GB程度のテキストファイルです。以下にテキストファイルの中身を紹介します。
-
NSRLFile.txt
"SHA-1","MD5","CRC32","FileName","FileSize","ProductCode","OpSystemCode","SpecialCode"
をカラムに持つCSVファイルです。NSRPJPのデータ本体であり、おおよそ550万レコードでおおよそ1GBのファイルです。主キーは
SHA-1
とMD5
による複合キーですが、衝突可能性的にはSHA-1
を主キーにし、MD5
はインデックスを張るだけにした方が良いです。ProductCode
、OpSystemCode
は外部キーとして使用します。 -
NSRLProd.txt
"ProductCode","ProductName","ProductVersion","OpSystemCode","MfgCode","Language","ApplicationType"
をカラムに持つCSVファイルです。
ProductCode
を主キーとするソフトウェアマスタです。 -
NSRLMfg.txt
"MfgCode","MfgName"
をカラムに持つCSVファイルです。
MfgCode
を主キーとするOSメーカーマスタです。 -
NSRLOS.txt
"OpSystemCode","OpSystemName","OpSystemVersion","MfgCode"
をカラムに持つCSVファイルです。
OpSystemCode
を主キーとするOSマスタです。
NSRLJPをduckdbでローカル確認
duckdbを使って、NSRLJPのデータを確認してみます。duckdbのCSV自動認識機能を使うため、まずはファイルの拡張子を置換します。
mv NSRLFile{.txt,.csv}
mv NSRLMfg{.txt,.csv}
mv NSRLOS{.txt,.csv}
mv NSRLProd{.txt,.csv}
duckdb NSRLJP.db
でデータベースを作成しスキーマを入力、データを読み込みます。
-- 1. メーカーマスタテーブルの作成
CREATE TABLE NSRLMfg (
MfgCode TEXT PRIMARY KEY,
MfgName TEXT NOT NULL
);
-- 2. OSマスタテーブルの作成
CREATE TABLE NSRLOS (
OpSystemCode TEXT PRIMARY KEY,
OpSystemName TEXT NOT NULL,
OpSystemVersion TEXT,
MfgCode TEXT,
);
-- 3. 製品マスタテーブルの作成
CREATE TABLE NSRLProd (
ProductCode TEXT PRIMARY KEY,
ProductName TEXT NOT NULL,
ProductVersion TEXT,
OpSystemCode TEXT,
MfgCode TEXT,
Language TEXT,
ApplicationType TEXT,
);
-- 4. NSRLファイル情報テーブルの作成
CREATE TABLE NSRLFile (
SHA1 TEXT PRIMARY KEY,
MD5 TEXT NOT NULL,
CRC32 TEXT NOT NULL,
FileName TEXT NOT NULL,
FileSize TEXT NOT NULL,
ProductCode TEXT,
OpSystemCode TEXT,
SpecialCode TEXT,
);
-- 5. インデックスの作成
CREATE INDEX idx_md5 ON NSRLFile (MD5);
-- 6. データのインポート
COPY NSRLMfg FROM 'NSRLMfg.csv';
COPY NSRLProd FROM 'NSRLProd.csv';
COPY NSRLOS FROM 'NSRLOS.csv';
COPY NSRLFile FROM 'NSRLFile.csv';
003f1b9a60927b2c429412cb4698907e0a0c68ce
のハッシュ値に関してduckdb NSRLJP.db
をクエリしてみます。
SELECT
*
FROM
NSRLFile
LEFT JOIN
NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
SHA1 = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
このクエリは、指定されたSHA1ハッシュ値003f1b9a60927b2c429412cb4698907e0a0c68ce
を持つファイルに関する情報を取得します。
SHA1 | MD5 | CRC32 | FileName | FileSize | ProductCode | OpSystemCode | SpecialCode | ProductCode | ProductName | ProductVersion | OpSystemCode | MfgCode | Language | ApplicationType | OpSystemCode | OpSystemName | OpSystemVersion | MfgCode | MfgCode | MfgName |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
003f1b9a60927b2c429412cb4698907e0a0c68ce | 220a7215a63faa737f49f2d5e6a6f8ca | 5E9769BF | Desktop.ini | 798 | 50047 | 1010 | 50047 | “Windows 10 x86” | “2015-“ | “1010” | “5001” | “Japanese” | “Operating System” | 1010 | Windows 10 | 10 | 5001 | 5001 | Microsoft |
ツール
今回、ミドルウェアにApacheSparkをPySparkで使います。ApacheSparkのJVMやPySparkのPythonインタプリタを使うことで統一的なインターフェースでデータを操作できます。
Warning
試すだけであればApacheSparkの利用を推奨しません。それぞれの公式がサポートしているiceberg-python https://github.com/apache/iceberg-pythonを使ってください。
-
関連ソフトウェアのインストール
-
uv, Python==3.11
curl -LsSf https://astral.sh/uv/install.sh | sh echo 'source $HOME/.local/bin/env' >> ~/.bashrc . $HOME/.bashrc
- OpenJDK==17
- ApacheSpark
3.5.5
の依存関係であるJava17を https://jdk.java.net/archive/ からDL,インストールし、JAVA_HOME環境変数に設定します。
# Linuxの場合 wget https://download.java.net/java/GA/jdk17/0d483333a00540d886896bac774ff48b/35/GPL/openjdk-17_linux-x64_bin.tar.gz tar xzvf openjdk-17_linux-x64_bin.tar.gz export JAVA_HOME=`pwd`/jdk-17
- ApacheSpark
- PySpark==
3.5.5
- PythonでApacheSparkを使うライブラリをuvの仮想環境にインストールします。以後、この仮想環境で作業を進めていきます。
mkdir datalakehouse cd datalakehouse uv init --native-tls uv python --native-tls install 3.11 uv python --native-tls pin 3.11 echo "pyspark[sql]==3.5.5" > requirements.txt uv add --native-tls -r requirements.txt
-
ツールの依存関係については細心の注意が必要です!
ツール | バージョン | 詳細情報 |
---|---|---|
Apache Spark | 3.5.5 | 最新情報 |
Java (JVM) | 最新のJavaランタイム: Java 17。 | |
Python (PySpark) | 最新のPythonインタプリタ: Python 3.11。 | |
S3互換API & hadoop-aws | hadoop-awsを使用し、s3a://プロトコルを実施。最新JAR: hadoop-aws==3.3.4。 | |
Hadoop Libraries | 使用中のHadoopライブラリ: 3.3.4。 | |
Apache Iceberg | Iceberg Implementation | Icebergの実装はiceberg-spark-runtimeでパッケージ化。 |
Iceberg Spark Runtime | 最新バージョン: iceberg-spark-runtime-3.5_2.12==1.8.1。 | |
Scala Version | Sparkバージョン: 3.5、Scalaバージョン: 2.12。 | |
ライフサイクル表 | その他バージョンの組み合わせはCurrent Engine Version Lifecycle Statusに従ってください。 |
環境構築
-
バケット構成
datalakehouse <- bucket ├── iceberg/ <- warehouse │ └── NSRLJP/ <- database │ └── NSRLMfg/ <- table │ └── NSRLProd/ │ └── NSRLOS/ │ └── NSRLFile/ ├── .../ │ └── ... └── .../ └── ...
Warning
今回は単一のマシンでシングルスレッドでSQLを発行しますのでHadoopカタログというACIDトランザクションが担保されないカタログを使用しています。本番運用の場合Hadoopカタログは推奨されません。本来、ディレクトリとSQL(DML)の名前空間と階層構造をリンクさせる必要はありません。クラウド側のIAM/バケットポリシー設計次第でバケット構成はより複雑になるケースが殆どです。
ApacheIceberg
SparkSessionの作成
ApacheSparkに伝える為の変数spark
を作成します。
環境変数は以下のような設定となります。
- JAVA_HOME: ツールのインストールの際に設定済
- S3A_REGION:
ap-osaka-1
- S3A_ENDPOINT:
https://{object-storage-namespace}.compat.objectstorage.ap-osaka-1.oraclecloud.com
- S3A_ACCESS_KEY:
顧客秘密キー
- S3A_SECRET_KEY:
顧客秘密キーアクセスキー
#!/usr/bin/env python
# encoding: utf-8
import os
from pyspark.sql import SparkSession
CATALOG_NAME = "catalog_iceberg"
WAREHOUSE_URL = "s3a://datalakehouse/iceberg"
S3A_ENDPOINT = os.getenv('S3A_ENDPOINT')
S3A_ACCESS_KEY = os.getenv('S3A_ACCESS_KEY')
S3A_SECRET_KEY = os.getenv('S3A_SECRET_KEY')
S3A_REGION = os.getenv('S3A_REGION')
spark: SparkSession = (
SparkSession.builder.master("local[*,3]")
.appName("app_nsrljp")
# .config("spark.driver.memory", "2g")
# .config("spark.executor.memory", "2g")
# .config("spark.log.level", "DEBUG")
.config(
"spark.jars.packages",
f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1"
f",org.apache.hadoop:hadoop-aws:3.3.4",
)
.config(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
)
.config(
f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog"
)
.config(
f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop"
) # https://iceberg.apache.org/docs/1.5.0/spark-configuration/
.config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", f"{WAREHOUSE_URL}")
.config(f"spark.sql.defaultCatalog", f"{CATALOG_NAME}")
.config("spark.hadoop.fs.s3a.endpoint.region", S3A_REGION or "")
.config("spark.hadoop.fs.s3a.endpoint", S3A_ENDPOINT or "")
.config("spark.hadoop.fs.s3a.access.key", S3A_ACCESS_KEY or "")
.config("spark.hadoop.fs.s3a.secret.key", S3A_SECRET_KEY or "")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.path.style.access", True)
.getOrCreate()
)
DATABASE_NAME = "NSRLJP"
データの書き込み
NSRLJPのCSVファイルをインポートします。サイズが大きなCSVファイルは自動で複数のparquetファイルに分割されて保存されます。
spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg PURGE""")
spark.sql(
f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg (
MfgCode STRING,
MfgName STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLMfg/'
"""
)
spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS PURGE""")
spark.sql(
f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS (
OpSystemCode STRING,
OpSystemName STRING,
OpSystemVersion STRING,
MfgCode STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLOS/'
"""
)
spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd PURGE""")
spark.sql(
f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd (
ProductCode STRING,
ProductName STRING,
ProductVersion STRING,
OpSystemCode STRING,
MfgCode STRING,
Language STRING,
ApplicationType STRING
) USING ICEBERG
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLProd/'
"""
)
spark.sql(f"""DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile PURGE""")
spark.sql(
f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile (
`SHA-1` STRING,
MD5 STRING,
CRC32 STRING,
FileName STRING,
FileSize STRING,
ProductCode STRING,
OpSystemCode STRING,
SpecialCode STRING
) USING ICEBERG
PARTITIONED BY (OpSystemCode)
LOCATION '{WAREHOUSE_URL}/{DATABASE_NAME}/NSRLFile/'
"""
)
spark.read.option("inferSchema", True).option("header", True).csv(
"NSRLMfg.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg").overwritePartitions()
spark.read.option("inferSchema", True).option("header", True).csv("NSRLOS.csv").writeTo(
f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLOS"
).overwritePartitions()
spark.read.option("inferSchema", True).option("header", True).csv(
"NSRLProd.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLProd").overwritePartitions()
spark.read.option("inferSchema", True).option("header", True).csv(
"NSRLFile.csv"
).writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile").overwritePartitions()
データの読み込み
# 通常のクエリ
spark.sql(f"""
SELECT
*
FROM
{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
LEFT JOIN
{CATALOG_NAME}.{DATABASE_NAME}.NSRLProd ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.ProductCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd.ProductCode
LEFT JOIN
{CATALOG_NAME}.{DATABASE_NAME}.NSRLOS ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.OpSystemCode
LEFT JOIN
{CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.MfgCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg.MfgCode
WHERE
`SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
"""
).show()
# パーティション(フォルダ)による最適化が走るクエリ
spark.sql(
f"""
SELECT
*
FROM
{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile
LEFT JOIN
{CATALOG_NAME}.{DATABASE_NAME}.NSRLProd ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.ProductCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLProd.ProductCode
LEFT JOIN
{CATALOG_NAME}.{DATABASE_NAME}.NSRLOS ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.OpSystemCode
LEFT JOIN
{CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg ON {CATALOG_NAME}.{DATABASE_NAME}.NSRLOS.MfgCode = {CATALOG_NAME}.{DATABASE_NAME}.NSRLMfg.MfgCode
WHERE
{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.`SHA-1` = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
AND
{CATALOG_NAME}.{DATABASE_NAME}.NSRLFile.OpSystemCode = '1010'
"""
).show()
データの読み込み(DuckDB)
# Extensionsをロード
import duckdb
duckdb.sql("""
INSTALL httpfs;
INSTALL iceberg;
LOAD iceberg;
LOAD httpfs;
""")
# S3用クレデンシャルを作成
duckdb.sql(f"""
DROP SECRET IF EXISTS oci_storage;
CREATE SECRET IF NOT EXISTS oci_storage (
TYPE s3,
REGION '{S3A_REGION}',
ENDPOINT '{S3A_ENDPOINT}',
KEY_ID '{S3A_ACCESS_KEY}',
SECRET '{S3A_SECRET_KEY}',
URL_STYLE 'path'
);
""")
# 通常のクエリ
duckdb.sql("""
WITH NSRLFile AS (
SELECT
*
FROM
iceberg_scan(
's3://datalakehouse/iceberg/NSRLJP/NSRLFile',
allow_moved_paths = true
)
), NSRLProd AS (
SELECT
*
FROM
iceberg_scan(
's3://datalakehouse/iceberg/NSRLJP/NSRLProd',
allow_moved_paths = true
)
), NSRLOS AS (
SELECT
*
FROM
iceberg_scan(
's3://datalakehouse/iceberg/NSRLJP/NSRLOS',
allow_moved_paths = true
)
), NSRLMfg AS (
SELECT
*
FROM
iceberg_scan(
's3://datalakehouse/iceberg/NSRLJP/NSRLMfg',
allow_moved_paths = true
)
)
SELECT
*
FROM
NSRLFile
LEFT JOIN
NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
NSRLFile."SHA-1" = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
""")
# パーティション(フォルダ)による最適化が走るクエリ
duckdb.sql("""
WITH NSRLFile AS (
SELECT
*
FROM
iceberg_scan(
's3://datalakehouse/iceberg/NSRLJP/NSRLFile',
allow_moved_paths = true
)
), NSRLProd AS (
SELECT
*
FROM
iceberg_scan(
's3://datalakehouse/iceberg/NSRLJP/NSRLProd',
allow_moved_paths = true
)
), NSRLOS AS (
SELECT
*
FROM
iceberg_scan(
's3://datalakehouse/iceberg/NSRLJP/NSRLOS',
allow_moved_paths = true
)
), NSRLMfg AS (
SELECT
*
FROM
iceberg_scan(
's3://datalakehouse/iceberg/NSRLJP/NSRLMfg',
allow_moved_paths = true
)
)
SELECT
*
FROM
NSRLFile
LEFT JOIN
NSRLProd ON NSRLFile.ProductCode = NSRLProd.ProductCode
LEFT JOIN
NSRLOS ON NSRLFile.OpSystemCode = NSRLOS.OpSystemCode
LEFT JOIN
NSRLMfg ON NSRLOS.MfgCode = NSRLMfg.MfgCode
WHERE
NSRLFile."SHA-1" = '003f1b9a60927b2c429412cb4698907e0a0c68ce'
AND
NSRLFile.OpSystemCode = '1010'
""")