データストリーミングの世界では、一方的にデータを追加し続ける「アペンドオンリー」処理は比較的扱いやすいものです。しかし、データソースに更新(UPDATE)や削除(DELETE)が含まれると、従来のストリーミングフレームワークはエラーを返したり、イベントを無視してしまったりしがちです。下流のテーブルが最新の状態を保てず、分析基盤の信頼性が損なわれるのです。
本記事では、米大手小売チェーンKroger社のデータサイエンス部門である84.51°が手がける「Better Together」アーキテクチャを例に、Delta LakeのChange Data Feed(CDF)、Delta Live Tables(DLT)、Apply Changes APIを組み合わせた、更新/削除を含むデータをほぼリアルタイムで正確に同期する手法をご紹介します。
背景:1日800万件を超える取引データの処理
84.51°が扱う取引データは、全米の店舗やウェブチャネルから毎日約800万件を超えます。注文の追加だけでなく変更やキャンセルといったイベントも多発するため、データパイプラインは常に上流システムの最新状態を反映し続ける必要があります。
従来のストリーミング処理における課題
Spark Structured StreamingなどでreadStream
を使う場合、INSERT以外のUPDATE/DELETEが来ると処理が止まるか、ignoreDeletes
/ignoreChanges
オプションでエラーを回避しても、変更イベントを文字どおり無視するだけで下流に反映できません。その結果、分析用テーブルは上流との不整合を抱えたままになり、正確な分析が困難になります。
解決の鍵:Delta Lake Change Data Feed(CDF)
Delta LakeのCDF機能を使うと、テーブル本体とは別にINSERT/UPDATE/DELETEの差分ログを保持できます。これを有効化したテーブルでは、「どの行がいつどのように変わったか」という変更履歴だけを効率的に抽出できるため、下流で必要なタイミングに差分情報を取り込むことが可能です。
DLTとApply Changes APIによる宣言的パイプライン
CDFで差分をキャプチャした後、それをターゲットテーブルにマージ(Upsert)するのがDLTとApply Changes APIの組み合わせです。DLTではパイプラインをSQLまたはPythonで宣言的に定義すると、チェックポイント管理やエラーハンドリング、スケーリングなどを自動で担保します。
Apply Changes APIの主要パラメータ例:
- keys:主キーとなる列を指定
- sequence_by:タイムスタンプやバージョン番号など、変更の順序を判断する列
- apply_as_deletes:DELETEイベントをどの条件で物理削除として扱うか
- stored_as_scd_type:SCD Type 1/2など、履歴管理方法を指定
これにより、従来複雑だったMERGE文の記述や順序乱れのハンドリングを内部で吸収しつつ、ストリーミングならではの堅牢なUpsert処理を実現できます。
全体像:Bronze→Silver→Goldの多層アーキテクチャ
84.51°の実装は、いわゆるメダリオンアーキテクチャにも類似した多層構成です。
- ソース
Azure Event Hubs(Kafka互換)などからリアルタイムで取引データをストリーミング取得。
- Bronzeレイヤー
生データをそのまま取り込み、必要最小限のスキーマ適用を行ってBronzeテーブルに保存。
- Silverレイヤー
Bronzeテーブルのデータに対し、Apply Changes APIを使って更新・削除を正しく反映しつつクレンジング/正規化。CDFもここで有効化し、差分ログを保持。
- Goldレイヤー
Silverで保持されたCDFをreadChangeFeed
などでストリーミング読み込みし、最終的な集計・分析用テーブルを構築。
この流れにより、上流システムのあらゆる変更が低遅延かつ正確に分析基盤まで伝播されます。
DELETEイベントの柔軟な扱い
Apply Changes APIでは、DELETEを物理削除するかソフトデリート的にフラグを立てるかをオプションで切り替えられます。DELETEイベントを無視するのではなく、対象レコードのステータス列を“deleted”に更新するといった運用も可能です。ビジネス要件に応じて最適な方式を選択しましょう。
導入の成果とまとめ
「Better Together」アプローチにより、更新・削除を含む大量のトランザクションをほぼリアルタイムで処理する堅牢なデータパイプラインを構築できました。もし更新/削除を伴うストリーミング処理で課題を抱えているなら、まずはDelta LakeでCDFを有効化し、DLTのApply Changes APIを試してみてください。宣言的設定で複雑な同期処理をシンプルに実現できるはずです。