APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

Event Grid経由でのBlob storage triggerでBlobの変更をSlackに自動通知する

はじめに

こんにちは、ACS事業部 CIチームの平井です。 今回は、AzureのBlob storage triggerをEvent Grid経由でトリガーし、コンテナーに新しいファイルが追加されるとSlackに自動通知するアプリをご紹介します。

通知システムを構築するためのヒントとして、参考にしていただけたらと思います。

なお、本記事は公式ドキュメントの手順をベースに構築したものになります。 learn.microsoft.com

learn.microsoft.com

また同じチームの青木さんが、Azure Policyに基づく準拠状況のSlack自動通知アプリを作成した記事を書いていますので、こちらもぜひご覧ください。

techblog.ap-com.co.jp

Event Gridを経由する理由

Blob storage triggerはポーリングベースで動作し、新しいBlobがないかアプリ側から定期的にスキャンします。しかし、コンテナー内のファイル数が増えるとスキャンに時間がかかり、データ保存から関数実行までに最大10分程度の遅延が発生することがあります。

この問題を解決するために、イベントベースのEvent Gridを利用します。Blobの変更が発生すると即座に通知が送信されるため、ポーリングによる遅延を回避し、ほぼリアルタイムで関数をトリガーできます。

learn.microsoft.com

構成図

今回作成するアプリの処理のイメージ図です。

必要なツール類

この記事を執筆する際に使用したツール類とそのバージョンは以下の通りです。

  • Visual Studio Code: 1.92.2
  • WSL: 2.2.4.0
  • Python: 3.10.12
  • Azure Storage Explorer
  • Azure CLI: 2.62.0
  • Azure Functions Core Tools: 4.0.5907

事前準備

VS Codeの拡張機能のインストール

Azure Functions拡張機能をインストールします。こちらはアプリの作成に利用します。

marketplace.visualstudio.com

関数アプリ用の空ディレクトリの作成

アプリ用に任意の名前でディレクトリを作成します。

Slack Webhook URLの取得

以下のドキュメントなどを参考に、SlackのWebhook URLを取得します。

api.slack.com

アプリの作成

これからアプリを作成してきます。こちらで作成するアプリはGit Hubリポジトリに載っていますので参考にしていただけたらと思います。

github.com

開発環境の作成

VS Codeのコマンドパレット(「F1」または「CTRL」+「SHIFT」+「P」)を開き、Azure Functions: Create Function... を選択します。

先ほど作成したディレクトリを指定後、開発言語としてPythonを選択します。

モデルは推奨のV2を選択します。

PythonがWSLにインストール済みであれば、インストールされているPythonのバージョンが表示されるのでそのまま選択します。

テンプレートとしてBlob storage triggerを選択します。

関数名、ストレージアカウント内のパスはデフォルトのまま進めます。

最後に監視するストレージアカウントを選択します。任意のサブスクリプションを選択後、ローカルのエミュレーターを選択します。

function_app.pyの編集

アプリの処理で行いたい処理を記載していきます。

import os
import json
import logging
import requests
import azure.functions as func
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

app = func.FunctionApp()

@app.blob_trigger(arg_name="myblob", path="mycontainer/{name}", connection="BLOB_STORAGE_ACCOUNT", source="EventGrid")

def blob_trigger(myblob: func.InputStream):
    content = myblob.read().decode("UTF-8")
    logging.info(f"Python Blob storage trigger function processed blob\nName: {myblob.name}\nContent: {content}")

    webhook_uri = get_webhook_uri()
    send_notification_to_slack(webhook_uri, myblob.name, content)

def get_webhook_uri():
    key_vault_name = os.getenv("KEY_VAULT_NAME")
    key_vault_uri = f"https://{key_vault_name}.vault.azure.net/"
    credentials = DefaultAzureCredential()
    secret_name = os.getenv("WEBHOOK_SECRET_NAME")
    secret_client = SecretClient(vault_url=key_vault_uri, credential=credentials)

    webhook_uri = secret_client.get_secret(secret_name)
    return webhook_uri.value

def send_notification_to_slack(webhook_uri, name, content):
    slack_message = {
        "text": f"データの更新がありました\n Name: {name} \n Content: {content}"
    }

    json_message = json.dumps(slack_message)
    headers = {'Content-Type': 'application/json'}
    response = requests.post(webhook_uri, data=json_message, headers=headers)

    # Slack通知の送信結果をログに出力
    if response.status_code == 200:
        logging.info("Successed to send notification to Slack.")
    else:
        logging.error(f"Failed to send notification to Slack. Response: {response.text}")

