はじめに
GDAI事業部 Lakehouse部の阿部です。 今年のDAISで発表されたLakebaseをあまり触れていなかったのですが、こちらのブログを読んでDatabricks Apps(以下、Appsと表記します)との連携が非常に容易であることを知りました。
https://www.databricks.com/blog/how-use-lakebase-transactional-data-layer-databricks-apps
最近AppsがAWSの東京リージョン、先月にはAzureの東日本リージョンでも使えるようになったのと、Appsとの連携例も見当たらなかったためブログを書きました。
Lakebaseとは
LakebaseはDatabricksが提供するOLTP(オンライントランザクション処理)向けのPostgreSQL互換のフルマネージドデータベースです。LakebaseはLakehouseと深く統合されているため、従来のETLパイプライン(たとえば、アプリケーションDB → ステージング → 変換 → DWHにロード)が不要になり、分析基盤との連携がシームレスになっています。つまり、LakebaseはDatabricks Appsとシームレスに連携でき、アプリケーションからのトランザクションデータを直接Lakebaseに出力・管理することが可能です。これにより、リアルタイムなデータ処理や柔軟なデータ活用が実現し、エンタープライズ用途にも適した信頼性・拡張性を備えています。
本記事では、Lakebaseのセットアップ方法やDatabricks Appsとの連携例を通じて、実践的な活用方法を解説します。
lakebaseのセットアップ
まずはサイドバーのComputeから、Lakebase PostgresタブからLakebase(インスタンス)を作成します。
これからUIでの操作画面をベースに説明しますが、以下のようにLakebaseの作成等はpythonで実行できますし、CLI, curlコマンドでも実行できます。
from databricks.sdk import WorkspaceClient from databricks.sdk.service.database import DatabaseInstance # Initialize the Workspace client w = WorkspaceClient() # Create a database instance instance = w.database.create_database_instance( DatabaseInstance( name="my-database-instance", capacity="CU_1" ) ) print(f"Created database instance: {instance.name}") print(f"Connection endpoint: {instance.read_write_dns}")
Restore windowを設定することで、ポイントインタイムリカバリが可能です。 また、既存の親インスタンスから子インスタンスを作成したり、フェイルオーバーノードを追加して可用性を担保することもできます。
Databricks appsの作成
同じくサイドバーのComputeからAppsの画面を開いたところ、LakebaseとAppsを統合するためのテンプレートがすでに用意されていたので、今回はLakebase Postgres appを選択しました。
DBは既定のものを選択。
テンプレートのため設定済みですが、AppsにはDBに対するCONNECTとCREATEの権限を付与の必要があります。
Appsを起動すると、以下のrequirements.txt、app.yml、app.pyが用意されます。
requirements.txt
flask>=2.3.0 psycopg[binary,pool]>=3.1.0 databricks-sdk>=0.18.0
app.yml
command: ["python", "app.py"]
app.py
のソースコードを確認し、アプリケーションの構成を確認しましょう。
app.py
from flask import Flask, render_template, request, redirect, url_for, flash import psycopg import os import time from databricks import sdk from psycopg import sql from psycopg_pool import ConnectionPool # Database connection setup workspace_client = sdk.WorkspaceClient() postgres_password = None last_password_refresh = 0 connection_pool = None # Databricks AppsがLakebase(PostgreSQL互換DB)に接続する際に、必要なOAuthトークン(認証情報)を定期的に更新する def refresh_oauth_token(): """Refresh OAuth token if expired.""" global postgres_password, last_password_refresh if postgres_password is None or time.time() - last_password_refresh > 900: print("Refreshing PostgreSQL OAuth token") try: postgres_password = workspace_client.config.oauth_token().access_token last_password_refresh = time.time() except Exception as e: print(f"❌ Failed to refresh OAuth token: {str(e)}") return False return True # Lakebaseへの接続プールが未生成の場合、新規に取得・生成する def get_connection_pool(): """Get or create the connection pool.""" global connection_pool if connection_pool is None: refresh_oauth_token() conn_string = ( f"dbname={os.getenv('PGDATABASE')} " f"user={os.getenv('PGUSER')} " f"password={postgres_password} " f"host={os.getenv('PGHOST')} " f"port={os.getenv('PGPORT')} " f"sslmode={os.getenv('PGSSLMODE', 'require')} " f"application_name={os.getenv('PGAPPNAME')}" ) connection_pool = ConnectionPool(conn_string, min_size=2, max_size=10) return connection_pool # 最新の認証情報で接続プールを作り直し、そのプールから新しいDB接続を返す def get_connection(): """Get a connection from the pool.""" global connection_pool # Recreate pool if token expired if postgres_password is None or time.time() - last_password_refresh > 900: if connection_pool: connection_pool.close() connection_pool = None return get_connection_pool().connection() def get_schema_name(): """Get the schema name in the format {PGAPPNAME}_schema_{PGUSER}.""" pgappname = os.getenv("PGAPPNAME", "my_app") pguser = os.getenv("PGUSER", "").replace('-', '') return f"{pgappname}_schema_{pguser}" def init_database(): """Initialize database schema and table.""" try: with get_connection() as conn: with conn.cursor() as cur: schema_name = get_schema_name() cur.execute(sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(schema_name))) cur.execute(sql.SQL(""" CREATE TABLE IF NOT EXISTS {}.todos ( id SERIAL PRIMARY KEY, task TEXT NOT NULL, completed BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """).format(sql.Identifier(schema_name))) conn.commit() return True except Exception as e: print(f"Database initialization error: {e}") return False def add_todo(task): """Add a new todo item.""" try: with get_connection() as conn: with conn.cursor() as cur: schema = get_schema_name() cur.execute(sql.SQL("INSERT INTO {}.todos (task) VALUES (%s)").format(sql.Identifier(schema)), (task.strip(),)) conn.commit() return True except Exception as e: print(f"Add todo error: {e}") return False def get_todos(): """Get all todo items.""" try: with get_connection() as conn: with conn.cursor() as cur: schema = get_schema_name() cur.execute(sql.SQL("SELECT id, task, completed, created_at FROM {}.todos ORDER BY created_at DESC").format(sql.Identifier(schema))) return cur.fetchall() except Exception as e: print(f"Get todos error: {e}") return [] def toggle_todo(todo_id): """Toggle the completed status of a todo item.""" try: with get_connection() as conn: with conn.cursor() as cur: schema = get_schema_name() cur.execute(sql.SQL("UPDATE {}.todos SET completed = NOT completed WHERE id = %s").format(sql.Identifier(schema)), (todo_id,)) conn.commit() return True except Exception as e: print(f"Toggle todo error: {e}") return False def delete_todo(todo_id): """Delete a todo item.""" try: with get_connection() as conn: with conn.cursor() as cur: schema = get_schema_name() cur.execute(sql.SQL("DELETE FROM {}.todos WHERE id = %s").format(sql.Identifier(schema)), (todo_id,)) conn.commit() return True except Exception as e: print(f"Delete todo error: {e}") return False # Initialize Flask app app = Flask(__name__) app.secret_key = os.getenv('SECRET_KEY', 'dev-secret-key') # Initialize database if not init_database(): print("Failed to initialize database") @app.route('/') def index(): """Main page showing all todos.""" todos = get_todos() return render_template('index.html', todos=todos) @app.route('/add', methods=['POST']) def add_todo_route(): """Add a new todo item.""" task = request.form.get('task', '').strip() if task: if add_todo(task): flash('Todo added successfully!', 'success') else: flash('Failed to add todo.', 'error') else: flash('Please enter a task.', 'error') return redirect(url_for('index')) @app.route('/toggle/<int:todo_id>') def toggle_todo_route(todo_id): """Toggle the completed status of a todo item.""" if toggle_todo(todo_id): flash('Todo updated successfully!', 'success') else: flash('Failed to update todo.', 'error') return redirect(url_for('index')) @app.route('/delete/<int:todo_id>') def delete_todo_route(todo_id): """Delete a todo item.""" if delete_todo(todo_id): flash('Todo deleted successfully!', 'success') else: flash('Failed to delete todo.', 'error') return redirect(url_for('index')) if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=int(os.getenv('PORT', 8080)))
Todo Listを管理するアプリのようです。 Appsを起動します。
何回かタスクの追加や完了、削除などを試したので、これらの操作がLakebaseにトランザクションとして記録されているはずです。 LakebaseからNew Queryをクリックすると、SQL Editorが見えます。
以下のSQLを実行し、DBの中身を確認します。
select * from "databricks_postgres"."lakebase-postgres-app_schema_1ad8d6f950d64d4e9ba77105a8d1431d"."todos"
以下、実行結果です。
To list管理の操作がトランザクションとして正常に出力されています。
おまけ
Lakebaseの重要な機能ではあるのですが、ブログの都合上簡単な説明に留めます。
Unity Catalogに登録
LakebaseのデータベースをUnity Catalogに登録し、データの同期とガバナンスの強化も魅力です。
LakebaseのCatalogsからCreate Catalogボタンから、カタログを作成し、同期されていることも確認しました。
Lakebaseのモニタリング
Metricsタブから、Lakebaseの使用状況についてTransactions per secondなどのメトリックを観察できます。
おわりに
本記事では、DatabricksのLakebaseとDatabricks Appsの連携によるトランザクションデータ管理の最新事例を紹介しました。LakebaseはLakehouseアーキテクチャの思想を継承し、クラウドストレージ上で高性能なトランザクション処理と柔軟なデータ活用を実現します。
Databricks Appsと組み合わせることで、アプリケーションから直接Lakebaseにデータを出力・管理でき、リアルタイムなデータ処理やエンタープライズ用途にも対応可能です。セットアップや権限設定、アプリの実装例を通じて、Lakebaseの実践的な活用イメージを掴んでいただけたかと思います。
今回は簡単にLakebaseとAppsとの連携が容易であることを伝えるため、簡単な操作と説明でした。時間があれば別のアプリケーションでLakebaseのブランチ切り替えな機能などを試してみたいと思います。
私たちはDatabricksを用いたデータ分析基盤の導入から内製化支援等()幅広く支援をしております。もしご興味がある方は、お問い合わせ頂ければ幸いです。
また、一緒に働いていただける仲間も募集中です! エーピーコミュニケーションズやLakehouse部にご興味がある方は問い合わせいただければ幸いです。