APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

Databricks AppsとLakebaseを連携し、アプリケーションのトランザクションデータを管理する

はじめに

GDAI事業部 Lakehouse部の阿部です。 今年のDAISで発表されたLakebaseをあまり触れていなかったのですが、こちらのブログを読んでDatabricks Apps(以下、Appsと表記します)との連携が非常に容易であることを知りました。

https://www.databricks.com/blog/how-use-lakebase-transactional-data-layer-databricks-apps

最近AppsがAWSの東京リージョン、先月にはAzureの東日本リージョンでも使えるようになったのと、Appsとの連携例も見当たらなかったためブログを書きました。

Lakebaseとは

LakebaseはDatabricksが提供するOLTP(オンライントランザクション処理)向けのPostgreSQL互換のフルマネージドデータベースです。LakebaseはLakehouseと深く統合されているため、従来のETLパイプライン(たとえば、アプリケーションDB → ステージング → 変換 → DWHにロード)が不要になり、分析基盤との連携がシームレスになっています。つまり、LakebaseはDatabricks Appsとシームレスに連携でき、アプリケーションからのトランザクションデータを直接Lakebaseに出力・管理することが可能です。これにより、リアルタイムなデータ処理や柔軟なデータ活用が実現し、エンタープライズ用途にも適した信頼性・拡張性を備えています。

本記事では、Lakebaseのセットアップ方法やDatabricks Appsとの連携例を通じて、実践的な活用方法を解説します。

lakebaseのセットアップ

まずはサイドバーのComputeから、Lakebase PostgresタブからLakebase(インスタンス)を作成します。

これからUIでの操作画面をベースに説明しますが、以下のようにLakebaseの作成等はpythonで実行できますし、CLI, curlコマンドでも実行できます。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import DatabaseInstance

# Initialize the Workspace client
w = WorkspaceClient()

# Create a database instance
instance = w.database.create_database_instance(
    DatabaseInstance(
        name="my-database-instance",
        capacity="CU_1"
    )
)

print(f"Created database instance: {instance.name}")
print(f"Connection endpoint: {instance.read_write_dns}")

Restore windowを設定することで、ポイントインタイムリカバリが可能です。 また、既存の親インスタンスから子インスタンスを作成したり、フェイルオーバーノードを追加して可用性を担保することもできます。

Databricks appsの作成

同じくサイドバーのComputeからAppsの画面を開いたところ、LakebaseとAppsを統合するためのテンプレートがすでに用意されていたので、今回はLakebase Postgres appを選択しました。

DBは既定のものを選択。

テンプレートのため設定済みですが、AppsにはDBに対するCONNECTとCREATEの権限を付与の必要があります。

Appsを起動すると、以下のrequirements.txt、app.yml、app.pyが用意されます。

requirements.txt

flask>=2.3.0
psycopg[binary,pool]>=3.1.0
databricks-sdk>=0.18.0 

app.yml

command: ["python", "app.py"]

app.pyのソースコードを確認し、アプリケーションの構成を確認しましょう。

app.py

from flask import Flask, render_template, request, redirect, url_for, flash
import psycopg
import os
import time
from databricks import sdk
from psycopg import sql
from psycopg_pool import ConnectionPool

# Database connection setup
workspace_client = sdk.WorkspaceClient()
postgres_password = None
last_password_refresh = 0
connection_pool = None

# Databricks AppsがLakebase(PostgreSQL互換DB)に接続する際に、必要なOAuthトークン(認証情報)を定期的に更新する
def refresh_oauth_token():
    """Refresh OAuth token if expired."""
    global postgres_password, last_password_refresh
    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        try:
            postgres_password = workspace_client.config.oauth_token().access_token
            last_password_refresh = time.time()
        except Exception as e:
            print(f"❌ Failed to refresh OAuth token: {str(e)}")
            return False
    return True

# Lakebaseへの接続プールが未生成の場合、新規に取得・生成する
def get_connection_pool():
    """Get or create the connection pool."""
    global connection_pool
    if connection_pool is None:
        refresh_oauth_token()
        conn_string = (
            f"dbname={os.getenv('PGDATABASE')} "
            f"user={os.getenv('PGUSER')} "
            f"password={postgres_password} "
            f"host={os.getenv('PGHOST')} "
            f"port={os.getenv('PGPORT')} "
            f"sslmode={os.getenv('PGSSLMODE', 'require')} "
            f"application_name={os.getenv('PGAPPNAME')}"
        )
        connection_pool = ConnectionPool(conn_string, min_size=2, max_size=10)
    return connection_pool

# 最新の認証情報で接続プールを作り直し、そのプールから新しいDB接続を返す
def get_connection():
    """Get a connection from the pool."""
    global connection_pool
    
    # Recreate pool if token expired
    if postgres_password is None or time.time() - last_password_refresh > 900:
        if connection_pool:
            connection_pool.close()
            connection_pool = None
    
    return get_connection_pool().connection()

def get_schema_name():
    """Get the schema name in the format {PGAPPNAME}_schema_{PGUSER}."""
    pgappname = os.getenv("PGAPPNAME", "my_app")
    pguser = os.getenv("PGUSER", "").replace('-', '')
    return f"{pgappname}_schema_{pguser}"

def init_database():
    """Initialize database schema and table."""
    try:
        with get_connection() as conn:
            with conn.cursor() as cur:
                schema_name = get_schema_name()
                
                cur.execute(sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(schema_name)))
                cur.execute(sql.SQL("""
                    CREATE TABLE IF NOT EXISTS {}.todos (
                        id SERIAL PRIMARY KEY,
                        task TEXT NOT NULL,
                        completed BOOLEAN DEFAULT FALSE,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """).format(sql.Identifier(schema_name)))
                conn.commit()
                return True
    except Exception as e:
        print(f"Database initialization error: {e}")
        return False

