APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

Databricks-03. Deltaテーブルを操作してDelta Lakeを理解する

はじめに

GLB事業部Lakehouse部の阿部です。
データレイクは生データを蓄積するストレージとして利用されますが、データベースからデータレイクにデータを移行するとメタ情報が失われるため、データを貯めるだけのブラックボックスになるという課題がありました。
このような課題を解決したのがDelta Lakeです。
本記事では、Deltaテーブルを操作しながらDelta Lakeについて解説したいと思います。

Delta Lakeとは?

Delta Lakeについて簡単に説明します。 Delta Lakeはオープンソースソフトウェアであり、既存クラウドストレージにデータレイクハウスを構築するための技術です。
ACID特性を備えたデータベース操作(ACIDトランザクション)をサポートしており、信頼性と一貫性があるデータを提供します。
以下はACID特性についてです。

  • Atomicity(原子性): トランザクションが完全に実行される、もしくは実行されないことを意味し、処理が途中で失敗した場合は元の状態に戻します。
  • Consistency(一貫性): テーブルに対してあらかじめ定義されたルールに基づいてトランザクションを実行し、テーブルの整合性を保ちます。
  • Isolation(独立性): 複数のユーザーが同じテーブルに対してトランザクションを実行しても、別々のトランザクションとして分離され、お互いに干渉しません。
  • Durability(永続性): システム障害に耐えうる性質であり、正常にトランザクションが完了した場合には障害が発生してもデータが永遠に保存されます。

Databricksで作成されたテーブルのデフォルト形式はDeltaとなり、Deltaテーブルと呼びます。
Deltaの実体としては、parquetファイルのデータとjsonファイルのトランザクションログ(Delta Log)から構成されています。
parquetファイルはDeltaテーブルを作成する際に読み込まれ、Deltaの形式に変換されます。
Delta Logについては後述します。

Delta Lakeについて詳細を理解したい方は、Azureの公式ドキュメントを参照してみてください。

Delta Lakeとは

Deltaテーブルの操作

SQLを使用して Deltaテーブルを操作していきます。
ノートブックの新規作成は、ワークスペースのUsersから自分のユーザー名を選択し、ユーザー名上で右クリックして表示される操作一覧からノートブックを選択します。

作成したノートブックで主に使う言語はSQLであるため、Default LanguageのプルダウンからSQLを選択します。

下記のスキーマを持つwineというDeltaテーブルの作成から行います。

フィールド名 フィールド型
alcohol FLOAT
color STRING
volume FLOAT
delicious BOOLEAN

テーブルの作成

wineテーブルを作成します。

CREATE TABLE wine
(alcohol FLOAT, color STRING, volume FLOAT, delicious BOOLEAN)

テーブルが作成できました。

レコードの挿入

次に作成したDeltaテーブルにレコードを挿入します。

INSERT INTO wine VALUES
(15.1, 'red', 150, true),
(12.6, 'white', 180, true),
(10.9, 'amber', 40, false),
(16.1, 'yellow', 400, false);

SELECT * FROM wine

レコードの挿入が確認できました。

テーブルのプルダウンから参照したテーブルデータに対してCSV形式でのダウンロードや、ダッシュボードへの追加も可能です。

また、+タブをクリックするとビジュアライゼーションデータプロファイルが表示されます。

ビジュアライゼーションは、データのグラフ化を簡単に行える機能です。
wineのテーブルを色ごとの棒グラフを作成します。

データプロファイルを使うと、データの分布や基本統計量を表示できます。
ただし、表示に時間がかかるためデータ量が多い時は注意が必要です。

テーブルの更新

colorがamber(琥珀)のワインが美味しさはfalseになっていましたがtrueにします。
UPDATE + テーブル名コマンドでテーブルの更新を行い、SETコマンドでdeliciousカラムのfalseをtrueにします。

UPDATE wine
SET delicious = true
WHERE color = 'amber';

SELECT * FROM wine

レコードが更新されていることが確認できました。

レコードの削除

deliciousカラムがfalseになっているワインをレコードから削除したいと思います。

DELETE FROM wine
WHERE delicious = false;

SELECT * FROM wine

レコードが削除されていることが確認できました。

テーブルとビューを結合してレコードをアップサートする

