APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

【Databricks活用方法LT】VSCodeでのDatabricks開発もお勧めしたい

はじめに

GLB事業部 Lakehouse部の阿部です。
先日、こちらのイベントでDatabricksの活用方法をテーマに発表しました。

findy.connpass.com

本記事では、イベントで発表したVisual Studio Code(VSCode)におけるDatabricksの開発方法をご紹介します。

ローカルでDatabricksを開発するには

そもそも普段、ワークスペースにブラウザからログインし、ワークフローや機械学習モデルの開発、BIダッシュボードの作成機能を使うと思います。
たとえば、ワークフロー開発は、notebookの開発とは別にJobやDLTの設定をUI上で行いますよね。
しかし、 VSCodeなどのIDEとDatabricksのクラスターを接続するDatabricks Connectというライブラリが使うことで、ワークスペースとの接続、コードの同期が可能です。
そのため、ローカルからのソースコード実行時は、ローカルのSparkセッションではなく、接続したDatabricksクラスターを用いてリモート実行しています。

VSCodeでの開発に必要な前準備

VSCodeでワークスペースと同期して開発する際に、以下2つの拡張機能を事前にインストール済みです。

Databricks extension for Visual Studio Code

VSCodeで開発したソースコードをワークスペースと同期し、VSCodeから実行できます。
ソースコードだけではなく、DLTやJobのワークフローにも対応しています。

注意 後述するDatabricks Asset Bundlesに対応したバージョン2は、プライベートプレビューになっているため、機能の変更がされる可能性があります。
本記事ではドキュメントに公開されている操作だけ実行しています。

Databricks Driver for SQLTools

VSCodeで開発したSQLクエリをワークスペースと同期できます。
さらに、VSCodeと接続したカタログ内のテーブルやViewなどのデータを確認でき、私の場合はローカルからDLTを実行後、この拡張機能によってターゲットのカタログにテーブルが作成されたか確認しています。

この拡張機能を使ったワークスペースとの接続、カタログ設定方法は以下ブログで記載しております。

https://techblog.ap-com.co.jp/entry/2023/09/05/071140

イベントで発表したこと

イベントでは、VSCode開発環境で開発したDLTパイプラインをローカルからワークスペースにデプロイ後、実行しました。
DLTのデプロイ、実行にはDatabricks Asset Bundlesというツールを使います。

Databricks Asset Bundlesとは

Databricksにおけるアセット(notebook、クラスター情報、ワークフロー構成など)をコード管理し、ワークスペースにデプロイするツールです。
具体的には、ローカルで用意したyamlファイルに書かれたDLTパイプラインのソースコードや設定情報を元にデプロイします。
アセットのコード管理ができるので、IaCアプローチによりソースコード管理ツールと組み合わせてバージョン管理が可能となっています。

今回使用したバンドルと関連ファイルは、以下のとおりです。

  • databricks.yml: ワークスペースのホスト名やDLTの設定情報を記載します。
    別のWSにデプロイしているDLTの設定情報をJSONからYAMLに変換し、resourcesフィールドに貼り付けています。
# This is a Databricks asset bundle definition for my_project.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.

# バンドル名
bundle:
  name: my_project

