APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

Azure Functions (Durable Function[Python])

はじめに

こんにちは。ACS 事業部の奥山です。

Azure Functions の Durable Functions (Python) についての調査・検証を行ったので、備忘録を兼ねてブログにしておきます。

現在、担当しているシステムで時間のかかる処理を行う必要があり、調べた内容です。 様々な実現方法があるとは思いますが、Azure なら Durable Functions がオススメです。

Durable Functions (Azure Functions) とは

Microsoftが開発した Durable Taskフレームワーク を基に構築された Azure Functionsの拡張機能になります。ステートフルな処理 や 長期実行プロセス を簡単に作成できます。 Microsoft内でも様々なところで利用されています。

(参考) 以前に書いたブログAzure Durable Functionsを使ってみた

Pythonでの実装について

Azure Functions Pythonでのプログラミングには プログラミング モデル v1 と v2 があります。 v1 と v2 の最も大きな違いは functions.json を利用するかどうかです。 v2 では functions.json がなくなりデコレーターでバインディング等の設定を指定することになり、コード中心になります。

※ 今回は v2 で検証を進めました。実際によく利用しそうなものを以下に紹介します。

(1) Blueprints (フォルダ構成を変更)

推奨フォルダー構造を参考に、ブループリントを利用して機能単位にフォルダを分けます。

$ tree 
.
├── blog.md
├── func01 ※一つ目の機能
├── func02 ※二つ目の機能
├── function_app.py
├── host.json
├── local.settings.json
└── requirements.txt

(2) シングルトン オーケストレーター

こちらで紹介されている特定のオーケストレーターを1度に一つだけ実行されるように保証するパターン。 オーケストレーターのID(インスタンスID)を固定しておいて実行中かどうかを確認します。実装はシンプルでわかりやすいですね。

# An HTTP-Triggered Function with a Durable Functions Client binding
@bp.route(route="orchestrators2/{functionName}")
@bp.durable_client_input(client_name="client")
async def http_start2(req: func.HttpRequest, client):

    # シングルトン オーケストレーター
    instance_id = req.params.get('myId', 'my-id-2024001')
    log_thread_info(f"http_start2: instance_id {instance_id}")
    existing_instance = await client.get_status(instance_id)
    if existing_instance.runtime_status in [df.OrchestrationRuntimeStatus.Completed, df.OrchestrationRuntimeStatus.Failed, df.OrchestrationRuntimeStatus.Terminated, None]:
        taskCount = int(req.params.get('taskCount', '100'))
        function_name = req.route_params.get('functionName')

        log_thread_info(f"http_start2: taskCount {taskCount}")
        instance_id = await client.start_new(function_name, instance_id, taskCount)
        response = client.create_check_status_response(req, instance_id)
    else:
        # すでに実行中の場合は、そのまま返す
        response = client.create_check_status_response(req, instance_id)
    return response

(3) アプリケーション パターン #3: 非同期 HTTP API

時間のかかる処理に有効なのが アプリケーション パターン #3: 非同期 HTTP API です。

実装自体は 通常の durable functions と同様です。 何もしなくてもオーケストレーター関数の状態をクエリするWebhook HTTP API(組み込み処理)が利用できます。※赤枠のところ

状態をクエリする 組み込みの Webhook HTTP API

インスタンスの管理 を参照

Webhook HTTP API の URL はHTTP-Triggered関数の場合、RESTのResponseに含まれています。

response = client.create_check_status_response(req, instance_id)

レスポンスを確認すると以下のように URL が確認できます。

$ curl -sS http://localhost:7071/api/orchestrators/hello_orchestrator   | jq . 
{
  "id": "59258d16b93544338469fd8d954b5e9d",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<id>",
  "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<id>/raiseEvent/{eventName}",
  "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<id>/terminate",
  "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<id>/rewind",
  "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<id>",
  "restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<id>/restart",
  "suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<id>/suspend",
  "resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<id>/resume"
}

Runtime Status

Client はポーリングによって操作が完了したことを認識することができます。 ClientはAPIを通してオーケストレーションの状態を知ることができます。

RuntimeStatus 意味
Pending スケジュール済み
Running 実行中
Completed 完了 
ContinuedAsNew インスタンスが新しい履歴で自身を再開しました。 これは一時的な状態です。
Failed 失敗
Terminated 停止
Suspended 再開(resume)待ち

(4) スケーリングとパフォーマンスの調整

プログラミング言語の特性や実際の処理の特性に応じて以下のパラメータを調整します。

パラメータ 説明 備考(変更方法)
インスタンス数 host machineの数 ※スケールアウト
プロセス数 FUNCTIONS_WORKER_PROCESS_COUNT (default: 1) 環境変数
スレッド数 PYTHON_THREADPOOL_THREAD_COUNT (default: None ※実行中に設定されるスレッドの数を保証しない) 環境変数
並列数(Activity) maxConcurrentActivityFunctions host.json (extensions.durableTask) ※1つのワーカーが処理する数を設定
並列数(Orchestrator) maxConcurrentOrchestratorFunctions host.json (extensions.durableTask) ※1つのワーカーが処理する数を設定

※上記パラメータを実際に動作させながら調整していくことになります。

(参考) Azure Functions で Python アプリのスループット パフォーマンスを向上させる

並列処理の確認

Activityで1秒のSleep処理を実施し並列数を上げることで全体の処理時間が短縮されることを確認します。
※Activityはファンアウト・ファンインのシナリオで並列で処理されるように実装しています。

簡単ですが「インスタンス数、 並列数、 プロセス数」を変更して動作確認した結果は以下の様になりました。

※重量課金プランで実施
※並列数 (maxConcurrentActivityFunctions と maxConcurrentOrchestratorFunctions) は同じ値を設定
※ スレッド数 (PYTHON_THREADPOOL_THREAD_COUNT) は 1固定

インスタンス数 並列数 プロセス数 処理時間
1 1 1 約100秒
1 2 2 約50秒
4 2 2 約30秒

※処理時間の確認は Azure Storage の table で確認しています。

パラメータの変更で並列に処理されて全体の処理時間が短くなっていることを確認できました。

まとめ

今回は Azure Functions (Durable Functions) の
・Pythonでの実装
・非同期 HTTP API
・スケーリングとパフォーマンスの調整
について検証を行いました。「非同期 HTTP API 」、「スケーリングとパフォーマンスの調整」については仕組みとして用意されているので是非活用していきたいです。

最後に

私達 ACS 事業部は Azure・AKS を活用した内製化のご支援をしております。ご相談等ありましたらぜひご連絡ください。

www.ap-com.co.jp

また、一緒に働いていただける仲間も募集中です!
切磋琢磨しながらスキルを向上できる、エンジニアには良い環境だと思います。ご興味を持っていただけたら嬉しく思います。

www.ap-com.co.jp

本記事の投稿者: 奥山 拓弥