はじめに
こんにちは、ACS事業部 CIチームの平井です。 今回は、AzureのBlob storage triggerをEvent Grid経由でトリガーし、コンテナーに新しいファイルが追加されるとSlackに自動通知するアプリをご紹介します。
通知システムを構築するためのヒントとして、参考にしていただけたらと思います。
なお、本記事は公式ドキュメントの手順をベースに構築したものになります。 learn.microsoft.com
また同じチームの青木さんが、Azure Policyに基づく準拠状況のSlack自動通知アプリを作成した記事を書いていますので、こちらもぜひご覧ください。
Event Gridを経由する理由
Blob storage triggerはポーリングベースで動作し、新しいBlobがないかアプリ側から定期的にスキャンします。しかし、コンテナー内のファイル数が増えるとスキャンに時間がかかり、データ保存から関数実行までに最大10分程度の遅延が発生することがあります。
この問題を解決するために、イベントベースのEvent Gridを利用します。Blobの変更が発生すると即座に通知が送信されるため、ポーリングによる遅延を回避し、ほぼリアルタイムで関数をトリガーできます。
構成図
今回作成するアプリの処理のイメージ図です。
必要なツール類
この記事を執筆する際に使用したツール類とそのバージョンは以下の通りです。
- Visual Studio Code: 1.92.2
- WSL: 2.2.4.0
- Python: 3.10.12
- Azure Storage Explorer
- Azure CLI: 2.62.0
- Azure Functions Core Tools: 4.0.5907
事前準備
VS Codeの拡張機能のインストール
Azure Functions拡張機能をインストールします。こちらはアプリの作成に利用します。
関数アプリ用の空ディレクトリの作成
アプリ用に任意の名前でディレクトリを作成します。
Slack Webhook URLの取得
以下のドキュメントなどを参考に、SlackのWebhook URLを取得します。
アプリの作成
これからアプリを作成してきます。こちらで作成するアプリはGit Hubリポジトリに載っていますので参考にしていただけたらと思います。
開発環境の作成
VS Codeのコマンドパレット(「F1」または「CTRL」+「SHIFT」+「P」)を開き、Azure Functions: Create Function...
を選択します。
先ほど作成したディレクトリを指定後、開発言語としてPythonを選択します。
モデルは推奨のV2を選択します。
PythonがWSLにインストール済みであれば、インストールされているPythonのバージョンが表示されるのでそのまま選択します。
テンプレートとしてBlob storage triggerを選択します。
関数名、ストレージアカウント内のパスはデフォルトのまま進めます。
最後に監視するストレージアカウントを選択します。任意のサブスクリプションを選択後、ローカルのエミュレーターを選択します。
function_app.pyの編集
アプリの処理で行いたい処理を記載していきます。
import os import json import logging import requests import azure.functions as func from azure.identity import DefaultAzureCredential from azure.keyvault.secrets import SecretClient app = func.FunctionApp() @app.blob_trigger(arg_name="myblob", path="mycontainer/{name}", connection="BLOB_STORAGE_ACCOUNT", source="EventGrid") def blob_trigger(myblob: func.InputStream): content = myblob.read().decode("UTF-8") logging.info(f"Python Blob storage trigger function processed blob\nName: {myblob.name}\nContent: {content}") webhook_uri = get_webhook_uri() send_notification_to_slack(webhook_uri, myblob.name, content) def get_webhook_uri(): key_vault_name = os.getenv("KEY_VAULT_NAME") key_vault_uri = f"https://{key_vault_name}.vault.azure.net/" credentials = DefaultAzureCredential() secret_name = os.getenv("WEBHOOK_SECRET_NAME") secret_client = SecretClient(vault_url=key_vault_uri, credential=credentials) webhook_uri = secret_client.get_secret(secret_name) return webhook_uri.value def send_notification_to_slack(webhook_uri, name, content): slack_message = { "text": f"データの更新がありました\n Name: {name} \n Content: {content}" } json_message = json.dumps(slack_message) headers = {'Content-Type': 'application/json'} response = requests.post(webhook_uri, data=json_message, headers=headers) # Slack通知の送信結果をログに出力 if response.status_code == 200: logging.info("Successed to send notification to Slack.") else: logging.error(f"Failed to send notification to Slack. Response: {response.text}")
コードの解説
@app.blob_trigger
概要: このデコレーターは、関数がAzure Blob Storageの特定のコンテナー内のBlobの変更によってトリガーされることを指定します。
パラメーター:
arg_name="myblob"
: トリガーされたBlobを関数に渡す際の引数名としてblobを指定します。path="mycontainer/{name}"
: トリガー対象のBlobのパスを指定します。ここでは、mycontainerというコンテナー内が対象となります。connection="BLOB_STORAGE_ACCOUNT"
: Azure Blob Storageアカウントへの接続文字列を指定します。この接続文字列は、環境変数から取得されます。source="EventGrid"
: イベントのソースを指定します。ここでは、Event Gridを使用してBlobのイベントを監視します。
blob_trigger関数
概要: コンテナーに新しいBlobが追加されたときにトリガーされるメインの関数
パラメーター:
myblob
: トリガーされたBlob。
処理:
- Blobの内容を読み取り、その名前と内容をログに記録します。
get_webhook_uri()
を呼び出してSlackのWebhook URIを取得します。send_notification_to_slack()
を呼び出して、Blobの名前と内容を含む通知をSlackに送信します。
get_webhook_uri関数
概要: Azure Key VaultからSlackのWebhook URIを取得するの関数
処理:
- 環境変数からKey Vaultの名前とシークレット名を取得します。
DefaultAzureCredential()
を使用して認証し、Key Vaultにアクセスします。- Key VaultからWebhook URIのシークレットを取得し、その値を返します。
send_notification_to_slack
概要: コンテナに新しいBlobが追加されたときにトリガーされるメインの関数
パラメーター:
webhook_uri
: SlackのWebhook URLname
: Blobの名前content
: Blobの内容
処理:
- Blobの名前と内容を含むSlackメッセージを作成します。
- Webhook URLにHTTP POSTリクエストを使用してSlackにメッセージを送信します。
- Slack通知の結果をログに記録します。
requirements.txtの編集
function_app.pyに記載した処理の実行に必要なパッケージを記載していきます。
# DO NOT include azure-functions-worker in this file # The Python Worker is managed by Azure Functions platform # Manually managing azure-functions-worker may cause unexpected issues azure-functions azure-identity azure-keyvault-secrets requests
Azure環境にデプロイ
リソースの作成
作業ディレクリで以下のスクリプトを作成し、実行します。リポジトリにあるfunction.sh
を利用して頂いても大丈夫です。
"<>"のところは環境に合わせて置き換えてください。
#!/bin/bash # 変数の設定 SUBSCRIPTION="<Subcription ID>" RESOURCE_GROUP="<リソースグループ名>" LOCATION="<リージョン名>" FUNCTION_APP="<関数アプリ名>" FUNCTION_STORAGE_ACCOUNT="<関数用ストレージアカウント名>" BLOB_STORAGE_ACCOUNT="<Blob用ストレージアカウント名>" KEY_VAULT_NAME="<Key Vault キーコンテナ名>" WEBHOOK_SECRET_NAME="<シークレット名>" WEBHOOK_URI="<Slack Webhook URL>" EVENT_SUBSCRIPTION_NAME="<イベントサブスクリプション名>" # 作業するサブスクリプションの指定 az account set --subscription ${SUBSCRIPTION} # Resource Groupの作成 az group create --name ${RESOURCE_GROUP} --location ${LOCATION} # Functions用ストレージアカウントの作成 az storage account create \ --name ${FUNCTION_STORAGE_ACCOUNT} \ --resource-group ${RESOURCE_GROUP} \ --location ${LOCATION} \ --sku Standard_LRS # Blob用ストレージアカウントの作成 az storage account create \ --name ${BLOB_STORAGE_ACCOUNT} \ --resource-group ${RESOURCE_GROUP} \ --location ${LOCATION} \ --sku Standard_LRS # Blobコンテナの作成 az storage container create \ --name mycontainer \ --account-name ${BLOB_STORAGE_ACCOUNT} \ --auth-mode login # Webhook用Key Vaultの作成 az keyvault create \ --name ${KEY_VAULT_NAME} \ --resource-group ${RESOURCE_GROUP} \ --location ${LOCATION} # Key Vault管理者ロールをログインユーザーに付与 USER_ID=$(az ad signed-in-user show --query id --output tsv) az role assignment create \ --assignee ${USER_ID} \ --role "Key Vault Administrator" \ --scope /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.KeyVault/vaults/${KEY_VAULT_NAME} echo "Waiting for the managed role to be fully registered..." sleep 30 # Key VaultにWebhookのURIを登録 az keyvault secret set \ --vault-name ${KEY_VAULT_NAME} \ --name ${WEBHOOK_SECRET_NAME} \ --value ${WEBHOOK_URI} # Azure Functions Appの作成 az functionapp create \ --resource-group ${RESOURCE_GROUP} \ --consumption-plan-location ${LOCATION} \ --runtime python \ --runtime-version 3.10 \ --functions-version 4 \ --name ${FUNCTION_APP} \ --storage-account ${FUNCTION_STORAGE_ACCOUNT} \ --os-type Linux # 環境変数の設定 az functionapp config appsettings set --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --settings KEY_VAULT_NAME="${KEY_VAULT_NAME}" az functionapp config appsettings set --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --settings WEBHOOK_SECRET_NAME="${WEBHOOK_SECRET_NAME}" az functionapp config appsettings set --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --settings BLOB_STORAGE_ACCOUNT__blobServiceUri=https://${BLOB_STORAGE_ACCOUNT}.blob.core.windows.net az functionapp config appsettings set --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --settings BLOB_STORAGE_ACCOUNT__queueServiceUri=https://${BLOB_STORAGE_ACCOUNT}.queue.core.windows.net # Managed IDの付与 az functionapp identity assign --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} echo "Waiting for the managed identity to be fully registered..." sleep 30 # プリンシパルIDの取得 PRINCIPAL_ID=$(az functionapp identity show --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --query principalId --output tsv) # ストレージ BLOB データ所有者ロールを Azure Functions AppのマネージドIDにアサイン az role assignment create \ --assignee ${PRINCIPAL_ID} \ --role "Storage Blob Data Owner" \ --scope /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.Storage/storageAccounts/${BLOB_STORAGE_ACCOUNT} # ストレージ キュー データ共同作成者ロールを Azure Functions AppのマネージドIDにアサイン az role assignment create \ --assignee ${PRINCIPAL_ID} \ --role "Storage Queue Data Contributor" \ --scope /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.Storage/storageAccounts/${BLOB_STORAGE_ACCOUNT} # KeyVault用ロールをAzure Functions AppのマネージドIDにアサイン az role assignment create \ --assignee ${PRINCIPAL_ID} \ --role "Key Vault Secrets User" \ --scope /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.KeyVault/vaults/${KEY_VAULT_NAME} # Functionsにアプリをデプロイ func azure functionapp publish ${FUNCTION_APP} # イベントを報告するエンドポイントのURLを作成 BLOB_EXTENSION_KEY=$(az functionapp keys list --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --query "systemKeys.blobs_extension" --output tsv) EVENT_GRID_ENDPOINT="https://${FUNCTION_APP}.azurewebsites.net/runtime/webhooks/blobs?functionName=Host.Functions.blob_trigger&code=${BLOB_EXTENSION_KEY}" # Event Gridを利用したイベントサブスクリプションを作成 az eventgrid event-subscription create \ --name ${EVENT_SUBSCRIPTION_NAME} \ --source-resource-id /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.Storage/storageAccounts/${BLOB_STORAGE_ACCOUNT} \ --endpoint ${EVENT_GRID_ENDPOINT} \ --event-delivery-schema EventGridSchema \ --included-event-types Microsoft.Storage.BlobCreated
動作確認
Slackに通知が来ることが確認します。
Azure Storage Explorerでテキストファイルをアップロードします。
アップロード後、Slackに以下のようなメッセージが来るはずです。
次にAzure Portalでログを確認してみましょう。
[関数アプリ]→[Azure Functionsのアプリ名]→[関数名]→[呼び出し]タブとクリックすることで、Azure Functionsでデプロイされた関数の呼び出しが確認できます。
Event Gridを起点としてアプリが動いていることがわかります。
おわりに
この記事では、Blobの変更をSlackに自動通知する方法を紹介しました。この手法を応用して、特定のファイルが更新された際に自動でバックアップを取るなど、さらなる自動化を実現することも可能です。この記事が皆さんの学びや実践に少しでも役立てば嬉しいです。
私達ACS事業部はAzure・AKSなどのクラウドネイティブ技術を活用した内製化のご支援をしております。
また、一緒に働いていただける仲間も募集中です!
今年もまだまだ組織規模拡大中なので、ご興味持っていただけましたらぜひお声がけください。