# resourcesディレクトリ内のYAMLファイルをインクルード
include:
  - resources/*.yml

# 事前に取得したDLTの設定(Json→Yamlに変換)
resources:
  pipelines:
    abe_test:
      id: "0e1755f6-742f-49ff-b98e-dc0556960f84"
      pipeline_type: "WORKSPACE"
      clusters:
        - label: "default"
          num_workers: 1
      development: true
      continuous: false
      channel: "CURRENT"
      photon: false
      libraries:
        - notebook: 
            path: "./src/DLT-Pipeline.ipynb"
      "name": "20240405_abe"
      "edition": "ADVANCED"
      "catalog": "ka_abe"
      "target": "sample"
      "data_sampling": false

# deploy先ワークスペース、DLTのmodeを設定
targets:
  # The 'dev' target, for development purposes. This target is the default.
  dev:
    # We use 'mode: development' to indicate this is a personal development copy:
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default
    # - The 'development' mode is used for Delta Live Tables pipelines
    mode: development
    default: true
    workspace:
      host: https:から始まるワークスペースのホスト名
  • src/DLT-Pipeline.ipynb: DLTでの処理内容です。
    DLTに紐づくnotebookとしてローカルにエクスポートし、配置してください。
# Databricks notebook source
# This notebook shows how to use files in repos functionality in Delta Live Tables.

# COMMAND ----------

# import helper functions from the current repository
import helpers.columns_helpers as ch

# COMMAND ----------

import dlt

# COMMAND ----------

# get data path from the DLT Pipeline configuration so we can test the pipeline with smaller amount of data
default_json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
json_path = spark.conf.get("my_etl.data_path", default_json_path)
print(f"Loading data from {json_path}")

# COMMAND ----------

@dlt.table(
   comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
)
def clickstream_raw():
  return spark.read.format("json").load(json_path)

# COMMAND ----------

@dlt.table(
  comment="Leave only links, and exclude some columns from dataset"
)
@dlt.expect_or_drop("only links", "type is not null and type in ('link', 'redlink')")
def clickstream_filtered():
  df = dlt.read("clickstream_raw")
  # use imported function
  new_cols = ch.columns_except(df, ['prev_id', 'prev_title'])
  return df.select(*new_cols)

Databricksの公式Blogに掲載されていた、以下のコードを引用しています。
https://github.com/alexott/dlt-files-in-repos-demo/blob/main/pipelines/DLT-Pipeline.py

  • resources/my_project_pipeline.yml: DLTの設定情報を記載します。
    今回はdatabricks.ymlに記載したため不要です。

これらのファイルはすべて同じディレクトリ上にあり、databricks.ymlには参照先のDLTコードがあるディレクトリを指定します。

ちなみに事前にデフォルトのバンドルテンプレートを作成するコマンドを実行すると、バンドル用のディレクトリが自動作成されます。
詳しい方法は以下ドキュメントを参照してください。

https://docs.databricks.com/ja/dev-tools/bundles/work-tasks.html

Asset Bundleに対応した拡張機能を使ってDLTをデプロイ、実行してみた

ワークスペースとの認証後、サイドバーに表示がされます。
サイドバー上部にはdatabricks.ymlに記入したDLTの設定情報が書かれています。

サイドバー下部にはDABS RESOURCES EXPLORERが表示されています。
ここには、デプロイされるDLTの名前とステータス、ターゲットのカタログとスキーマが表示されます。

DABS上からDLTのデプロイと実行をします。
DLTにカーソルを合わせると再生マークが表示されるので、クリックするとデプロイと実行の両方やってくれます。(アップロードマークを押すと、デプロイだけ実行します)

実行完了後、ワークスペースに[dev ka_abe] 20240405_abeというDLTが作成され、実行済みであることが確認できました。

本来ならば、DLTのデプロイや実行はDatabricks CLIからコマンドを入力して行いますが、バージョン2の拡張機能を使うことで単純なUI操作で済ませられることがメリットです。(ちなみにログに実行したDatabricks CLIのコマンドが表示されているのでご安心を)

GitHub Copilot ChatはDatabricksでの開発に使えるのか

ワークスペースにはdatabricks assistantがあるのですが、ローカルだと当然使えません。
そこで、Copilot ChatはDatabricks特有のコードもサジェストしてもらえるか検証しました。

Copilotに以下の問い合わせをして実行しました。

以下の処理をDLTで定義してください。
・clickstream_filteredテーブルを読み込む 
・nカラムが100以下のレコードを削除する

出力結果(見やすさ重視のため分割して表示しています)

以下のDLTスクリプトは、指定された要件を満たします。
CREATE OR REPLACE PIPELINE clickstream_filtered_cleanup AS
LOAD DATA INTO TABLE clickstream_filtered
FROM (SELECT * FROM clickstream_filtered WHERE n > 100)
このスクリプトは、clickstream_filteredテーブルからデータを読み込み、nカラムの値が100以下のレコードを除外します。そして、その結果を再びclickstream_filteredテーブルにロードします。

専用のDLT構文をサジェストしてくれませんでした。

次に、エージェントを@workspaceに指定し、再実行します。
エージェントに@workspaceを指定することで、ワークスペース内のコードやファイルを参照して回答します。

出力結果(見やすさ重視のため分割して表示しています)

以下のDLT定義は、clickstream_filteredテーブルからデータを読み込み、nカラムの値が100以下のレコードを除外します。
@dlt.table(
    comment="Load data from clickstream_filtered and remove records where n is less than or equal to 100."
)
def clickstream_filtered_over_100():
    df = dlt.read("clickstream_filtered")
    return df.filter(df.n > 100)
この定義をsrc/DLT-Pipeline.ipynbに追加することで、新しいテーブルclickstream_filtered_over_100が作成され、その中にはnカラムの値が100より大きいレコードのみが含まれます。

専用のDLT構文をサジェストしてくれました。
src/DLT-Pipeline.ipynb内のコードを参照したため、Pythonでのコードが出力されたかと思います。

ちなみに、@workspaceを指定せずともDLT構文をサジェストする場合がありました。
これは恐らく、Copilotの会話履歴や開いているタブに影響を受けたかと思います。
サジェストの精度を上げる方法は本記事では言及しませんが、色々試してみると良いと思います。(私は面倒なので、エージェントに@workspaceを指定します)

以上の検証結果から、CopilotもDatabricksの開発に使えるといっても良さそうです。

おわりに

アーカイブ動画も公開されており、本記事での具体的な操作は動画からも確認できます。
VSCodeを使ってDatabricksの開発をされている方はまだまだ少ないと思いますので、本記事がそのヒントになれば幸いです。

最後までご覧いただきありがとうございます。
私たちはDatabricksを用いたデータ分析基盤の導入から内製化支援まで幅広く支援をしております。
もしご興味がある方は、お問い合わせ頂ければ幸いです。

www.ap-com.co.jp

そして、一緒に働いていただける仲間も募集中です!
Lakehouse部ではデータ&AI案件での開発及びコンサルティングを行うエンジニア/PMを募集しています。
他部署でもエンジニアを募集しておりますので、APCにご興味がある方はカジュアル面談か求人一覧からのご応募をお待ちしております。