目次
はじめに
こんにちは、エーピーコミュニケーションズ iTOC事業部BzD部0-WANの柳田です。
ZscalerではOneAPI(以下、API)経由でポリシーの作成や変更、削除を行うことが可能です。
今回はAPIを利用してZscaler Internet Access(以下、ZIA)の機能であるURLフィルタリングのポリシーを作成したので手順をご紹介します。
全体図
まず最初にAWS Secret Managerから必要な認証情報を取得し、API操作の処理を記述したPythonコードを実行します。
成功すればURLフィルタリングのポリシーが作成され、実行結果が返ってきます。

API操作を行うための準備
Zscalerでは以下のAPIへのアクセス方法を提供しています。
今回は「クラウドサービスAPI」の「基本認証とAPIキーの組み合わせ」を使用します。その場合には、以下の4つの情報が必要になります。

ログインID
パスワード
ベースURL
APIキー
API操作を行うにあたって、専用の管理者ユーザを作成して適切なロールを割り当てることが推奨されています。
また、注意点として ZIdentity対応のテナントではIdP経由でプロビジョニングされたユーザはAPIに対して認証を行うことができません。

ログインIDとパスワードの取得
上記を踏まえて新たにAPI用ユーザとロールを作成し、ログインIDとパスワードを取得します。
ユーザは「ZIdentity管理ポータル > Directory > Users」から作成します。

ロールは「ZIA管理ポータル > Administration > Role Management」から作成します。

今回は「URL & Cloud App Control」と「User Management」の権限を「Full」に設定し、その他の項目は「View Only」または「None」にします。


最後に「ZIdentity管理ポータル > Administration > Entitlements > Administrative > Zscaler Internet Access」からAPI操作を行うユーザに、作成したロールを割り当てれば完了です。

ベースURLとAPIキーの取得
ベースURLとAPIキーは「ZIA管理ポータル > Administration > Cloud Service API Security」から作成&確認することが可能です。

認証情報の保存
最後に認証情報をAWS Secret Managerに保存します。

メイン処理
API操作を行うための関数と主な処理は以下のイメージとなります。