コードの解説

  1. @app.blob_trigger

    • 概要: このデコレーターは、関数がAzure Blob Storageの特定のコンテナー内のBlobの変更によってトリガーされることを指定します。

    • パラメーター:

      • arg_name="myblob": トリガーされたBlobを関数に渡す際の引数名としてblobを指定します。
      • path="mycontainer/{name}": トリガー対象のBlobのパスを指定します。ここでは、mycontainerというコンテナー内が対象となります。
      • connection="BLOB_STORAGE_ACCOUNT": Azure Blob Storageアカウントへの接続文字列を指定します。この接続文字列は、環境変数から取得されます。
      • source="EventGrid": イベントのソースを指定します。ここでは、Event Gridを使用してBlobのイベントを監視します。
  2. blob_trigger関数

    • 概要: コンテナーに新しいBlobが追加されたときにトリガーされるメインの関数

    • パラメーター:

      • myblob: トリガーされたBlob。
    • 処理:

      • Blobの内容を読み取り、その名前と内容をログに記録します。
      • get_webhook_uri()を呼び出してSlackのWebhook URIを取得します。
      • send_notification_to_slack()を呼び出して、Blobの名前と内容を含む通知をSlackに送信します。
  3. get_webhook_uri関数

    • 概要: Azure Key VaultからSlackのWebhook URIを取得するの関数

    • 処理:

      • 環境変数からKey Vaultの名前とシークレット名を取得します。
      • DefaultAzureCredential()を使用して認証し、Key Vaultにアクセスします。
      • Key VaultからWebhook URIのシークレットを取得し、その値を返します。
  4. send_notification_to_slack

    • 概要: コンテナに新しいBlobが追加されたときにトリガーされるメインの関数

    • パラメーター:

      • webhook_uri: SlackのWebhook URL
      • name: Blobの名前
      • content: Blobの内容
    • 処理:

      • Blobの名前と内容を含むSlackメッセージを作成します。
      • Webhook URLにHTTP POSTリクエストを使用してSlackにメッセージを送信します。
      • Slack通知の結果をログに記録します。

requirements.txtの編集

function_app.pyに記載した処理の実行に必要なパッケージを記載していきます。

# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues

azure-functions
azure-identity
azure-keyvault-secrets
requests

Azure環境にデプロイ

リソースの作成

作業ディレクリで以下のスクリプトを作成し、実行します。リポジトリにあるfunction.shを利用して頂いても大丈夫です。

"<>"のところは環境に合わせて置き換えてください。

#!/bin/bash

# 変数の設定
SUBSCRIPTION="<Subcription ID>"
RESOURCE_GROUP="<リソースグループ名>"
LOCATION="<リージョン名>"
FUNCTION_APP="<関数アプリ名>"
FUNCTION_STORAGE_ACCOUNT="<関数用ストレージアカウント名>"
BLOB_STORAGE_ACCOUNT="<Blob用ストレージアカウント名>"
KEY_VAULT_NAME="<Key Vault キーコンテナ名>"
WEBHOOK_SECRET_NAME="<シークレット名>"
WEBHOOK_URI="<Slack Webhook URL>"
EVENT_SUBSCRIPTION_NAME="<イベントサブスクリプション名>"

# 作業するサブスクリプションの指定
az account set --subscription ${SUBSCRIPTION}

# Resource Groupの作成
az group create --name ${RESOURCE_GROUP} --location ${LOCATION}

# Functions用ストレージアカウントの作成
az storage account create \
--name ${FUNCTION_STORAGE_ACCOUNT} \
--resource-group ${RESOURCE_GROUP} \
--location ${LOCATION} \
--sku Standard_LRS

# Blob用ストレージアカウントの作成
az storage account create \
--name ${BLOB_STORAGE_ACCOUNT} \
--resource-group ${RESOURCE_GROUP} \
--location ${LOCATION} \
--sku Standard_LRS

# Blobコンテナの作成
az storage container create \
--name mycontainer \
--account-name ${BLOB_STORAGE_ACCOUNT} \
--auth-mode login

# Webhook用Key Vaultの作成
az keyvault create \
--name ${KEY_VAULT_NAME} \
--resource-group ${RESOURCE_GROUP} \
--location ${LOCATION}

# Key Vault管理者ロールをログインユーザーに付与
USER_ID=$(az ad signed-in-user show --query id --output tsv)

az role assignment create \
--assignee ${USER_ID} \
--role "Key Vault Administrator" \
--scope /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.KeyVault/vaults/${KEY_VAULT_NAME}

echo "Waiting for the managed role to be fully registered..."
sleep 30

