APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

dbt × Databricksによるデータ処理と品質管理の最適化①【データ処理・モニタリング】

はじめに

GLB事業部Lakehouse部の長尾です。

これから始まる本ブログシリーズでは、Databricks上でのdbt活用に役立つ実践的なTipsをお届けします。
初めてdbtに触れる方から、既に使い慣れている方まで、幅広い層に向けた内容となっています。
また、11月には本シリーズに関連したウェビナーも予定していますので、ぜひご期待ください。
これからの投稿をお見逃しなく!

ウェビナーへのお申し込みはこちらからお願い致します。

本ブログは、Databricks社の「Best Practices for Super Powering Your dbt Project on Databricks」を参照して(2024年9月9日時点)、dbtとDatabricksを効果的に連携して活用するためのベストプラクティスについて紹介しています。

参考: dbt on Databricks: Training Series - Databricks

以下では今回の2本のブログで扱っているツールについて説明しています。

dbtとDatabricksを連携させることで、主に下記3つの観点からメリットを得られると考えています。


  1. データトランスフォーメーションのスケーラビリティの課題と効果
    課題: 大量のデータを効率的に処理できないこと。従来のデータ処理ツールでは、パフォーマンスが不足している。
    効果: Databricksのスケーラブルな処理能力とdbtのデータトランスフォーメーション機能を組み合わせることで、大規模なデータを高速かつ効率的に処理できる。

  2. データエンジニアリングのワークフロー自動化の課題と効果
    課題: 手動操作が多く、エラーが発生しやすい。再利用性の低いスクリプトが散在し、管理が煩雑。
    効果: dbtを使用することで、データトランスフォーメーションをコード化し、バージョン管理も可能となる。これにより、エラーが少なく、再利用可能なワークフローを実現できる。

  3. データ品質の保証とモニタリングの課題と効果
    課題: データ品質のチェックやモニタリングを手動で行うのは非効率であり、スケールしにくい。
    効果: dbtのテスト機能とDatabricksの処理基盤を活用することで、データパイプライン全体の品質を自動で検証・モニタリングできる。


本ブログでは、以下の2つの項目について説明しています。

本ブログの続編「dbt × Databricksによるデータ処理と品質管理の最適化②【継続な運用のための仕組み】」では、下記2つについて説明していますので、そちらも併せてぜひご一読ください!

  • dbt-project-evaluatorを使用してdbt best practiceと合致しているか確認する
  • SQLFluffとGitHub Actionsを使用して自動で標準化するための仕組みをつくる

dbt_artifactsを使用してdbtプロジェクトをモニタリングする

DatabricksとGitHubの接続のセットアップが完了した以下の状態から説明を開始します。 ※ dbtとDatabricksを接続させる方法については、本ブログでは説明していませんので、具体的な接続方法については以下のブログを参照ください。dbt CloudとDatabricksをDatabricks Partner Connectを使って接続し、dbtプロジェクト用のGitHubのリポジトリのセットアップも完了していることを前提に本ブログでは話を進めております。
Databricks-05. [Databricks × dbt]Partner Connectでdbtと接続する - APC 技術ブログ

Change branch(もしくはCreate branch)でbranchを指定します。 (検証用のブランチとしてmainとは別にapc-k-nagaoを本検証用として使用します。)

package.ymlファイルを作成します。jaffle-shop-classicフォルダーにカーソルを合わせると右側に「・・・」表示されます。それをクリックすると以下のように表示されるのでCreate fileボタンをクリックします。

packages.ymlと入力してCreateボタンを押してください。ここでは、階層がjaffle-shop-classic /packages.ymlとしておく必要があります。

作成したpackages.ymlファイルに以下のように入力しSaveします。

packages:
   - package: brooklyn-data/dbt_artifacts
     version: 2.2.1

dbt_project.ymlファイル(デフォルトですでに用意されている)を開き、コードを以下のようにアップデートします。

name: 'jaffle_shop'

config-version: 2
version: '0.1'

profile: 'jaffle_shop'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]

target-path: "target"
clean-targets:
    - "target"
    - "dbt_modules"
    - "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
  jaffle_shop:
      +materialized: table
      staging:
        +materialized: view

  dbt_artifacts:
    +database: dbtdbx
    +schema: dbt_artifacts
    staging:
      +database: dbtdbx
      +schema: dbt_artifacts_stg
sources:
  +database: dbtdbx
  +schema: dbt_artifacts_raw

on-run-end:
  - "{{ dbt_artifacts.upload_results(results) }}"

上記のようにアップデートできたら、SaveしてCommit and syncしておきます。

下記のon-run-endフックを使用することで、dbt CLI commandが終了するごとに、このマクロがコマンドの結果をDeltaテーブルにアップロードする処理を行います。

on-run-end:
  - "{{ dbt_artifacts.upload_results(results) }}"

