先進サービス開発事業部の山岡です。
GCP DatastoreのバックアップにはImport/Exportというサービスを使いますが、これには結果をレポートする機能が無く運用する上で不便です。今回APIをポーリングしSlackにレポートする仕組みを作りましたのでその知見を共有したいと思います。
基本的な動作の流れ
- GAE Cronからバックアップのタスクをキックする 参考記事
- タスク起動時に取得した
name
(ジョブ固有に割り当てられるIDのようなもの)を取得しポーリングタスクをキックする( projects/sample-projectname/operations/A1SA0MTEwODhaXNhLXNib2otbmltZGEak0MZAcSMXRzYWwgadGxYWZlVodHJvbi1CjES
というような形式です)
- APIをポーリングし終了していた場合、もしくは所定の時間内に終わらなかった等の結果をSlackに送信する
結果表示例
シーケンス・フローチャート
サンプルコード
※Slackの通知処理は除外してあります。手前味噌で恐縮ですが、もしよかったら私が作ったライブラリを使ってみて下さい
JSON格納用の構造体とメソッド、変数等
const (
POLLING_INTERVAL = 60
MAX_POLLING_COUNT = 60
)
var (
JST = time.FixedZone("Asia/Tokyo", 9*60*60)
)
type datastoreOperation struct {
Done bool `datastore:"done" json:"done"`
Metadata datastoreOperationMetadata `datastore:"metadata" json:"metadata"`
Name string `datastore:"name" json:"name"`
Response datastoreOperationResponse `datastore:"response" json:"response"`
}
type datastoreOperationMetadata struct {
Type string `datastore:"type" json:"@type"`
Common datastoreOperationMetadataCommon `datastore:"common" json:"common"`
EntityFilter datastoreOperationMetadataEntityFilter `datastore:"entity_filter" json:"entityFilter"`
OutputUrlPrefix string `datastore:"output_url_prefix" json:"outputUrlPrefix"`
ProgressBytes datastoreOperationMetadataProgressBytes `datastore:"progress_bytes" json:"progressBytes"`
ProgressEntities datastoreOperationMetadataProgressEntities `datastore:"progress_entities" json:"progressEntities"`
}
type datastoreOperationMetadataCommon struct {
EndTime time.Time `datastore:"end_time" json:"endTime"`
OperationType string `datastore:"operation_type" json:"operationType"`
StartTime time.Time `datastore:"start_time" json:"startTime"`
State string `datastore:"state" json:"state"`
}
type datastoreOperationMetadataEntityFilter struct {
Kinds []string `datastore:"kinds" json:"kinds"`
}
type datastoreOperationMetadataProgressBytes struct {
WorkCompleted string `datastore:"work_completed" json:"workCompleted"`
WorkEstimated string `datastore:"work_estimated" json:"workEstimated"`
}
type datastoreOperationMetadataProgressEntities struct {
WorkCompleted string `datastore:"work_completed" json:"workCompleted"`
WorkEstimated string `datastore:"work_estimated" json:"workEstimated"`
}
type datastoreOperationResponse struct {
Type string `datastore:"type" json:"@type"`
OutputUrl string `datastore:"output_url" json:"outputUrl"`
}
func (d datastoreOperation) key(ctx context.Context) *datastore.Key {
return datastore.NewKey(ctx, BACKUPTASKS_KIND, genSha1Hash(d.Name), 0, nil)
}
func (d datastoreOperation) put(ctx context.Context) error {
if d.Name == "" {
return errors.New("backup: validation error (missing Name field)")
}
if _, err := datastore.Put(ctx, d.key(ctx), &d); err != nil {
return err
}
return nil
}
APIからバックアップのタスクを取得する関数
func getBackupTaskStatus(ctx context.Context, jobId string) (datastoreOperation, error) {
req, err := http.NewRequest(
"GET",
"https://datastore.googleapis.com/v1/"+jobId,
nil,
)
if err != nil {
return datastoreOperation{}, err
}
token, err := getApiToken(ctx)
if err != nil {
return datastoreOperation{}, err
}
req.Header.Set("Authorization", "Bearer "+token)
resp, err := urlfetch.Client(ctx).Do(req)
if err != nil {
return datastoreOperation{}, err
}
defer resp.Body.Close()
bodyByte, err := ioutil.ReadAll(resp.Body)
if err != nil {
return datastoreOperation{}, err
}
if resp.StatusCode >= 400 {
log.Warningf(ctx, "failed to API request get backup status: %s", string(bodyByte))
return datastoreOperation{}, errors.New("backup: API request error")
}
var operation datastoreOperation
if err := json.Unmarshal(bodyByte, &operation); err != nil {
return datastoreOperation{}, err
}
if err := operation.put(ctx); err != nil {
return datastoreOperation{}, err
}
return operation, nil
}
func getApiToken(ctx context.Context) (string, error) {
token, _, err := appengine.AccessToken(ctx, "https://www.googleapis.com/auth/datastore", "https://www.googleapis.com/auth/cloud-platform")
if err != nil {
return "", err
}
return token, nil
}
ポーリング処理
func backupTaskPolling(c echo.Context) error {
ctx := appengine.NewContext(c.Request())
jobId := unescape(c.QueryParam("name"))
if jobId == "" {
log.Errorf(ctx, "failed to start baskup task polling: missing jobID")
return echo.NewHTTPError(400)
}
for i := 0; i < MAX_POLLING_COUNT; i++ {
time.Sleep(time.Second * POLLING_INTERVAL)
operation, err := getBackupTaskStatus(ctx, jobId)
switch {
case err != nil:
log.Warningf(ctx, "failed to get backup task status: %s", err.Error())
case operation.Done:
color := ""
title := ""
text := fmt.Sprintf("開始日時: %s\n終了日時: %s\nEntity数: %s\nバックアップファイル: `%s`\nJobID: `%s`",
operation.Metadata.Common.StartTime.In(JST).Format("2006/01/02 15:04:05 JST"),
operation.Metadata.Common.EndTime.In(JST).Format("2006/01/02 15:04:05 JST"),
operation.Metadata.ProgressEntities.WorkCompleted,
operation.Response.OutputUrl,
operation.Name,
)
switch operation.Metadata.Common.State {
case "SUCCESSFUL":
color = slack.COLOR_GREEN
title = "Datastoreのバックアップに成功"
case "FAILED":
color = slack.COLOR_RED
title = "Datastoreのバックアップに失敗"
text += "\n下記コマンドで状態を確認して対処して下さい\n\n`gcloud datastore operations describe " + jobId + "`"
default:
color = slack.COLOR_ORANGE
title = "Datastoreのバックアップが想定外の状態"
text += "\n下記コマンドで状態を確認して対処して下さい\n\n`gcloud datastore operations describe " + jobId + "`"
}
if err := postSlack(ctx, color, title, text); err != nil {
log.Warningf(ctx, "failed to post slack: %s", err.Error())
continue
}
return c.NoContent(204)
default:
}
}
log.Errorf(ctx, "failed to polling: polling limit exceeded")
color := slack.COLOR_ORANGE
title := "バックアップが終わっていない"
text := "下記コマンドで状態を確認して対処して下さい\n\n`gcloud datastore operations describe " + jobId + "`"
if err := postSlack(ctx, color, title, text); err != nil {
log.Warningf(ctx, "failed to post slack: %s", err.Error())
}
return echo.NewHTTPError(500)
}
注意点
GAEのスケーリング設定がAutomatic Scalingだと1リクエスト60秒制限がありポーリングには使えませんのでBasic Scalingで実行する必要があります。こちらは最大24時間まで処理ができるのでこういった用途に丁度良いでしょう。
Basic Scalingを設定するには
app.yaml
を書き換える必要があります。細かい部分を除いた差分としては以下の記述が必要です。
instance_class: B1
basic_scaling:
max_instances: 3
idle_timeout: 1m
handlers:
- url: /_ah/start
script: _go_app
login: admin
auth_fail_action: unauthorized
- url: /_ah/stop
script: _go_app
login: admin
auth_fail_action: unauthorized
Basic Scalingインスタンスの起動・終了時処理
インスタンスの起動時に所定URLへアクセスが走るので、そのURLの処理を記述しておけば起動処理と終了処理を行うことができます。
起動時は /_ah/start
に対しリクエストが来ますが、これに対しHTTP200番代 or 404でレスポンスしないとインスタンスがTerminateされてしまいいつまで経っても処理が始まりません。
また終了時は /_ah/stop
にリクエストが来ます。これはどんなレスポンスをしても問題ありませんが、エラー系を返すとログにもエラーで記録されてしまいます。
特に何か処理する必要が無ければ両方ともHTTP204 (No Content)を返却しておけば大丈夫です。Golang+echoでコードを書くとすると以下のような具合になるかと思います。
func init() {
e := echo.New()
e.GET("/_ah/start", func(c echo.Context) error { return c.NoContent(204) })
e.GET("/_ah/stop", func(c echo.Context) error { return c.NoContent(204) })
}