株取引用のデータをナイーブにCSV + sqliteで管理していたが、独自実装がしんどいしパフォーマンスも良くないので代替手段を模索していた。
正月に取引モデルを組み直すのに合わせて高速に動くデータレイクを作り直して約2ヶ月経過したのでメモ。
ざっくりとした要件としては以下の通り。
- データセットをバルクで読み込むのが速い。
- SQLライクな文法でクエリできる。
- ローカルで計算を回すので自分の端末の中だけで完結する。
- 将来的にクラウド環境に比較的簡単に移行できるようにする。
軽く調査した後Icebergを採用することにした。
Iceberg自体は分析テーブル用のデータフォーマット規格で複数の実装が存在している。
今回は主にpyicebergを使い、データカタログにはsqliteを採用することでローカルで比較的高速に動作するデータレイクを構築した。
Icebergのざっくりとした理解
Spec - Apache Iceberg™
iceberg.apache.org
カタログ・メタデータレイヤ・データレイヤの3つの構造を導入することで高速な読み込みに加えACID特性などその他管理上便利な機能を実現するフォーマット。
データレイヤには文字通り実際のデータを格納する。メタデータレイヤは各データファイルが格納するデータの範囲や可視性の情報などを格納しScanする際に活用される。
これらのデータは追記型的に管理されていて、変更があった場合には新しくデータレイヤにデータファイルやマニフェストファイルを作成する。
カタログは最新のデータレイクの状態を構成するために必要なメタデータファイルへの参照を持っている。
書き込み操作をするときは必要なマニフェストやデータファイルを作った上でカタログが参照する先をアトミックに更新すれば、データレイク全体がアトミックに更新されたように見える。
カタログが少し前の状態を表すためのメタデータへの参照を持っていれば過去のデータレイクの状態を再現するタイムトラベル機能も実現できる。
上のような仕組みになっていることからデータレイヤ・メタデータレイヤに使うストレージは基本的に何でも良くローカルのファイルシステムからS3まで様々なものを使える。
カタログにはアトミックな更新をサポートするストレージを用意する必要があるが、SQLだけでなく複数のクラウドベンダがカタログ機能を提供しているので技術選定の選択肢としては広い。
実装したコードのかけら
実装的に難しいことは特にしていないくて、普通にpyicebergのドキュメントに従ってテーブルを作ってデータを挿入していくだけ。
テーブルの作成と挿入であれば以下のような数10行程度のコードで達成できる。
読み込みも
table.scanを呼ぶだけなので非常に簡単。def create_tables(catalog: Catalog) -> None:
# Stock table
schema = StockSchema.iceberg_schema()
catalog.create_table_if_not_exists(
const.TABLE_RAW_STOCK,
schema=schema,
partition_spec=PartitionSpec(
PartitionField(
source_id=get_field_id(schema.fields, "Code"),
transform=BucketTransform(BUCKET_SIZE),
field_id=1000,
name="code_partition_for_raw_stock",
)
),
)
def insert_stock_to_iceberg(
repository: Repository, catalog: Catalog, limit: int | None = None
) -> None:
t0 = time.perf_counter()
logger.info(f"start inserting files into the {const.TABLE_RAW_STOCK} iceberg table")
insert_iceberg(
repository,
catalog,
StockSchema,
const.TABLE_RAW_STOCK,
const.DIR_RAW_STOCK,
limit,
)
table = catalog.load_table(const.TABLE_RAW_STOCK)
t1 = time.perf_counter()
compact(table)
t2 = time.perf_counter()
logger.info(
"insert_stock_to_iceberg completed. insert time: %.2f sec, compaction time: %.2f sec",
t1 - t0,
t2 - t1,
)
実装自体は簡単だがパフォーマンスを出すためには多少の調整が必要だった。
株価データの分析をする際には銘柄単位で読み込むことが多いので当初銘柄コードでパーティションを切っていた。
ただそれだとデータファイルの数に対してメタデータファイル数が多くなりscanのプランニングに異常に時間がかかる状態になっていた。
それに加えてデイリーで日足の情報をデータレイクに追加していくが、これによって小さいデータファイル毎にメタデータファイルが生成されてしまい事態が悪化していく。
1レコードしか無いparquetファイルのようなものが大量に生成されて読み込みのパフォーマンスを大きく毀損する状態に陥っていた。
対策として以下2点を打って読み込みパフォーマンスは大きく改善した。
- コンパクション処理の実装。複数のデータファイルをまとめて1つの大きなデータファイルにすることでスキャンする必要のあるメタデータファイルを減らす。
- Partitionを銘柄コード毎に切るのではなく、BucketTransformを使って複数銘柄コードをまとめたPartitionを作成することでスキャンする必要のあるメタデータファイルを減らす。
最終的なパフォーマンス
前後比較があるわけではないがJQuantsの2013年以降の株価と決算情報のデータ読み込みは40秒程度で完了する。
まだ最適化の余地がありそうではあるが、全体の処理にかかる時間からすれば誤差のような時間かつ実装がシンプルなので現状満足している。