main関数
メインの処理を実施するための関数です。以降に記載の関数を順番に実行します。
def main(): try: #getSecret関数の実行 secret = getSecret() #各変数に認証情報を格納 Base_URL = secret["BaseURL"] Username = secret["Username"] Password = secret["Password"] API_Key = secret["API_Key"] #obfuscateApiKey関数の実行 timestamp,obufuscateApiKey = obfuscateApiKey(API_Key) #createAuthSession関数の実行 session = createAuthSession(Base_URL,Username,Password,obufuscateApiKey,timestamp) #getUserID関数の実行 UserId = getUserID(Base_URL,session) #urlFiltering関数の実行 urlFiltering(Base_URL,Username,UserId,session) except Exception as error: print(f"エラーが発生しました\n{error}\n") finally: #エラーの有無にかかわらずdeleteSession関数の実行 deleteSession(Base_URL,session) if __name__ == "__main__": main()
getSecret関数
認証情報を取得するための関数で、AWS Secret Managerに認証情報を格納した際に表示されるサンプルコードを使用したものです。
サンプルコードのままだと認証情報がJSON形式で返ってくるためjson.loadsで辞書型に変換しています。
import boto3,json from botocore.exceptions import ClientError #Secret Mangerの値を取得 def getSecret(): secret_name = "Zscaler_API" region_name = "ap-northeast-3" # Create a Secrets Manager client session = boto3.session.Session() client = session.client( service_name='secretsmanager', region_name=region_name ) try: get_secret_value_response = client.get_secret_value( SecretId=secret_name ) except ClientError as e: raise e #JSON形式から辞書型に変換 secret = json.loads(get_secret_value_response['SecretString']) return secret
obfuscateApiKey関数
APIキーの難読化処理を実施するための関数です。
こちらはZIA APIのドキュメントの「ZIA管理者認証情報とAPIキー/トークンの使用 > 2.認証してAPIセッションを作成します」に記載されています。
# APIキーの難読化 def obfuscateApiKey(API_Key): #UNIXエポック時間を1000倍する now = int(time.time() * 1000) #末尾から6桁目までを取得しString型に変換 n = str(now)[-6:] #1ビット右にシフト(半分)し、6桁になるよう0を埋める r = str(int(n) >> 1).zfill(6) key = "" #変数nの桁数分ループする for i in range(0, len(str(n)), 1): #変数nの各桁を利用しAPI_Keyから対応する要素を取り出し、keyに加える key += API_Key[int(str(n)[i])] #変数rの桁数分ループする for j in range(0, len(str(r)), 1): #変数rの各桁を利用しAPI_Keyから対応する要素を取り出し、keyに加える key += API_Key[int(str(r)[j])+2] return now,key
createAuthSession関数
セッションの作成と認証を行うための関数です。
認証を行うために/authenticatedSessionに対して認証情報を含めたpayloadを送信(POST)します。
戻り値をresponseに格納し、response.raise_for_status()でステータスコードをチェックします。
エラーが出力されなければreturn sessionして処理を終了させます。
import requests as rq #セッション作成と認証 def createAuthSession(Base_URL,Username,Password,API_Key,Timestamp): url = f"{Base_URL}/authenticatedSession" #認証情報の格納 payload = { "apiKey" : API_Key, "username": Username, "password": Password, "timestamp" : Timestamp } print("セッションを開始します\n") try: #セッションの開始 session = rq.session() #APIに認証情報をPOSTする response = session.post(url=url,json=payload) #レスポンスのステータスコードチェック response.raise_for_status() #レスポンスの内容を表示 print(f"認証に成功しました\n{response.text}\n") return session except Exception as error: print(f"createAuthSession関数でエラーが発生しました :\n{error}\n{response.text}\n") raise
getUserID関数
ユーザIDを取得するための関数で、ポリシー作成時にユーザを指定するために取得する必要があります。
/usersのままユーザ情報を取得(GET)すると膨大な量のユーザ情報が返ってくるためクエリパラメータを使用して
特定のユーザのみを指定しています。
#ユーザIDの取得 def getUserID(Base_URL,Session): #自身の情報のみを取得 url = f"{Base_URL}/users?name=Yanagida API User" try: #ユーザ情報を取得する response = Session.get(url=url) #レスポンスのステータスコードチェック response.raise_for_status() #レスポンスの内容を表示 print(f"ユーザの取得に成功しました\n{response.text}\n") #JSON形式からリスト形式に変換 response_data = json.loads(response.text) #リストからユーザIDを取得 UserID = response_data[0]['id'] return UserID except Exception as error: print(f"getUser関数でエラーが発生しました:\n{error}\n{response.text}\n") raise
urlFiltering関数
URLフィルタリングのポリシーを作成する関数です。
今回はURLカテゴリの酒・タバコに該当するサイトをブロックするポリシーを作成します。
また、ユーザを指定するためにgetUserID関数で取得したUserIDを記載します。
def urlFiltering(Base_URL,Username,UserId,Session): url = f"{Base_URL}/urlFilteringRules" payload = { "name":"API_URLFiltering_Rule", "order": 10, "protocols": [ "ANY_RULE" ], "urlCategories": [ "ALCOHOL_TOBACCO" ], "users":[ { "id": UserId, "name":Username } ], "rank" : 7, "state": "ENABLED", "action": "BLOCK" } print("URLフィルタリングのルールを追加します\n") try: #APIにURLフィルタリングのルール情報をPOSTする response = Session.post(url=url,json=payload) #レスポンスのステータスコードチェック response.raise_for_status() #レスポンスの内容を表示 print(f"ルールの作成に成功しました\n{response.text}\n") return response except Exception as error: print(f"urlFiltering関数でエラーが発生しました:\n{error}\n{response.text}\n") raise
deleteSession関数
セッションを終了するための関数です。
def deleteSession(Base_URL,Session): url = f"{Base_URL}/authenticatedSession" try: #セッションの削除 response = Session.delete(url=url) #レスポンスのステータスコードチェック response.raise_for_status() #レスポンスの内容を表示 print(f"セッションを削除しました\n{response.text}\n") except Exception as error: print(f"deleteSession関数でエラーが発生しました\n{error}\n{response.text}\n") raise
実行結果
実行結果として以下の情報が返ってきます。

「ZIA管理ポータル > Policy > URL & Cloud App Control > URL Filtering Policy」から確認してみると想定通りのポリシーが作成されていることが確認できました。

まとめ
APIを利用してZIAのポリシーを作成する方法をご紹介しました。
やはり利点としては、管理ポータルでの操作と違って自分で処理の内容を記述できるカスタマイズ性だと感じました。
今後は数十個のポリシーを一括で作成したり、時間指定して実行させたりと色々試してみたいなと思いました!
0-WANについて
私たち0-WANは、ゼロトラスト製品を中心とした、マルチベンダーでのご提案で、お客様の経営課題解決を支援しております。
ゼロトラストってどうやるの?製品を導入したけれど使いこなせていない気がする等々、どんな内容でも支援いたします。
お気軽にご相談ください。
問い合わせ先、0-WANについてはこちら。