APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

【Zscaler】Zscaler OneAPIを利用してポリシーを作成してみた

目次

はじめに

こんにちは、エーピーコミュニケーションズ iTOC事業部BzD部0-WANの柳田です。
ZscalerではOneAPI(以下、API)経由でポリシーの作成や変更、削除を行うことが可能です。
今回はAPIを利用してZscaler Internet Access(以下、ZIA)の機能であるURLフィルタリングのポリシーを作成したので手順をご紹介します。

全体図

まず最初にAWS Secret Managerから必要な認証情報を取得し、API操作の処理を記述したPythonコードを実行します。
成功すればURLフィルタリングのポリシーが作成され、実行結果が返ってきます。

全体図
全体図

API操作を行うための準備

Zscalerでは以下のAPIへのアクセス方法を提供しています。
今回は「クラウドサービスAPI」の「基本認証とAPIキーの組み合わせ」を使用します。その場合には、以下の4つの情報が必要になります。

APIにアクセスするための認証方式
APIにアクセスするための認証方式

  • ログインID

  • パスワード

  • ベースURL

  • APIキー

API操作を行うにあたって、専用の管理者ユーザを作成して適切なロールを割り当てることが推奨されています。
また、注意点として ZIdentity対応のテナントではIdP経由でプロビジョニングされたユーザはAPIに対して認証を行うことができません。

APIにアクセスするユーザの注意点
APIにアクセスするユーザの注意点

ログインIDとパスワードの取得

上記を踏まえて新たにAPI用ユーザとロールを作成し、ログインIDとパスワードを取得します。
ユーザは「ZIdentity管理ポータル > Directory > Users」から作成します。

ユーザ作成画面
ユーザ作成画面

ロールは「ZIA管理ポータル > Administration > Role Management」から作成します。

ロールの作成画面
ロールの作成画面

今回は「URL & Cloud App Control」と「User Management」の権限を「Full」に設定し、その他の項目は「View Only」または「None」にします。

権限の設定箇所①
権限の設定箇所①
権限の設定箇所②
権限の設定箇所②

最後に「ZIdentity管理ポータル > Administration > Entitlements > Administrative > Zscaler Internet Access」からAPI操作を行うユーザに、作成したロールを割り当てれば完了です。

ロールの割り当て
ロールの割り当て

ベースURLとAPIキーの取得

ベースURLとAPIキーは「ZIA管理ポータル > Administration > Cloud Service API Security」から作成&確認することが可能です。

ベースURLとAPIキーの記載箇所
ベースURLとAPIキーの記載箇所

認証情報の保存

最後に認証情報をAWS Secret Managerに保存します。

AWS Secret Managerに格納した値
AWS Secret Managerに格納した値

メイン処理

API操作を行うための関数と主な処理は以下のイメージとなります。

作成する関数と処理の内容
作成する関数と処理の内容

main関数

メインの処理を実施するための関数です。以降に記載の関数を順番に実行します。

def main():
    try:
        #getSecret関数の実行
        secret = getSecret()

        #各変数に認証情報を格納
        Base_URL = secret["BaseURL"]
        Username = secret["Username"]
        Password = secret["Password"]
        API_Key = secret["API_Key"]
        
        #obfuscateApiKey関数の実行
        timestamp,obufuscateApiKey = obfuscateApiKey(API_Key)
        
        #createAuthSession関数の実行
        session = createAuthSession(Base_URL,Username,Password,obufuscateApiKey,timestamp)
        
        #getUserID関数の実行
        UserId = getUserID(Base_URL,session)

        #urlFiltering関数の実行
        urlFiltering(Base_URL,Username,UserId,session)
        
    except Exception as error:
        print(f"エラーが発生しました\n{error}\n")
        
    finally:
        #エラーの有無にかかわらずdeleteSession関数の実行
        deleteSession(Base_URL,session)
        
if __name__ == "__main__":
    main()

getSecret関数

認証情報を取得するための関数で、AWS Secret Managerに認証情報を格納した際に表示されるサンプルコードを使用したものです。 サンプルコードのままだと認証情報がJSON形式で返ってくるためjson.loadsで辞書型に変換しています。

import boto3,json
from botocore.exceptions import ClientError

#Secret Mangerの値を取得
def getSecret():

    secret_name = "Zscaler_API"
    region_name = "ap-northeast-3"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        raise e
    #JSON形式から辞書型に変換
    secret = json.loads(get_secret_value_response['SecretString'])
    return secret

obfuscateApiKey関数

APIキーの難読化処理を実施するための関数です。
こちらはZIA APIのドキュメントの「ZIA管理者認証情報とAPIキー/トークンの使用 > 2.認証してAPIセッションを作成します」に記載されています。

# APIキーの難読化
def obfuscateApiKey(API_Key):
    #UNIXエポック時間を1000倍する
    now = int(time.time() * 1000)
    #末尾から6桁目までを取得しString型に変換
    n = str(now)[-6:]
    #1ビット右にシフト(半分)し、6桁になるよう0を埋める
    r = str(int(n) >> 1).zfill(6)
    key = ""
    #変数nの桁数分ループする
    for i in range(0, len(str(n)), 1):
        #変数nの各桁を利用しAPI_Keyから対応する要素を取り出し、keyに加える
        key += API_Key[int(str(n)[i])]
    #変数rの桁数分ループする    
    for j in range(0, len(str(r)), 1):
        #変数rの各桁を利用しAPI_Keyから対応する要素を取り出し、keyに加える
        key += API_Key[int(str(r)[j])+2]
    return now,key

