APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

AWS Step Functions で Task の出力を保持しておき後の Task で参照する

こんにちは。クラウド事業部の野本です。

AWS 上でバッチ処理を実装する際、複数の Lambda 関数を順番に呼び出すために Step Functions を使う機会がありました。単純なものなら Workflow Studio でステートを並べるだけで視覚的に組み立てられるのですが、 Lambda の入出力を複雑に受け渡すには Step Functions の入出力を理解する必要があり苦労しました。

本記事では Step Functions の離れたステート間で入出力を受け渡す設定方法について纏めます。特に Task ステートにおける入出力処理を、ステートの定義をプログラムに翻訳して理解してみます。

したいこと

4つの Lambda 関数を順番に実行させたいです。 Step Functions を用いれば下図のように Task ステートを並べたステートマシンとなります。

Lambda 関数の入出力には追加の要件があり、3番目の Lambda の入力には1番目の Lambda の出力を、4番目の Lambda の入力には2番目の Lambda の出力を必要とします。しかし Workflow Studio でただステートを並べただけでは、 Lambda に渡せるデータは直前の Lambda の結果だけとなり、2つ前の出力は消えてしまいます

最初はステートマシンに合うように各 Lambda の入出力仕様を変えないといけないのかと思いましたが、調べていくとステートの入出力の設定をきちんと書けば対応できることがわかりました。

設定方法

例えば Task01 の結果を Task03 で使うには、以下のような5つの設定を入れておきます。(デフォルト値であっても明記しています)

"Task01": {
  ...,
  "ResultSelector": {
    "StatusCode.$": "$.StatusCode",
    "Payload.$": "$.Payload"
  },
  "ResultPath": "$.Result01",
  "OutputPath": "$"
},
...
"Task03": {
  "InputPath": "$.Result01.Payload",
  "Parameters": {
    "FunctionName": "...",
    "Payload.$": "$"
  },
  ...
}
  • 出力時
    • ResultPath で適当なパスを指定して保存し、 OutputPath はデフォルト値(全て出力)にします
      • 保存したデータを途中で捨てないよう、以降の各ステートでも同様の対応が必要です
    • ResultSelector では保存したい項目を抽出・整形できます
  • 入力時
    • InputPath では入力の一部を抽出でき、保存した特定のステートの結果を参照しやすくなります
    • Parameters では普通にパラメータを構築します

ResultPathOutputPath が重要で、 ResultSelectorInputPath は指定しなくても Parameters で調整できます。

(個人的には "ResultSelector.$": "$.Payload" のような整形をしたいのですが、その良い方法はまだわかっていません)

入出力の設定の仕組み

5つの設定項目がどう働くのかは、公式ドキュメントの「Step Functions の入出力処理 - AWS Step Functions」に書いてあります。今回はそれをもっとコンパクトに纏めるため、ステートマシンの定義に近いコードの形で整理してみました。

ある定義の Task ステートをプログラミング言語での関数に翻訳すると、入力(引数)を与えてから出力(戻り値)が返るまでの処理は概ね以下のようになります。(コメント部が翻訳元の設定です)

function executeMyTask(message) {

    // "InputPath": "$.foo"
    const input = message.foo;

    // "Parameters": {
    //   "FunctionName": "...",
    //   "Payload": { "hoge.$": "$.fuga" }
    // }
    const parameters = {
        FunctionName: "...",
        Payload: { hoge: input.fuga }
    } || input;

    const response = callAPI(parameters);

    // "ResultSelector": {
    //   "StatusCode.$": "$.StatusCode",
    //   "Payload.$": "$.Payload"
    // }
    const result = {
        StatusCode: response.StatusCode,
        Payload: response.Payload
    } || response;

    // "ResultPath": "$.bar"
    message.bar = result;

    // "OutputPath": "$.baz"
    return message.baz;
}

$ は状況によって異なる変数を表し、また ResultPath の表すものだけは = の左に置かれているなど、設定ごとに異なる振る舞いがあります。とはいえ各設定の翻訳は単純で、全体の処理も追いやすい形な印象です。


