APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

Databricksで自分の無料RAGチャットボットを作ってみましょう-!

はじめに

エーピーコミュニケーションズGDAI事業部Lakehouse部の鄭(ジョン)です。

この記事では、DatabricksのFree Editionを利用してRAGチャットボットを作成する方法をご紹介します。 今回のチャットボットは、Unity Catalogに保存されているテーブルを利用してRAGを構築し、Databricks Apps上でデプロイしました。 Free Editionでは、Databricksに内蔵されているモデルを利用して、無料でチャットボットを作成することができます。 ただし、機密性の高いデータを扱う場合には注意が必要であり、バックアップをあらかじめ取っておくことをおすすめします。

目次

RAG構築

今回は、RAG構築ステップを簡単にするため、DatabricksのVector Searchを利用せず、ベクトルDBとしてChromaDBを使用しています。

  • ChromaDBの場合は、chroma_client.get_or_create_collection()を利用してapp.pyの中でインデックスの生成と検索をそのまま利用します。
  • Databricks Vector Searchの場合は、embeddingテーブルを準備したのと同じように、別のノートブックで VectorSearchClient().create_delta_sync_index()を実行してインデックスを作成し、その後 app.pyで VectorSearchClient().get_index().similarity_search()を利用して検索を行います。

ChromaDBはインメモリ型のDBとして、アプリケーション構築のパートで定義されます。
DatabricksのVector Searchを利用したい方は、以下のブログをご参照ください。

qiita.com

RAGに使用するデータをUnity Catalogに保存します。 私は、Databricks公式ドキュメントをスクレイピングしたデータを利用しました。 エンベディングには英語に強い「databricks-bge-large-en」モデルを使用し、requests.postを通じてモデルを呼び出し、データをベクトル化しました。

エンベディングモデルに関する詳細は、以下のリンクをご参照ください。

docs.databricks.com

import os
import requests
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.getOrCreate()

UC_TABLE = "apps_data.dbx_doc.databricks_doc"
EMBEDDING_ENDPOINT = "Modelのエンドポイントを入力してください。"
DATABRICKS_TOKEN = "トークンを入力してください。"

headers = {
    "Authorization": f"Bearer {DATABRICKS_TOKEN}",
    "Content-Type": "application/json"
}

# 1. UCテーブルをロード
df_spark = spark.table(UC_TABLE).select("id", "url", "content")  
df = df_spark.toPandas() 

# 2. embeddingを生成する関数
def get_embedding(text: str):
    resp = requests.post(
        EMBEDDING_ENDPOINT,
        headers=headers,
        json={"input": text}
    )
    resp.raise_for_status()
    return resp.json()["data"][0]["embedding"]

# 3. content → embedding に変換
df["embedding"] = df["content"].apply(get_embedding)

# 4. Pandas → Spark DF に変換後、UCテーブルにoverwrite保存
df_spark_new = spark.createDataFrame(df)
df_spark_new.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("apps_data.dbx_doc.databricks_doc")

print(f"✅ {UC_TABLE} テーブルのembeddingカラムを作成しました。。")

Application構築

次にDatabricks Appsを利用してChatbot Applicationを構築します。

AppsはComputeで作成できます。 ComputeのAppsタブで「Create app」ボタンをクリックします。

GradioChatbotテンプレートをクリックします。

希望するLLMリソースを選択します。私は「databricks-gpt-oss-120b」モデルを使用しました。

リソースを選択した後、「Install」ボタンを押すと、基本のLLMChatbotが作成されます。

Ragチャットボットに必要なリソースを追加するために「Edit」ボタンをクリックします。

  • Runningの横にあるURLをクリックするとデプロイされたチャットボットを開くことができます。
  • Deploymentの下にあるURLをクリックするとソースコードを開くことができます。

私は合計で3つのリソースを使用しました。

  1. AppsをInstallする際に選択したLLM用のモデルサービングエンドポイント
  2. ベクター検索のための埋め込みモデルサービングエンドポイント(RAGテーブルを作成する時と同じもの)
  3. Unity Catalogのテーブルを読み取る際に使用するSQL Warehouse

それぞれの権限とリソースキーは、画像のように登録しました。

App resourcesで登録されたリソースを確認できます。

Deploymentの下にあるURLをクリックして、ソースコードを確認および修正します。

Databricks AppsをInstallする際に自動的に生成された4つのファイルです。 なお、生成後に追加したリソースについては自動更新されないため、別途修正が必要です。

それぞれのファイルの用途は以下になります。