# Key VaultにWebhookのURIを登録
az keyvault secret set \
--vault-name ${KEY_VAULT_NAME} \
--name ${WEBHOOK_SECRET_NAME} \
--value ${WEBHOOK_URI}

# Azure Functions Appの作成
az functionapp create \
--resource-group ${RESOURCE_GROUP} \
--consumption-plan-location ${LOCATION} \
--runtime python \
--runtime-version 3.10 \
--functions-version 4 \
--name ${FUNCTION_APP} \
--storage-account ${FUNCTION_STORAGE_ACCOUNT} \
--os-type Linux

# 環境変数の設定
az functionapp config appsettings set --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --settings KEY_VAULT_NAME="${KEY_VAULT_NAME}"
az functionapp config appsettings set --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --settings WEBHOOK_SECRET_NAME="${WEBHOOK_SECRET_NAME}"
az functionapp config appsettings set --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --settings BLOB_STORAGE_ACCOUNT__blobServiceUri=https://${BLOB_STORAGE_ACCOUNT}.blob.core.windows.net
az functionapp config appsettings set --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --settings BLOB_STORAGE_ACCOUNT__queueServiceUri=https://${BLOB_STORAGE_ACCOUNT}.queue.core.windows.net

# Managed IDの付与
az functionapp identity assign --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP}

echo "Waiting for the managed identity to be fully registered..."
sleep 30

# プリンシパルIDの取得
PRINCIPAL_ID=$(az functionapp identity show --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --query principalId --output tsv)

# ストレージ BLOB データ所有者ロールを Azure Functions AppのマネージドIDにアサイン
az role assignment create \
--assignee ${PRINCIPAL_ID} \
--role "Storage Blob Data Owner" \
--scope /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.Storage/storageAccounts/${BLOB_STORAGE_ACCOUNT}

# ストレージ キュー データ共同作成者ロールを Azure Functions AppのマネージドIDにアサイン
az role assignment create \
--assignee ${PRINCIPAL_ID} \
--role "Storage Queue Data Contributor" \
--scope /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.Storage/storageAccounts/${BLOB_STORAGE_ACCOUNT}

# KeyVault用ロールをAzure Functions AppのマネージドIDにアサイン
az role assignment create \
--assignee ${PRINCIPAL_ID} \
--role "Key Vault Secrets User" \
--scope /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.KeyVault/vaults/${KEY_VAULT_NAME}

# Functionsにアプリをデプロイ
func azure functionapp publish ${FUNCTION_APP}

# イベントを報告するエンドポイントのURLを作成
BLOB_EXTENSION_KEY=$(az functionapp keys list --name ${FUNCTION_APP} --resource-group ${RESOURCE_GROUP} --query "systemKeys.blobs_extension" --output tsv)

EVENT_GRID_ENDPOINT="https://${FUNCTION_APP}.azurewebsites.net/runtime/webhooks/blobs?functionName=Host.Functions.blob_trigger&code=${BLOB_EXTENSION_KEY}"

# Event Gridを利用したイベントサブスクリプションを作成
az eventgrid event-subscription create \
--name ${EVENT_SUBSCRIPTION_NAME} \
--source-resource-id /subscriptions/${SUBSCRIPTION}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.Storage/storageAccounts/${BLOB_STORAGE_ACCOUNT} \
--endpoint ${EVENT_GRID_ENDPOINT} \
--event-delivery-schema EventGridSchema \
--included-event-types Microsoft.Storage.BlobCreated

動作確認

Slackに通知が来ることが確認します。

Azure Storage Explorerでテキストファイルをアップロードします。

アップロード後、Slackに以下のようなメッセージが来るはずです。

次にAzure Portalでログを確認してみましょう。

[関数アプリ]→[Azure Functionsのアプリ名]→[関数名]→[呼び出し]タブとクリックすることで、Azure Functionsでデプロイされた関数の呼び出しが確認できます。

Event Gridを起点としてアプリが動いていることがわかります。

おわりに

この記事では、Blobの変更をSlackに自動通知する方法を紹介しました。この手法を応用して、特定のファイルが更新された際に自動でバックアップを取るなど、さらなる自動化を実現することも可能です。この記事が皆さんの学びや実践に少しでも役立てば嬉しいです。

私達ACS事業部はAzure・AKSなどのクラウドネイティブ技術を活用した内製化のご支援をしております。

www.ap-com.co.jp

また、一緒に働いていただける仲間も募集中です!
今年もまだまだ組織規模拡大中なので、ご興味持っていただけましたらぜひお声がけください。

www.ap-com.co.jp

本記事の投稿者: <平井亨>