さて、入力したオブジェクト(message)が出力されれば元のデータを保持できるのですが、それを妨げうる設定が2つあることがわかります。

  • OutputPath$ 以外のパスを指定すると message の一部のみを抽出して出力します
  • ResultPath$ を指定すると message 全体を result に置き換えます

この2つを適切に設定することが、ステートマシン実行中にデータを保持し続ける際に重要になります。

Workflow Studio 初期設定のままの場合

Workflow Studio で Lambda 実行を繋げただけの定義を読み解いてみます。

"Task02": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "OutputPath": "$.Payload",
  "Parameters": {
    "Payload.$": "$",
    "FunctionName": "..."
  },
  "Retry": [...],
  "Next": "Task03"
},
"Task03": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "OutputPath": "$.Payload",
  "Parameters": {
    "Payload.$": "$",
    "FunctionName": "..."
  },
  "Retry": [...],
  "Next": "Task04"
},

どちらも同じ形をしていて、関数に翻訳すると以下のようになります。

function executeTaskN(message) {

    // "InputPath": "$"  /* default */
    const input = message;

    // "Parameters": {
    //   "Payload.$": "$",
    //   "FunctionName": "..."
    // }
    const parameters = {
        Payload: input,
        FunctionName: "..."
    } || input;

    const response = callAPI(parameters);

    // "ResultSelector": null  /* default */
    const result = null || response;

    // "ResultPath": "$"  /* default */
    message = result;

    // "OutputPath": "$.Payload"
    return message.Payload;
}
  • Task02 出力時
    • ResultSelector 無指定(null)なので、レスポンス全体が result に入ります(Lambda なら PayloadSdkHttpMetadata などが含まれています)
    • ResultPath 無指定($)なので、 resultmessage そのものに保存(上書き)します
    • OutputPath$.Payload なので、 message (= result (= response)) の一部を抽出します
    • 結果的に response.Payload の部分のみがステートの出力となり、次のステートの入力へ渡されます
  • Task03 入力時
    • InputPath 無指定($)なので、入力全体を使ってパラメータを作ります
    • Parameters 内で $ と指定すると input (= message (= response.Payload)) を参照します
    • → したがって "Payload.$": "$" は前回の Lambda の戻り値をそのままイベントとして渡すことになります

このように、ResultPathOutputPath によって元の message は完全に消えています。直前の結果のみでいい場合は非常にコンパクトな方法です。

不要なデータの整理

全ての出力を保存していると、 Step Functions の制限である 256 KB を超えてしまう可能性があります。ある程度大きなステートマシンになると、出力の仕組みを利用して不要なデータを消すことも必要です。

それには例えば以下のような方法が考えられます。

  • ResultSelector で、必要なデータだけ抽出します(これは本記事の例でも扱いました)
  • ResultPath で、もう使わないデータのパスに上書きします
    • 特に "ResultSelector": {}, "ResultPath": "$" とすると、ステートの出力を空にできます
    • 一方で Task 内の結果が完全に不要なら、 ResultPathnull を指定して捨てられます
  • コンテキストオブジェクトにはステートマシン開始時の入力が含まれるため、 "OutputPath": "$$.Execution.Input" を指定するとデータをリセットできます
  • Pass ステートを用いて入力を整形し、後続に必要なデータだけ抽出します(※状態遷移が増えるため料金に影響します)

まとめ

Step Functions のステートの入出力の設定方法について整理しました。 Workflow Studio の初期設定のままだと直前のステートの結果しか残りませんが、出力(特に ResultPathOutputPath)をきちんと設定すればあるステートでの出力を保持してしばらく後のステートでも参照できることがわかりました。

実案件では、 Map でもっと複雑なデータ受け渡しが必要になったり、 Lambda の入出力仕様が途中で変更になったりもしました。これらも Step Functions の入出力がわかってからは、かなり思い通りに設定できるようになりました。