ファイル名 役割 / 用途 主な内容
app.py アプリケーション本体のコード Gradio の UI インターフェースを定義し、ユーザー入力を受け取り、モデルにリクエストを送り、応答を返す処理を行う。対話履歴の管理や補助ユーティリティの呼び出しも含まれる。
app.yaml 実行環境の設定ファイル Databricks Apps でアプリをどのように実行するかを定義。例:command(実行コマンド)、env(環境変数)、リソースや権限の設定など。
model_serving_utils.py モデル呼び出し用ユーティリティ Databricks のモデルサービング(エンドポイント)とやり取りする関数をまとめた補助モジュール。入力データのフォーマット変換、リクエスト送信、エラーハンドリング、応答の処理などを担当。
requirements.txt パッケージ依存関係の定義 アプリに必要な Python ライブラリを指定するファイル。例:gradio, requests, transformers など。アプリ実行時やデプロイ時にこのリストをもとにライブラリがインストールされる。

それぞれのファイルの中身について説明します。

- requirements.txt

今回の検証ではVector DBとしてChromaDBを使用するため、該当ライブラリをrequirements.txtファイルに追加してInstallします。

gradio==5.23.3
mlflow>=2.21.2
databricks-sdk
chromadb

- app.yaml

Appsに登録したリソースを環境変数に追加します。valueFromはApps設定で登録したリソースキーと一致させます。

command: [
  "python",
  "app.py"
]

env:
  - name: "SERVING_ENDPOINT_LLM"
    valueFrom: "llm"
  - name: "SERVING_ENDPOINT_EMB"
    valueFrom: "embedding"
  - name: "DATABRICKS_WAREHOUSE_ID"
    valueFrom: "sql-warehouse"

- model_serving_utils.py

Databricksモデルサービングエンドポイントを呼び出します。query_endpoint関数はDatabricks Appsの初期Install時に自動的に作成されました。LLMモデルのエンドポイントと連携されています。

from mlflow.deployments import get_deploy_client
from databricks.sdk import WorkspaceClient

def _get_endpoint_task_type(endpoint_name: str) -> str:
    """Get the task type of a serving endpoint."""
    w = WorkspaceClient()
    ep = w.serving_endpoints.get(endpoint_name)
    return ep.task

def is_endpoint_supported(endpoint_name: str) -> bool:
    """Check if the endpoint has a supported task type."""
    task_type = _get_endpoint_task_type(endpoint_name)
    supported_task_types = ["agent/v1/chat", "agent/v2/chat", "llm/v1/chat"]
    return task_type in supported_task_types

def _validate_endpoint_task_type(endpoint_name: str) -> None:
    """Validate that the endpoint has a supported task type."""
    if not is_endpoint_supported(endpoint_name):
        raise Exception(
            f"Detected unsupported endpoint type for this basic chatbot template. "
            f"This chatbot template only supports chat completions-compatible endpoints. "
            f"For a richer chatbot template with support for all conversational endpoints on Databricks, "
            f"see https://docs.databricks.com/aws/en/generative-ai/agent-framework/chat-app"
        )

def _query_endpoint(endpoint_name: str, messages: list[dict[str, str]], max_tokens) -> list[dict[str, str]]:
    """Calls a model serving endpoint."""
    _validate_endpoint_task_type(endpoint_name)
    
    res = get_deploy_client('databricks').predict(
        endpoint=endpoint_name,
        inputs={'messages': messages, "max_tokens": max_tokens},
    )
    if "messages" in res:
        return res["messages"]
    elif "choices" in res:
        return [res["choices"][0]["message"]]
    raise Exception("This app can only run against:"
                    "1) Databricks foundation model or external model endpoints with the chat task type (described in https://docs.databricks.com/aws/en/machine-learning/model-serving/score-foundation-models#chat-completion-model-query)"
                    "2) Databricks agent serving endpoints that implement the conversational agent schema documented "
                    "in https://docs.databricks.com/aws/en/generative-ai/agent-framework/author-agent")

def query_endpoint(endpoint_name, messages, max_tokens):
    return _query_endpoint(endpoint_name, messages, max_tokens)[-1]

query_embedding関数は、ベクター検索を行うために追加で作成した関数です。embeddingモデルのエンドポイントと連携されています。

def query_embedding(endpoint_name: str, message: str) -> list[float]:
    """Calls an embedding model serving endpoint."""
    res = get_deploy_client('databricks').predict(
        endpoint=endpoint_name,
        inputs={"input": message},
    )
    if "data" in res and "embedding" in res["data"][0]:
        return res["data"][0]["embedding"]
    raise Exception("This app can only run against Databricks embedding model endpoints")