ここで新しいビューを作成し、既存のwineテーブルと結合します。
その際にcolorが同じワインならvolumeを足し合わせ、既存のwineテーブルを更新します。
また、美味しいwineならそのレコードを挿入します。

まずはnew_wineというビューを作成します。

CREATE OR REPLACE TEMP VIEW new_wine(alcohol, color, volume, delicious) AS VALUES
(15.1, 'red', 200, true),
(20.4, 'rose', 40.0, true),
(10.6, 'orange', 58.5, true),
(18.9, 'black', 300.2, false);

SELECT * FROM new_wine

ビューが作成できました。
そして、WHEN句で条件を設けてwineテーブルと結合します。

MERGE INTO wine a
USING new_wine b
ON a.color = b.color
WHEN MATCHED THEN
  UPDATE SET volume = a.volume + b.volume
WHEN NOT MATCHED AND b.delicious = true THEN
  INSERT *;
SELECT * FROM new_wine

赤いワイン同士のvolumeを足し合わせることができました。

テーブルの削除

テーブルの削除は、DROP TABLE + テーブル名で実行します。

DROP TABLE wine

テーブルを削除しました。

Delta Logとは

ここからDelta Logを用いたテーブル操作をします。
Delta LogとはDelta Lakeのトランザクションログのことであり、テーブルに加えた変更(トランザクションログ)を時系列順に記録できます。
そのため原子性の保証、テーブルのメタデータ管理、タイムトラベル等が可能です。
タイムトラベルについては後述しますが、テーブルのバージョンを遡ることで以前のバージョンのテーブルを参照できる機能です。

本記事の後半では、Delta Logを確認してタイムトラベルの実行まで解説します。

DESCRIBEコマンドによるテーブル情報の取得

wineテーブルのDelta Logを確認する前に削除したテーブルを改めて作成し、これまでのトランザクションを合わせて実行します。
今回はidカラムを追加し、MERGEの条件をcolorの一致からidが一致するかどうかに変更しました。
また、テーブル作成時にカラムに対してコメントを付けられるため、alcoholカラムにコメントを付けました。

CREATE TABLE wine(
  id INT,
  alcohol FLOAT 
  COMMENT 'Low alcohol degree!',
  color STRING,
  volume FLOAT,
  delicious BOOLEAN);

INSERT INTO wine VALUES
  (1, 15.1, 'red', 150, true),
  (2, 12.6, 'white', 180, true),
  (3, 10.9, 'amber', 40, false),
  (4, 16.1, 'yellow', 400, false);

UPDATE wine 
SET delicious = true
WHERE color = 'amber';

DELETE FROM wine
WHERE delicious = false;

CREATE OR REPLACE TEMP VIEW new_wine(id, alcohol, color, volume, delicious) AS VALUES
(1, 15.1, 'red', 200, true),
(6, 20.4, 'rose', 40.0, true),
(7, 10.6, 'orange', 58.5, true),
(8, 18.9, 'black', 300.2, false);
  
MERGE INTO wine a
USING new_wine b
ON a.id = b.id
WHEN MATCHED THEN
  UPDATE SET volume = a.volume + b.volume
WHEN NOT MATCHED AND b.delicious = true THEN
  INSERT *;

DESCRIBE + テーブル名でテーブル情報を確認します。

DESCRIBE wine

各カラム毎のデータ型とコメントの内容が見えます。

あるカラムの情報だけ確認したい場合は、DESCRIBE + テーブル名 + データベース名.テーブル名.カラム名と記述します。

DESCRIBE wine default.wine.alcohol

alcoholカラムの情報を取得できました。

テーブルのメタ情報を確認したい場合は、DESCRIBE + EXTENDED + テーブル名と記述します。

DESCRIBE DETAIL wine

実行結果をダウンロードしたCSVファイルを示します。(UI上だとテーブルをスクロールする必要があり見づらいため)

テーブルのメタ情報を取得できました。
以下、各メタ情報についてです。

  • Catalog: カタログ名
  • Database: データベース名
  • Table: テーブル名
  • Type: テーブルの種類。テーブルの種類にはマネージドテーブルとアンマネージドテーブル(外部テーブル)があります。
  • Location: テーブルの保管場所
  • Provider: データフォーマット
  • Owner: テーブルのオーナー
  • Table Properties: テーブルのプロパティ。deltaテーブルはDelta Logによるバージョン管理がされており、読み込みと書き込みが可能なそれぞれの最小のバージョンが表示されています。