createAuthSession関数

セッションの作成と認証を行うための関数です。
認証を行うために/authenticatedSessionに対して認証情報を含めたpayloadを送信(POST)します。
戻り値をresponseに格納し、response.raise_for_status()でステータスコードをチェックします。
エラーが出力されなければreturn sessionして処理を終了させます。

import requests as rq

#セッション作成と認証
def createAuthSession(Base_URL,Username,Password,API_Key,Timestamp):

    url = f"{Base_URL}/authenticatedSession"

    #認証情報の格納
    payload = {
        "apiKey" : API_Key,
        "username": Username,
        "password": Password,
        "timestamp" : Timestamp
    }
    
    print("セッションを開始します\n")
    
    try:
        #セッションの開始
        session = rq.session()
        #APIに認証情報をPOSTする
        response = session.post(url=url,json=payload)
        #レスポンスのステータスコードチェック
        response.raise_for_status()
        #レスポンスの内容を表示
        print(f"認証に成功しました\n{response.text}\n")
        return session
    
    except Exception as error:
        print(f"createAuthSession関数でエラーが発生しました :\n{error}\n{response.text}\n")
        raise 

getUserID関数

ユーザIDを取得するための関数で、ポリシー作成時にユーザを指定するために取得する必要があります。
/usersのままユーザ情報を取得(GET)すると膨大な量のユーザ情報が返ってくるためクエリパラメータを使用して
特定のユーザのみを指定しています。

#ユーザIDの取得
def getUserID(Base_URL,Session):
    #自身の情報のみを取得
    url = f"{Base_URL}/users?name=Yanagida API User"
    try:
        #ユーザ情報を取得する
        response = Session.get(url=url)
        #レスポンスのステータスコードチェック
        response.raise_for_status()
        #レスポンスの内容を表示
        print(f"ユーザの取得に成功しました\n{response.text}\n")
        #JSON形式からリスト形式に変換
        response_data = json.loads(response.text)
        #リストからユーザIDを取得
        UserID = response_data[0]['id']
        return UserID

    except Exception as error:
        print(f"getUser関数でエラーが発生しました:\n{error}\n{response.text}\n")
        raise 

urlFiltering関数

URLフィルタリングのポリシーを作成する関数です。
今回はURLカテゴリの酒・タバコに該当するサイトをブロックするポリシーを作成します。
また、ユーザを指定するためにgetUserID関数で取得したUserIDを記載します。

def urlFiltering(Base_URL,Username,UserId,Session):
    url = f"{Base_URL}/urlFilteringRules"
    payload = {
        "name":"API_URLFiltering_Rule",
        "order": 10,
        "protocols": [
            "ANY_RULE"
        ],
        "urlCategories": [
            "ALCOHOL_TOBACCO"
        ],
        "users":[
            {
            "id": UserId,
            "name":Username
            }
        ],
        "rank" : 7,
        "state": "ENABLED",
        "action": "BLOCK"
    }
    
    print("URLフィルタリングのルールを追加します\n")
    
    try:
        #APIにURLフィルタリングのルール情報をPOSTする
        response = Session.post(url=url,json=payload)
        #レスポンスのステータスコードチェック
        response.raise_for_status()
        #レスポンスの内容を表示
        print(f"ルールの作成に成功しました\n{response.text}\n")
        return response
        
    except Exception as error:
        print(f"urlFiltering関数でエラーが発生しました:\n{error}\n{response.text}\n")
        raise 

deleteSession関数

セッションを終了するための関数です。

def deleteSession(Base_URL,Session):
    url = f"{Base_URL}/authenticatedSession"
    try:
        #セッションの削除
        response = Session.delete(url=url)
        #レスポンスのステータスコードチェック
        response.raise_for_status()
        #レスポンスの内容を表示
        print(f"セッションを削除しました\n{response.text}\n")
        
    except Exception as error:
        print(f"deleteSession関数でエラーが発生しました\n{error}\n{response.text}\n")
        raise 

実行結果

実行結果として以下の情報が返ってきます。

Pythonコードの実行結果
Pythonコードの実行結果

「ZIA管理ポータル > Policy > URL & Cloud App Control > URL Filtering Policy」から確認してみると想定通りのポリシーが作成されていることが確認できました。

API経由で作成したルール
API経由で作成したルール

まとめ

APIを利用してZIAのポリシーを作成する方法をご紹介しました。
やはり利点としては、管理ポータルでの操作と違って自分で処理の内容を記述できるカスタマイズ性だと感じました。
今後は数十個のポリシーを一括で作成したり、時間指定して実行させたりと色々試してみたいなと思いました!

0-WANについて
私たち0-WANは、ゼロトラスト製品を中心とした、マルチベンダーでのご提案で、お客様の経営課題解決を支援しております。 ゼロトラストってどうやるの?製品を導入したけれど使いこなせていない気がする等々、どんな内容でも支援いたします。 お気軽にご相談ください。
問い合わせ先、0-WANについてはこちら。

www.ap-com.co.jp