- app.py

Chatbotアプリケーションを作成します。 まず、Gradio、Pandas、DatabricksSDKなど必要な外部ライブラリと、model_serving_utilsのような内部ユーティリティモジュールをインポートします。 次に、アプリ実行に必須となる環境変数(DATABRICKS_WAREHOUSE_ID、SERVING_ENDPOINT_EMB)が設定されているかを確認し、未設定の場合は実行を停止します。

import gradio as gr
import logging
import os
from model_serving_utils import query_endpoint, is_endpoint_supported, query_embedding
import chromadb
import pandas as pd
from databricks import sql
from databricks.sdk.core import Config

# Ensure environment variable is set correctly
assert os.getenv('DATABRICKS_WAREHOUSE_ID'), "DATABRICKS_WAREHOUSE_ID must be set in app.yaml."
assert os.getenv('SERVING_ENDPOINT_EMB'), "SERVING_ENDPOINT_EMB must be set in app.yaml."

ロギングを有効化します。そしてDatabricks SQL Warehouseに接続するためのsqlQuery関数を定義します。 この関数は入力されたSQLクエリを実行し、その結果をPandas DataFrameとして返します。 その後、環境変数 UC_TABLEに指定されたテーブル(apps_data.dbx_doc.databricks_doc)を参照し、データの形状とカラム情報を出力して確認します。 クエリ実行に失敗した場合は例外を出力し、空のDataFrameを返します。

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def sqlQuery(query: str) -> pd.DataFrame:
    """Execute a SQL query and return the result as a pandas DataFrame."""
    cfg = Config()  # Pull environment variables for auth
    with sql.connect(
        server_hostname=cfg.host,
        http_path=f"/sql/1.0/warehouses/{os.getenv('DATABRICKS_WAREHOUSE_ID')}",
        credentials_provider=lambda: cfg.authenticate
    ) as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall_arrow().to_pandas()

# Fetch the data
UC_TABLE = os.getenv("UC_TABLE", "apps_data.dbx_doc.databricks_doc")
try:
    # This example query depends on the nyctaxi data set in Unity Catalog, see https://docs.databricks.com/en/discover/databricks-datasets.html for details
    data = sqlQuery(f"SELECT * FROM {UC_TABLE}")
    print(f"Data shape: {data.shape}")
    print(f"Data columns: {data.columns}")
except Exception as e:
    print(f"An error occurred in querying data: {str(e)}")
    data = pd.DataFrame()

In-memoryのChromaDBクライアントを作成し、「rag_docs」というコレクションを準備します。 Databricks SQLから取得したデータからドキュメントID、メタデータ(URL)、本文テキスト、エンベディングを抽出し、ChromaDBコレクションに追加します。 これにより、後続のクエリ時にエンベディング類似度に基づいて関連ドキュメントを検索できるようになります。

# In-memory Chroma client
chroma_client = chromadb.Client()
collection = chroma_client.get_or_create_collection(name="rag_docs")

ids = data["id"].astype(str).tolist()
urls = data["url"].tolist()
docs = data["content"].tolist()
embs = data["embedding"].tolist() 

collection.add(
    ids=ids,
    documents=docs,
    embeddings=embs,
    metadatas=[{"url": u} for u in urls]
)

LLMとEmbeddingモデルの環境変数を読み込み、LLMモデルのエンドポイントがChat機能を提供しているかをチェックします。

# Ensure environment variable is set correctly
SERVING_ENDPOINT_LLM = os.getenv('SERVING_ENDPOINT_LLM')
assert SERVING_ENDPOINT_LLM,\
    ("Unable to determine serving endpoint to use for chatbot app. If developing locally, "
     "set the SERVING_ENDPOINT_LLM environment variable to the name of your serving endpoint. If "
     "deploying to a Databricks app, include a serving endpoint resource named "
     "'llm' with CAN_QUERY permissions, as described in "
     "https://docs.databricks.com/aws/en/generative-ai/agent-framework/chat-app#deploy-the-databricks-app")
# Check if the endpoint is supported
endpoint_supported_llm = is_endpoint_supported(SERVING_ENDPOINT_LLM)
SERVING_ENDPOINT_EMB = os.getenv('SERVING_ENDPOINT_EMB')

