APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

AKSでDaprを使ってみる(4) Service Busと連携してPubSubする

はじめに

「AKSでDaprを使ってみる」シリーズ、これまでの3回 ()は Service Invocation機能を利用したものでした。Service Invocation機能だけだと(普通のREST APIと何が違うの?)と 思われたかもしれません。しかしDaprの実力は実はService Invocation以外のところにあると思います。今回はそんな Daprの本領を発揮する部分の1つを動かしてみたいと思います。それは「Pub/Sub message broker」です。

docs.dapr.io

PubSub message borkerは、DaprがPub/Sub Componentの接続を抽象化してくれる機能です。

今回はPub/Sub ComponentとしてAzure Service BusのTopicを利用し、実際にmessage brokerを使ってみたいと思います。
実現するイメージは以下のようなものです。

システム構成イメージ

構築と実装

Azure Service Busの環境構築

Service Bus名前空間の作成

まず最初にAzure Service Busの名前空間( Namespace)を作成します。 Daprのデフォルト設定では必要となるTopicやSubscriptionはDaprが自動的に作成します。このため、ここで必要なのはService Busの名前空間だけです。

ロールの割り当て

続いてDaprからアクセスするためのロールの割り当てです。。 今回はAKS Node poolのManaged IdにService Busの管理権限を付与します。これはTopicやSubscriptionの作成を自動的に行うためです。

Service Bus 名前空間の画面でアクセス制御(IAM)を選択肢ます。

「+追加」ボタンでロールの割り当ての追加画面に遷移し、ロールタブでは「Azure Service Busのデータ所有者」を選択します。

メンバータブではアクセスの割り当て先に「マネージドID」を選択し、「+メンバーを選択する」から選択ダイアログを表示します(画面右側)。 ダイアログでは「ユーザー割り当てマネージドID」を選択肢、表示される一覧の中から 「<aks-cluster-name>-agentpool」 という名称のものを探し選択します。

Dapr PubSub Component設定

最後にDaprのPubSub Componentの設定を行います。

公式ドキュメントに紹介されていうちのひとつ、AAD Authenticationを利用します。 今回はManaged Idを利用しますので、ClientSecretなどの設定は不要となります

なお、今回のシステムでは、Publisher(app-id: aggregator)がAKS namespace の web 、 Subscriber(app-id: event-consumer)がAKS namespaceの event にあり、上記で作成したPubSubを利用します。

このように複数のAKS namespaceにまたがって利用する場合、Dapr Pub/Sub componentは、 AKS namespaceごとに設定が必要となります。よって設定の内容は以下のようになります。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-pubsub
  namespace: event
spec:
  type: pubsub.azure.servicebus
  version: v1
  metadata:
  - name: namespaceName
    value: "${AZURE_SERVICEBUS_NAMESPACE}"
  - name: azureClientId
    value: "${PRINCIPAL_APPLICATION_ID}"
scopes:
- event-consumer
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-pubsub
  namespace: web
spec:
  type: pubsub.azure.servicebus
  version: v1
  metadata:
  - name: namespaceName
    value: "${AZURE_SERVICEBUS_NAMESPACE}"
  - name: azureClientId
    value: "${PRINCIPAL_APPLICATION_ID}"
scopes:
- aggregator

なお、 AZURE_SERVICEBUS_NAMESPACE は上記で作成したAzure Service Busの名前空間の名称になります。 PRINCIPAL_APPLICATION_ID は<cluster-name>-noodpool のClient IDです。以下のコマンドで取得可能です。

az aks show --name $CLUSTER_NAME --resource-group $RESOURCE_GROUP | jq -r '.identityProfile.kubeletidentity.clientId'

これで準備は完了です。

Subscriber側の実装

つづいてアプリケーション側のほうを見ていきましょう。まずはSubscriberから。こちらはTypescrptで実装してみました。

コンフィグレーション

このあたりから Daprの最大の特徴にもなってきますが、アプリケーションのインターフェースはHTTPないしgRPCを利用します。 Azure Service Busを使ったPubSubであっても、実装するのはWebアプリケーションです。