def add_todo(task):
    """Add a new todo item."""
    try:
        with get_connection() as conn:
            with conn.cursor() as cur:
                schema = get_schema_name()
                cur.execute(sql.SQL("INSERT INTO {}.todos (task) VALUES (%s)").format(sql.Identifier(schema)), (task.strip(),))
                conn.commit()
                return True
    except Exception as e:
        print(f"Add todo error: {e}")
        return False

def get_todos():
    """Get all todo items."""
    try:
        with get_connection() as conn:
            with conn.cursor() as cur:
                schema = get_schema_name()
                cur.execute(sql.SQL("SELECT id, task, completed, created_at FROM {}.todos ORDER BY created_at DESC").format(sql.Identifier(schema)))
                return cur.fetchall()
    except Exception as e:
        print(f"Get todos error: {e}")
        return []

def toggle_todo(todo_id):
    """Toggle the completed status of a todo item."""
    try:
        with get_connection() as conn:
            with conn.cursor() as cur:
                schema = get_schema_name()
                cur.execute(sql.SQL("UPDATE {}.todos SET completed = NOT completed WHERE id = %s").format(sql.Identifier(schema)), (todo_id,))
                conn.commit()
                return True
    except Exception as e:
        print(f"Toggle todo error: {e}")
        return False

def delete_todo(todo_id):
    """Delete a todo item."""
    try:
        with get_connection() as conn:
            with conn.cursor() as cur:
                schema = get_schema_name()
                cur.execute(sql.SQL("DELETE FROM {}.todos WHERE id = %s").format(sql.Identifier(schema)), (todo_id,))
                conn.commit()
                return True
    except Exception as e:
        print(f"Delete todo error: {e}")
        return False

# Initialize Flask app
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'dev-secret-key')

# Initialize database
if not init_database():
    print("Failed to initialize database")

@app.route('/')
def index():
    """Main page showing all todos."""
    todos = get_todos()
    return render_template('index.html', todos=todos)

@app.route('/add', methods=['POST'])
def add_todo_route():
    """Add a new todo item."""
    task = request.form.get('task', '').strip()
    if task:
        if add_todo(task):
            flash('Todo added successfully!', 'success')
        else:
            flash('Failed to add todo.', 'error')
    else:
        flash('Please enter a task.', 'error')
    return redirect(url_for('index'))

@app.route('/toggle/<int:todo_id>')
def toggle_todo_route(todo_id):
    """Toggle the completed status of a todo item."""
    if toggle_todo(todo_id):
        flash('Todo updated successfully!', 'success')
    else:
        flash('Failed to update todo.', 'error')
    return redirect(url_for('index'))

@app.route('/delete/<int:todo_id>')
def delete_todo_route(todo_id):
    """Delete a todo item."""
    if delete_todo(todo_id):
        flash('Todo deleted successfully!', 'success')
    else:
        flash('Failed to delete todo.', 'error')
    return redirect(url_for('index'))

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=int(os.getenv('PORT', 8080))) 

Todo Listを管理するアプリのようです。 Appsを起動します。

何回かタスクの追加や完了、削除などを試したので、これらの操作がLakebaseにトランザクションとして記録されているはずです。 LakebaseからNew Queryをクリックすると、SQL Editorが見えます。

以下のSQLを実行し、DBの中身を確認します。

select * from "databricks_postgres"."lakebase-postgres-app_schema_1ad8d6f950d64d4e9ba77105a8d1431d"."todos"

以下、実行結果です。

To list管理の操作がトランザクションとして正常に出力されています。

おまけ

Lakebaseの重要な機能ではあるのですが、ブログの都合上簡単な説明に留めます。

Unity Catalogに登録

LakebaseのデータベースをUnity Catalogに登録し、データの同期とガバナンスの強化も魅力です。

LakebaseのCatalogsからCreate Catalogボタンから、カタログを作成し、同期されていることも確認しました。

Lakebaseのモニタリング

Metricsタブから、Lakebaseの使用状況についてTransactions per secondなどのメトリックを観察できます。

おわりに

本記事では、DatabricksのLakebaseとDatabricks Appsの連携によるトランザクションデータ管理の最新事例を紹介しました。LakebaseはLakehouseアーキテクチャの思想を継承し、クラウドストレージ上で高性能なトランザクション処理と柔軟なデータ活用を実現します。

Databricks Appsと組み合わせることで、アプリケーションから直接Lakebaseにデータを出力・管理でき、リアルタイムなデータ処理やエンタープライズ用途にも対応可能です。セットアップや権限設定、アプリの実装例を通じて、Lakebaseの実践的な活用イメージを掴んでいただけたかと思います。

今回は簡単にLakebaseとAppsとの連携が容易であることを伝えるため、簡単な操作と説明でした。時間があれば別のアプリケーションでLakebaseのブランチ切り替えな機能などを試してみたいと思います。

私たちはDatabricksを用いたデータ分析基盤の導入から内製化支援等()幅広く支援をしております。もしご興味がある方は、お問い合わせ頂ければ幸いです。

www.ap-com.co.jp

また、一緒に働いていただける仲間も募集中です! エーピーコミュニケーションズやLakehouse部にご興味がある方は問い合わせいただければ幸いです。

hrmos.co