query_rag関数はユーザーの質問を受け取り、RAG(Retrieval-Augmented Generation)方式で処理します。 まず質問をエンベディングベクトルに変換し、ChromaDBで類似ドキュメントを検索して関連するコンテキストを作成します。 このコンテキストと既存の会話履歴を組み合わせてDatabricks LLMエンドポイントに渡し、モデルが生成した回答を返します。 この過程でエラーが発生した場合は、エラーメッセージを返します。

def query_rag(message, history):
    if not message.strip():
        return "ERROR: The question should not be empty"

    query_vector = query_embedding(SERVING_ENDPOINT_EMB, message)
    results = collection.query(query_embeddings=[query_vector], n_results=3)

    context = "\n".join([
        f"- {doc} (Sources: {meta['url']})"
        for doc, meta in zip(results["documents"][0], results["metadatas"][0])
    ])

    message_history = []
    for user_msg, assistant_msg in history:
        message_history.append({"role": "user", "content": user_msg})
        message_history.append({"role": "assistant", "content": assistant_msg})

    message_history.append({
        "role": "user",
        "content": f"Answer the following question using the context.\n\nContext:\n{context}\n\nQuestion: {message}"
    })

    try:
        logger.info(f"Sending request to model endpoint: {SERVING_ENDPOINT_LLM}")
        response = query_endpoint(
            endpoint_name=SERVING_ENDPOINT_LLM,
            messages=message_history,
            max_tokens=400
        )
        # return response["content"]
        print(message_history)
        print(response)
        return response["content"][-1]["text"]
    except Exception as e:
        logger.error(f"Error querying model: {str(e)}", exc_info=True)
        return f"Error: {str(e)}"

Gradioを利用してDatabricks LLMチャットボットのインターフェースを生成します。 もし指定されたLLMエンドポイントがサポートされていないタイプであれば、簡易UIを表示してエンドポイントが互換性を持たない旨の案内メッセージを示します。 正常なエンドポイントであれば、query_rag関数をベースに対話型インターフェースを構成し、チャットボットのタイトル・説明・サンプル質問を提供します。 最後にdemo.launch()を実行することでアプリケーションが起動し、Webブラウザ上で直接チャットボットを利用できるようになります。

# Create Gradio interface based on endpoint support
if not endpoint_supported_llm:
    # Create a simple interface showing the error message
    with gr.Blocks() as demo:
        gr.Markdown("# Databricks LLM Chatbot")
        gr.Markdown(
            f"""
            ⚠️ **Unsupported Endpoint Type**
            
            The endpoint `{SERVING_ENDPOINT_LLM}` is not compatible with this basic chatbot template.
            
            This template only supports chat completions-compatible endpoints.
            
            👉 **For a richer chatbot template** that supports all conversational endpoints on Databricks, 
            please see the [Databricks documentation](https://docs.databricks.com/aws/en/generative-ai/agent-framework/chat-app).
            """
        )
else:
    demo = gr.ChatInterface(
        fn=query_rag,
        title="Databricks LLM Chatbot",
        description=(
            "Note: this is a simple example. See "
            "[Databricks docs](https://docs.databricks.com/aws/en/generative-ai/agent-framework/chat-app) "
            "for a more comprehensive example, with support for streaming output and more."
        ),
        examples=[
            "What is machine learning?",
            "What are Large Language Models?",
            "What is Databricks?"
        ],
    )

if __name__ == "__main__":
    demo.launch()

Chatbot実行

ファイルの準備が完了したら「Deploy」ボタンをクリックしてAppsをデプロイします。

もしデプロイ中にエラーが発生した場合は、そのログを「Logs」で確認できます。

デプロイが完了すると、下の画像にある部分がUnavailableからRunningに変わります。

RunningにあるURLをクリックすると、チャットボットが開きます。

まとめ

この記事ではDatabricks Free EditionでApps機能を活用し、簡単なRAG Chatbotを実装してみました。 今回の検証では、Databricksに内蔵されたモデルを使用しましたが、任意のモデルをダウンロードしてサービングエンドポイントに登録すれば、チャットボットに活用することも可能です。 そして、GradioのUIを自由にカスタマイズすることで、自分だけのカスタムチャットボットを作成することもできます。 最後までお読みいただきありがとうございます。

※モデルサービング登録に関する記事は以下をご参照ください。

techblog.ap-com.co.jp

私たちはDatabricksを用いたデータ分析基盤の導入から内製化支援まで幅広く支援をしております。
もしご興味がある方は、お問い合わせ頂ければ幸いです。

www.ap-com.co.jp

また、一緒に働いていただける仲間も募集中です!
APCにご興味がある方の連絡をお待ちしております。

www.ap-com.co.jp