まず最初にTopic指定とデータ受信のEndpointの指定をします(公式ドキュメントはこちら

import { Request, Response, Router } from 'express';

function subscribe(_req: Request, resp: Response) {
  resp.json([
    {
      pubsubName: 'sample-pubsub',
      topic: 'orders',
      route: '/consumers/simple',
    },
  ]);
}

// 
app.get('/dapr/subscribe', subscribe);

pubsubName には前で設定したPubSub componentの名称を、 topic はPubSub内のトピック名称、 route はAPIサーバー内のEndpointを指定します。Daprはこの設定を読み取って登録します。

なお、デフォルト動作ではAzure Service Bus内に自動的にトピック・サブスクリプションを作成します。 トピック名は上記で指定したもの、サブスクリプション名は dapr-app-id となります。

アプリケーション

つづいてEndpoint側の実装です。

import { Request, Response } from 'express';

function simpleOrder(req: Request, resp: Response) {
  // req.bodyはcloudevents形式
  const simpleOrder = req?.body?.data as SimpleOrderEvent;
  console.log(simpleOrder);
  resp.status(200).send();
}

export interface SimpleOrderEvent {
  orderName: string;
}

//
app.get('/consumers/simple', simpleOrder);

ご覧の通り普通のHTTP Endpointです。Subscribeしたデータはrequest bodyとして渡されます。 ここに注意点があります。Dapr PubSubではデータをCloudEvents形式で送受信しています。 このため受信データもCloudEventsの形式となっており、送信したデータそのものは request.body.data に格納されています。 ここだけ注意すればあとは通常のHTTP APIとして実装すればOKです。

Publisher側の実装

ここまで来たらあとはPublisherの実装です。こちらは今まで利用してきたaggregatorに追加する形で実装したいと思います。Spring Bootアプリケーションになります。

アプリケーション

controller

@RequiredArgsConstructor
@RestController
@RequestMapping("/publish")
public class EventPublisherController {
    private final OrderPublisherService publisherService;

    @RequestMapping(path = "simple", consumes = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.POST)
    public Mono<Void> publishSimpleOrder(@RequestBody AggregatorSimpleRequest request) {
        return publisherService.publishSimpleOrder(request.orderName);
    }
}

class AggregatorSimpleRequest {
    public String orderName;
}

service

@Service
@RequiredArgsConstructor
public class OrderPublisherService {
    private final BackendConfig backendConfig;
    private final @Qualifier("orderTopicWebClient") WebClient orderTopicWebClient;

    public Mono<Void> publishSimpleOrder(String name) {
        var path = backendConfig.getOrderTopicBase();
        var request = SimpleOrderRequest.builder().orderName(name).build();
        return orderTopicWebClient.post().uri(path)
        .accept(MediaType.APPLICATION_JSON)
        .body(BodyInserters.fromValue(request))
        .retrieve()
        .bodyToMono(Void.class);
    }
}

クライアントから要求を受け取り、それをDaprのPublish Endpointに送信します。 (公式ドキュメント

PublishのEndpointは公式ドキュメントにある通り

POST http://localhost:<daprPort>/v1.0/publish/<pubsubname>/<topic>[?<metadata>]

です。送信時はデータをCloudEvents形式にする必要はありません(Dapr側でWrapしてくれます)

これで完了。

実際の動作確認

それではcurlコマンドなどでaggregatorのにリクエストを投げてみましょう。 最終的に event-consumerのconsole.logに送信したデータが表示されると思います。(kubectl logsコマンドなどで確認してください)

今回Subscriberとしている event-consumer アプリケーションにも Application Insightsのライブラリを導入しています。 このため、Azure Service Busを挟んでaggregatorからevent-consumerのend to endでトレースを確認することもできます。 このあたりのDapr導入の効果かもしれません。(わかりやすくするため、event-consumerに応答遅延を入れています。)

まとめ

準備はあれこれしましたが、アプリケーション部分は通常のHTTP Webアプリケーションとして実装できるため、非常に簡単です。アプリケーション側にはAzure Service Bus用のライブラリなども必要ありません。 まさにこの点がDaprの特徴です。基本的にすべてのAPIがHTTP/gRPCで実現されているため、これまで慣れ親しんだ方法で実装ができるのです。 もちろんDapr SDKも提供されており、送信・受信部分などもっと簡便にすることも可能です。 しかし敢えて通常のHTTPとして利用・実装することでこれまで利用してきたテスト手法なども使えると思います。

すべてがHTTP/gRPCの使いなればインターフェースとなり、サービス固有のライブラリなどには依存しない。 Daprのこうした利点を活用して、もっとクラウドネイティブなシステムを簡単にしていきましょう。

いつものように検証で利用したソースコード、設定ファイルは以下のGitHub Repositoryで公開しています。

github.com