次に、dbt CLIにdbt depsと入力しEnterを押してパッケージをインストールします。
dbt buildと入力した後に、Enterキーで実行します。
問題なければ以下のようにsuccessと表示されます。

System Logsをクリックすると、以下のようにdbt_artifactsが構築されデータも問題なく取り込まれていることをログで確認できます。

以下のように、Databricks上で、count、avg、min、maxを使って各モデルの実行回数や実行時間を可視化することで、負荷が高くなっているモデルや実行時間の急激な変動といったdbt projectのパフォーマンスをモニタリングするうえで有用です。

select
    node_id,
    count(*) model_runs,
    avg(total_node_runtime) avg_runtime,
    min(total_node_runtime) min_runtime,
    max(total_node_runtime) max_runtime
from dbtdbx.test_dbtdbx_dbt_artifacts.fct_dbt__model_executions
where node_id like '%jaffle_shop%'
group by node_id

また、以下のような形で、dbtコマンド実行ごとのpassed(成功)、failed(失敗)、skipped(スキップ)の数を集計しておくと、早期にデータやモデルなどの不具合を検知しやすくなります。これをモデルの品質管理やプロジェクト全体のパフォーマンスの安定性を評価する際に役立てることができます。

select
    inv.run_started_at,
    inv.project_name,
    inv.dbt_command,
    sum(if(exe.status = 'pass', 1, null)) as dbt_tests_passed,
    sum(if(exe.status = 'fail', 1, null)) as dbt_tests_failed,
    sum(if(exe.status = 'skipped', 1, null)) as dbt_tests_skipped
from dbtdbx.test_dbtdbx_dbt_artifacts.fct_dbt__test_executions exe
left join dbtdbx.test_dbtdbx_dbt_artifacts.fct_dbt__invocations inv
on exe.command_invocation_id = inv.command_invocation_id
group by 1, 2, 3


Liquid clusteringを使用してDeltaテーブルのパフォーマンスを最適化する

ここでは、DatabricksのLiquid Clusteringという機能について説明します。
Liquid Clusteringとは、端的にいうと、ファイル数とサイズでバランスの取れたデータセットになるようにデータを自動的かつ動的にクラスタリングして整理する機能です。 パーティション数を細かく管理する手間を軽減することができます。

詳細については以下を参照ください。
https://docs.databricks.com/ja/delta/clustering.html#what-is-liquid-clustering-used-for

以下のようにDatabricksのSQLエディターに記入しLiquid Clusteringを実行します。

alter table dbtdbx.test_dbtdbx.customers
set tblproperties ('delta.columnMapping.mode' = 'name');

alter table dbtdbx.test_dbtdbx.customers
cluster by (customer_id)

optimize dbtdbx.test_dbtdbx.customers;

'delta.columnMapping.mode' = 'name' は、Deltaテーブルのカラム管理を従来の位置ベースではなく、カラム名ベースで実施する機能を有効にします。 Liquid Clusteringは、カラムベースでデータをクラスタリングするため、columnMappingを使用します。

alter table dbtdbx.test_dbtdbx.customers cluster by (customer_id)は、customer_idカラムでクラスタリングを行います。このクラスタリングによって、customer_idを使ったクエリ(例:集計や結合)を高速化します。

optimize dbtdbx.test_dbtdbx.customersは、小さなファイルをまとめ、ストレージの効率を高めクエリパフォーマンスを向上させます。

describe detail dbtdbx.test_dbtdbx.customersを実行すると、以下の赤枠部分で、customer_idをキーとしてクラスタリングを実行できていることが確認できます。

DatabricksのQuery Historyから、以下のように、Liquid Clusteringの実施後にクエリの実行時間を短縮できていることが確認できます(実施前:8.67秒 -> 実施後:0.52秒)。

Liquid Clustering実施前
Liquid Clustering実施後


最後に

本ブログでは、dbtプロジェクトのモニタリング方法とLiquid Clusteringを使用したデータ処理の効率化方法に関する実装例について紹介しました。本ブログの続編(ブログ②)もご一読いただけますと幸いです!

2024年11月6日(水)に本シリーズに関連したウェビナーも開催します。詳細は以下URLから。

www.ap-com.co.jp


私たちはDatabricksを用いたデータ分析基盤の導入から内製化支援まで幅広く支援をしております。
もしご興味がある方は、お問い合わせ頂ければ幸いです。

www.ap-com.co.jp

また、一緒に働いていただける仲間も募集中です!
APCにご興味がある方の連絡をお待ちしております。

※ APCは、Databricks Inc.の公式パートナーであり共同での内製化支援を実施しております。
また、dbt Labs, Inc. との販売パートナー契約を締結しており、dbtの販売と導入支援の提供が可能です。

www.ap-com.co.jp