※マネージドテーブルとアンマネージドテーブルの違いは、簡単に言うとテーブルのデータとメタ情報をどちらも管理するかどうかです。
詳しくは、わかりやすく解説されている方の記事をこちらに紹介しますので、以下をご参照ください。

マネージドテーブルとアンマネージドテーブル

テーブルのメタ情報を確認するもう1つのコマンドは、DESCRIBE + DETAIL + テーブル名です。
表示範囲の関係上、実行結果の一部を以下に示します。

Locationなどが先ほどと同様に確認できます。

Delta Logを確認する

テーブルのDelta Logを確認する方法に、Databricksユーティリティー(dbutils)のファイルシステムユーティリティー(dbutils.fs)の使用が挙げられます。
wineテーブルのDelta Logを確認するため、dbutils.fslsコマンドを使用し、Databricksファイルシステム(DBFS)のディレクトリ一覧を取得します。

``dbutils```とDBFSの詳細については割愛するため、azureの公式ドキュメントを参照いただければと思います。

Databricks ユーティリティ

Databricks ファイル システム (DBFS) とは

Delta Logは、テーブルのディレクトリ以下の_delta_logディレクトリに格納されているファイルです。
以下のようにファイルパスを指定して表示できます。

%python
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/wine/_delta_log"))

ノートブックを作成する際、デフォルト言語はSQLを選択していたため、Pythonコマンドを使う際には上記コードのように%pythonとMAGICコマンドを先頭に書く必要があります。
MAGICコマンドについては、下記記事にもまとめております。

Databricks-02. クラスターの作成からコードの実行まで - APC 技術ブログ

以下に実行結果をダウンロードしたCSVファイルを示します。

_delta_logディレクトリは、各トランザクションに対応した2種類のファイル(crc, json)から構成されています。
ファイルのインデックスは0から4まで振られているため、トランザクションは5回行われたということがわかります。

各ファイルについて説明します。
3, 4のチェックポイントファイルは、テーブルに対して10回のコミット毎に作成されるファイルであり、今回は5回のトランザクションであるため作成されません。
しかし、checkpointファイルの作成頻度は最適化されており、後述するRESTOREコマンドを実行した場合に作成されます。

  1. jsonファイル: トランザクションログはjsonファイルとして記録され、Deltaテーブル作成時に_delta_logディレクトリの中に自動作成される。

  2. crc (Cyclic Redundancy Check)ファイル:
    テーブルのバージョンの統計情報が記載されており、データの整合性を確認するために使用する。

  3. Checkpointファイル: parquetのcheckpointファイルにテーブルのバージョン情報が集計されるため、テーブルを参照する際にCheckpointファイル以降のDelta Logを読み込むだけでよくなり、計算コストを削減できます。
    過去のトランザクションログを一定期間終了後に削除する機能もあります。

  4. Last Checkpointファイル: 直近のチェックポイントを保存しており、最新のテーブルの状態を取得する際に計算コストを削減できます。

※__tmp_path_dirディレクトリは、データの一時的なバックアップや書込み処理中の一時ファイルが保存されます。
一時ファイルの保管先となるディレクトリのため、書き込み終了後はファイルが存在しません。

例として、最後のトランザクションであるMERGEに相当するDelta Logを確認するには、インデックスが4と振られたjsonファイルを参照すれば良いことがわかります。

SELECT * FROM json.`dbfs:/user/hive/warehouse/wine/_delta_log/00000000000000000004.json`

実行結果からadd列とremove列が確認できます。
remove列は、DELETEはもちろんUPDATE等によって元々のデータが削除された場合に削除されたファイルを記録します。
今回はUPDATEによるデータの更新が行われ、元々のデータが削除された形になります。
また、add列はデータが追加された場合や、削除後のデータを除いて再作成する場合に記録されます。

小さいファイルが生まれる理由とファイルレイアウト最適化について

Delta Lakeは、データであるparqetファイルと、トランザクションログ(Delta log)から構成されていることは冒頭でお話ししました。
parquetファイルはデータが追加される度に生成され、たとえば、INSERTやデータソースからのストリーミング、バッチ処理によるデータの出力が起きると、出力したデータの分ファイルが際限なく作成されます。
そのため、繰り返し作成されるParquetファイルを小さなファイルと表現しています。

Delta Tableからデータフレームを作成する際は、生成されたparquetファイルをすべて読み込みます。
そのため、小さいファイルが大量に生成されるとデータの読み込み効率、つまりクエリパフォーマンスが低下します。

このような小さなファイルをまとめて1つのファイルにし、ファイルレイアウトを最適化する方法にOPTIMIZEコマンドがあります。
パラメーターとしてZORDER BY + カラム名 をとることで、指定したカラム名(複数可)を用いてファイルレイアウトを最適化します。
ZORDER実行前後のファイルレイアウトのイメージを以下に示します。

(Delta 2.0 - The Foundation of your Data Lakehouse is Open | Delta LakeのSupport Z-Order clustering of data to reduce the amount of data readから一部引用)

すべてのファイルを読み込む際に、OPTIMIZE後のファイルレイアウトの方が読み込み効率が良さそうなことがわかります。

例として、wineテーブルに対してZORDERによるファイル最適化を実行します。
カラム名には、カーディナリティ(データの種類の数)が高いものを指定するため、例として顧客IDや製品IDが挙げられます。

OPTIMIZE wine
ZORDER BY id

今回は扱うデータが非常に小さいためOPTIMIZEの効果は実感できませんが、データが増えるほどクエリパフォーマンスの向上は顕著になります。

タイムトラベルで過去のテーブルの状態を復元する

タイムトラベルとは、データに加えられた変更をDelta Logが自動的にバージョン管理することで、以前のバージョンのデータを参照し元の状態のテーブルに復元できる機能です。
トランザクションの履歴を確認するために、DESCRIBE + HISTORY + テーブル名を実行します。

DESCRIBE HISTORY wine

各トランザクションの処理内容を表示しました。
wineテーブルの作成からOPTIMIZEまでの各トランザクションに対して、version管理されていることがわかります。

現在のテーブルのバージョンは5ですが、MERGEする前のバージョン3のテーブルを参照したい場合は、以下のように実行します。

SELECT 
  * 
FROM 
  wine 
VERSION AS OF 3

こちらのコードは参照しただけで、バージョン3のテーブルは復元しておりません。
テーブルを復元したい場合は、RESTORE TABLE + テーブル名 + TO VERSION AS OF + バージョン名 と記述します。

RESTORE TABLE wine 
TO VERSION AS OF 3

バージョンを指定してテーブルを復元する方法を紹介しましたが、delta logのtimestampを利用して復元する方法もご紹介します。
_delta_logディレクトリ以下のファイル作成時のtimestampを表示します。

%python
!ls -lt /dbfs/user/hive/warehouse/wine/_delta_log

先ほどなかったcheckpointファイルの作成が確認できます。
同様に、復元するテーブルのバージョンが3であるため、2023年3月13日の09:20時点でのDelta Logを参照すれば良いことがわかります。
バージョン名の代わりにtimestampで時刻を指定することで、その時点でのテーブルを復元できます。

RESTORE TABLE wine 
TO TIMESTAMP AS OF '2023-03-13 09:20:01'

以前のバージョンに復元できたか確認するため、wineテーブルを参照します。

SELECT 
  * 
FROM 
  wine

テーブルが復元できました。

参考記事

Delta 2.0 - The Foundation of your Data Lakehouse is Open

おわりに

本記事では、Delta Lakeについて理解するためにDeltaテーブルの操作と、Delta Logを用いたテーブルの履歴確認から復元まで触れました。
Delta Lakeの実体は、parquetファイルのデータとjsonファイルのトランザクションログ(Delta Log)から構成されており、 Delta LogによってACIDトランザクションのサポートや、テーブルの復元を可能にしています。
今後もDatabricksを用いた検証内容を投稿していきたいと思いますので、またご覧になっていただければと思います。

私たちはDatabricksを用いたデータ分析基盤の導入から内製化支援まで幅広く支援をしております。
もしご興味がある方は、お問い合わせ頂ければ幸いです。

www.ap-com.co.jp

また、一緒に働いていただける仲間も募集中です!
APCにご興味がある方の連絡をお待ちしております。

www.ap-com.co.jp