こんにちは!ACS事業部の谷合です。 皆大好きGitHub Actionsにおける、GitHub社公式のSelf-hosted runnerであるActions Runner Controller(以降ARC)の紹介をシリーズでお送りしております。
前回までに以下の記事を書いておりました。
- Actions Runner Controller Deep Dive!- アーキテクチャ編 - - APC 技術ブログ
- Actions Runner Controller Deep Dive!- 動作解説編 - - APC 技術ブログ
- Actions Runner Controller Deep Dive!- コード解説 前編 - - APC 技術ブログ
前回に引き続き、Actions Runner Controllerのコード解説をしていきます。
はじめに
今回のコード解説は以下のリンクにある流れを追っています。
前回までは、AutoScalingRunnerSet、EphemeralRunnerSet、AutoScalingListenerリソースおよびAutoScalingListener Podの作成までを解説しました。 今回はAutoScalingListener PodでのGitHubとのロングポーリングなどの詳細動作や、Workflow実行時のEphemeralRunnerリソースやEphemeralRunner Pod作成動作を追っていきます。
なお、各リソースの説明は以下リンクを参照ください。
この記事のこと
ARCのコード解説します。ただ、以下のルールの基、解説していきます。
- 解説すること
- ARCの起動からCI/CD Workflowの開始、終了までの動作
- 解説しないこと
- Proxy, TLSなどを使った接続設定
- metricsの設定
- コードリーディング時のcommit
- f1d7c52253b89f0beae60141f8465d9495cdc2cf
コード解説
前回までで、AutoScalingRunnerSet、EphemeralRunnerSet、AutoScalingListenerリソースおよびAutoScalingListener Podの作成までできました。
今回は、AutoScalingListener PodでのGitHubとのロングポーリング確立までの解説から入っていきます。
AutoScalingListener Podの解説
まずは、前回の続きである以下の動作を解説します。なお、AutoScalingListener Podの作成は終わっているので、ロングポーリングを最初に解説していきます。
3.A Runner ScaleSet Listener pod is deployed by the AutoScaling Listener Controller. In this pod, the listener application connects to the Actions Service to authenticate and establish a long poll HTTPS connection. The listener stays idle until it receives a Job Available message from the Actions Service.
ロングポーリング確立
AutoScalingListener Podは、cmd/githubrunnerscalesetlistener の処理を実行します。
まずは、main.goのmain関数からスタートします。
main関数がスタートすると、envconfig.Process関数を使用し、prefixにgithubが付く環境変数をすべてRunnerScaleSetListenerConfig構造体に格納します。
この時、環境変数は大文字でも構いません。環境変数は、AutoScalingListener Pod作成時に設定した際の、この箇所や、TLS設定のものとなります。
type RunnerScaleSetListenerConfig struct { ConfigureUrl string `split_words:"true"` AppID int64 `split_words:"true"` AppInstallationID int64 `split_words:"true"` AppPrivateKey string `split_words:"true"` Token string `split_words:"true"` EphemeralRunnerSetNamespace string `split_words:"true"` EphemeralRunnerSetName string `split_words:"true"` MaxRunners int `split_words:"true"` MinRunners int `split_words:"true"` RunnerScaleSetId int `split_words:"true"` RunnerScaleSetName string `split_words:"true"` ServerRootCA string `split_words:"true"` LogLevel string `split_words:"true"` LogFormat string `split_words:"true"` MetricsAddr string `split_words:"true"` MetricsEndpoint string `split_words:"true"` } func main() { var rc RunnerScaleSetListenerConfig if err := envconfig.Process("github", &rc); err != nil { fmt.Fprintf(os.Stderr, "Error: processing environment variables for RunnerScaleSetListenerConfig: %v\n", err) os.Exit(1) }
ちなみにprefixを元に環境変数を構造体に入れるライブラリですが、以下のものです。
非常に便利なので、是非使ってみてください。
github.com
次に、SIGINT, SIGTERMがPod内で発生した場合にcontextをcancelさせるため、以下のsignal.NotifyContext関数でcontextを作成します。
更に、contextをerrgroup.WithContext関数に渡してあげることで、goroutine用のGroupを作成しています。
AutoScalingListener PodはこのGroupを使用し、後続処理を非同期で並行処理しています。
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
g, ctx := errgroup.WithContext(ctx)
前述した並行処理とは、以下の箇所を指しており、ロングポーリングやEphemeralRunnerSetの更新などのAutoScalingListener Podの主処理や、Metrics Serverの起動を非同期に行っています。なお、今回はMetrics関連の解説はスコープ外としているので、解説は割愛します。
return run(ctx, rc, logger, opts)
の箇所でAutoScalingListener Podの主処理を起動します。
g.Go(func() error { opts := runOptions{ serviceOptions: []func(*Service){ WithLogger(logger), }, } opts.serviceOptions = append(opts.serviceOptions, WithPrometheusMetrics(rc)) return run(ctx, rc, logger, opts) }) if len(rc.MetricsAddr) != 0 { g.Go(func() error { metricsServer := metricsServer{ rc: rc, logger: logger, } g.Go(func() error { <-ctx.Done() return metricsServer.shutdown() }) return metricsServer.listenAndServe() }) }
run関数は以下の定義となっており、この関数でロングポーリングの開始と、GitHubからのメッセージ受信を契機としたEphemeralRunnerSetの.Spec.Replicasの変更でのRunnerのスケールを行います。処理は複数工程に分かれているので、以降で細かく見ていきたいと思います。
func run(ctx context.Context, rc RunnerScaleSetListenerConfig, logger logr.Logger, opts runOptions) error { // Create root context and hook with sigint and sigterm creds := &actions.ActionsAuth{} if rc.Token != "" { creds.Token = rc.Token } else { creds.AppCreds = &actions.GitHubAppAuth{ AppID: rc.AppID, AppInstallationID: rc.AppInstallationID, AppPrivateKey: rc.AppPrivateKey, } } actionsServiceClient, err := newActionsClientFromConfig( rc, creds, actions.WithLogger(logger), actions.WithUserAgent(fmt.Sprintf("actions-runner-controller/%s", build.Version)), ) if err != nil { return fmt.Errorf("failed to create an Actions Service client: %w", err) } // Create message listener autoScalerClient, err := NewAutoScalerClient(ctx, actionsServiceClient, &logger, rc.RunnerScaleSetId) if err != nil { return fmt.Errorf("failed to create a message listener: %w", err) } defer autoScalerClient.Close() // Create kube manager and scale controller kubeManager, err := NewKubernetesManager(&logger) if err != nil { return fmt.Errorf("failed to create kubernetes manager: %w", err) } scaleSettings := &ScaleSettings{ Namespace: rc.EphemeralRunnerSetNamespace, ResourceName: rc.EphemeralRunnerSetName, MaxRunners: rc.MaxRunners, MinRunners: rc.MinRunners, } service, err := NewService(ctx, autoScalerClient, kubeManager, scaleSettings, opts.serviceOptions...) if err != nil { return fmt.Errorf("failed to create new service: %v", err) } // Start listening for messages if err = service.Start(); err != nil { return fmt.Errorf("failed to start message queue listener: %w", err) } return nil }
main関数内で、環境変数からToken(PAT)またはGitHub Appを構造体に入れていました。 構造体からCredentialとして、Token(PAT)またはGitHub Appを更にactions.ActionsAuth構造体に入れます。
creds := &actions.ActionsAuth{} if rc.Token != "" { creds.Token = rc.Token } else { creds.AppCreds = &actions.GitHubAppAuth{ AppID: rc.AppID, AppInstallationID: rc.AppInstallationID, AppPrivateKey: rc.AppPrivateKey, } }
全環境変数が入っているRunnerScaleSetListenerConfig構造体と、前の処理で取得したCredentialを使用しGitHub用のClientを作成します。
actionsServiceClient, err := newActionsClientFromConfig( rc, creds, actions.WithLogger(logger), actions.WithUserAgent(fmt.Sprintf("actions-runner-controller/%s", build.Version)), ) if err != nil { return fmt.Errorf("failed to create an Actions Service client: %w", err) }
newActionsClientFromConfig関数ではさらに関数が呼ばれており、処理が込み入っているため折りたたみます。
newActionsClientFromConfig関数説明
actions.NewClient関数が呼ばれます。なお、省略している箇所は、スコープ外のTLSやProxy設定です。
func newActionsClientFromConfig(config RunnerScaleSetListenerConfig, creds *actions.ActionsAuth, options ...actions.ClientOption) (*actions.Client, error) { : return actions.NewClient(config.ConfigureUrl, creds, options...) }
この関数で、CredentialやGitHubのURL(Enterprise or Organization or Repository)などを使用し、GitHub用のClientを作成します。
returnする直前に、ac.Client = retryClient.StandardClient()
の箇所があります。ここでは、hashicorp/go-retryablehttpで、retry設定を追加して、Client構造体のhttp.Clientに入れています。これを行うことでGitHubとのロングポーリングが可能となるわけです。
func NewClient(githubConfigURL string, creds *ActionsAuth, options ...ClientOption) (*Client, error) { config, err := ParseGitHubConfigFromURL(githubConfigURL) if err != nil { return nil, fmt.Errorf("failed to parse githubConfigURL: %w", err) } ac := &Client{ creds: creds, config: config, logger: logr.Discard(), // retryablehttp defaults retryMax: 4, retryWaitMax: 30 * time.Second, userAgent: UserAgentInfo{ Version: build.Version, CommitSHA: build.CommitSHA, ScaleSetID: 0, }, } for _, option := range options { option(ac) } retryClient := retryablehttp.NewClient() retryClient.Logger = log.New(io.Discard, "", log.LstdFlags) retryClient.RetryMax = ac.retryMax retryClient.RetryWaitMax = ac.retryWaitMax transport, ok := retryClient.HTTPClient.Transport.(*http.Transport) if !ok { // this should always be true, because retryablehttp.NewClient() uses // cleanhttp.DefaultPooledTransport() return nil, fmt.Errorf("failed to get http transport from retryablehttp client") } : retryClient.HTTPClient.Transport = transport ac.Client = retryClient.StandardClient() return ac, nil }
ここまでが、newActionsClientFromConfig関数に関する一連の説明となります。
newActionsClientFromConfig関数で作成した、GitHub用のClientを使用し、NewAutoScalerClient関数でListenerを作成します。
// Create message listener autoScalerClient, err := NewAutoScalerClient(ctx, actionsServiceClient, &logger, rc.RunnerScaleSetId) if err != nil { return fmt.Errorf("failed to create a message listener: %w", err) } defer autoScalerClient.Close()
NewAutoScalerClient関数では、前述したようにListenerを作成します。
NewAutoScalerClient関数も処理が込み合いますので、折りたたみます。
NewAutoScalerClient関数説明
createSession関数でGitHubとのSessionを張ってresponseと、初回のMessageを取得します。
この時、ARCインストール前にGitHub Actionsが動いているかもしれません。その場合Message内のBodyにそのJobのlistが、jsonとして入ってきます。
これらをListenerとして使用します。
func NewAutoScalerClient( ctx context.Context, client actions.ActionsService, logger *logr.Logger, runnerScaleSetId int, options ...func(*AutoScalerClient), ) (*AutoScalerClient, error) { listener := AutoScalerClient{ logger: logger.WithName("auto_scaler"), } session, initialMessage, err := createSession(ctx, &listener.logger, client, runnerScaleSetId) if err != nil { return nil, fmt.Errorf("fail to create session. %w", err) } listener.lastMessageId = 0 listener.initialMessage = initialMessage listener.client = newSessionClient(client, logger, session) for _, option := range options { option(&listener) } return &listener, nil }
なお、SessionやMessageは以下の構造体で管理されます。
type RunnerScaleSetSession struct { SessionId *uuid.UUID `json:"sessionId,omitempty"` OwnerName string `json:"ownerName,omitempty"` RunnerScaleSet *RunnerScaleSet `json:"runnerScaleSet,omitempty"` MessageQueueUrl string `json:"messageQueueUrl,omitempty"` MessageQueueAccessToken string `json:"messageQueueAccessToken,omitempty"` Statistics *RunnerScaleSetStatistic `json:"statistics,omitempty"` }
type RunnerScaleSetMessage struct { MessageId int64 `json:"messageId"` MessageType string `json:"messageType"` Body string `json:"body"` Statistics *RunnerScaleSetStatistic `json:"statistics"` }
createSession関数の、Sessionと初回のMessageを取得する処理を見ていきましょう。
func createSession(ctx context.Context, logger *logr.Logger, client actions.ActionsService, runnerScaleSetId int) (*actions.RunnerScaleSetSession, *actions.RunnerScaleSetMessage, error) { hostName, err := os.Hostname() if err != nil { hostName = uuid.New().String() logger.Info("could not get hostname, fail back to a random string.", "fallback", hostName) } var runnerScaleSetSession *actions.RunnerScaleSetSession var retryCount int for { runnerScaleSetSession, err = client.CreateMessageSession(ctx, runnerScaleSetId, hostName) if err == nil { break } clientSideError := &actions.HttpClientSideError{} if errors.As(err, &clientSideError) && clientSideError.Code != http.StatusConflict { logger.Info("unable to create message session. The error indicates something is wrong on the client side, won't make any retry.") return nil, nil, fmt.Errorf("create message session http request failed. %w", err) } retryCount++ if retryCount >= sessionCreationMaxRetryCount { return nil, nil, fmt.Errorf("create message session failed since it exceed %d retry limit. %w", sessionCreationMaxRetryCount, err) } logger.Info("unable to create message session. Will try again in 30 seconds", "error", err.Error()) if ok := ctx.Value(testIgnoreSleep); ok == nil { time.Sleep(getRandomDuration(30, 45)) } } statistics, _ := json.Marshal(runnerScaleSetSession.Statistics) logger.Info("current runner scale set statistics.", "statistics", string(statistics)) if runnerScaleSetSession.Statistics.TotalAvailableJobs > 0 || runnerScaleSetSession.Statistics.TotalAssignedJobs > 0 { acquirableJobs, err := client.GetAcquirableJobs(ctx, runnerScaleSetId) if err != nil { return nil, nil, fmt.Errorf("get acquirable jobs failed. %w", err) } acquirableJobsJson, err := json.Marshal(acquirableJobs.Jobs) if err != nil { return nil, nil, fmt.Errorf("marshal acquirable jobs failed. %w", err) } initialMessage := &actions.RunnerScaleSetMessage{ MessageId: 0, MessageType: "RunnerScaleSetJobMessages", Statistics: runnerScaleSetSession.Statistics, Body: string(acquirableJobsJson), } return runnerScaleSetSession, initialMessage, nil } initialMessage := &actions.RunnerScaleSetMessage{ MessageId: 0, MessageType: "RunnerScaleSetJobMessages", Statistics: runnerScaleSetSession.Statistics, Body: "", } return runnerScaleSetSession, initialMessage, nil }
createSession関数では、まずPodのホスト名を取得します。これはGitHubとのSession取得の際にOwnerとして使用します。
hostName, err := os.Hostname() if err != nil { hostName = uuid.New().String() logger.Info("could not get hostname, fail back to a random string.", "fallback", hostName) }
ここで無限ループを回して、最大リトライ回数までSession取得を試みます。
var runnerScaleSetSession *actions.RunnerScaleSetSession var retryCount int for { runnerScaleSetSession, err = client.CreateMessageSession(ctx, runnerScaleSetId, hostName) if err == nil { break } clientSideError := &actions.HttpClientSideError{} if errors.As(err, &clientSideError) && clientSideError.Code != http.StatusConflict { logger.Info("unable to create message session. The error indicates something is wrong on the client side, won't make any retry.") return nil, nil, fmt.Errorf("create message session http request failed. %w", err) } retryCount++ if retryCount >= sessionCreationMaxRetryCount { return nil, nil, fmt.Errorf("create message session failed since it exceed %d retry limit. %w", sessionCreationMaxRetryCount, err) } logger.Info("unable to create message session. Will try again in 30 seconds", "error", err.Error()) if ok := ctx.Value(testIgnoreSleep); ok == nil { time.Sleep(getRandomDuration(30, 45)) } }
Sessionを取得する関数を見てみましょう。
まず、scaleSetEndpointとrunnerScaleSetIdでリクエストを送るAPIパスを作成します。
scaleSetEndpointは _apis/runtime/runnerscalesets になります。
次に、CreateMessageSession関数呼び出し前に取得したPodのホスト名を、RunnerScaleSetSession構造体のOwnerフィールド入れて、json.Marshalで構造体からjsonに変換します。
createdSession変数をRunnerScaleSetSession構造体のポインタ変数として定義します。
Sessionを実際に取得するのはdoSessionRequest関数ですが、戻り値がerrorしかありません。ただ、createdSession変数をポインタしてアドレスを渡しているので、doSessionRequest関数内でRunnerScaleSetSession構造体が変更された場合も、CreateMessageSession関数内でも変更済の値が参照できます。
func (c *Client) CreateMessageSession(ctx context.Context, runnerScaleSetId int, owner string) (*RunnerScaleSetSession, error) { path := fmt.Sprintf("/%s/%d/sessions", scaleSetEndpoint, runnerScaleSetId) newSession := &RunnerScaleSetSession{ OwnerName: owner, } requestData, err := json.Marshal(newSession) if err != nil { return nil, err } createdSession := &RunnerScaleSetSession{} err = c.doSessionRequest(ctx, http.MethodPost, path, bytes.NewBuffer(requestData), http.StatusOK, createdSession) return createdSession, err }
doSessionRequest関数では、c.NewActionsServiceRequest関数でPostメソッドのRequestを作成して、Do関数を使用しSessionにRequestを送信して、Responseとして受け取ります。
その後、正常にResponseが返れば、json.NewDecoder(resp.Body).Decode(responseUnmarshalTarget)関数
でRunnerScaleSetSession構造体のアドレスにResponseを入れます。
func (c *Client) doSessionRequest(ctx context.Context, method, path string, requestData io.Reader, expectedResponseStatusCode int, responseUnmarshalTarget any) error { req, err := c.NewActionsServiceRequest(ctx, method, path, requestData) if err != nil { return err } resp, err := c.Do(req) if err != nil { return err } if resp.StatusCode == expectedResponseStatusCode && responseUnmarshalTarget != nil { return json.NewDecoder(resp.Body).Decode(responseUnmarshalTarget) } if resp.StatusCode >= 400 && resp.StatusCode < 500 { return ParseActionsErrorFromResponse(resp) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) body = trimByteOrderMark(body) if err != nil { return err } return fmt.Errorf("unexpected status code: %d - body: %s", resp.StatusCode, string(body)) }
ここまでで、Sessionが取得できました。
createSession関数の解説に戻ります。
ここからはMessageを取得します。
AutoScalingRunnerSetリソースデプロイ段階で、GitHub上にはRunner Scale Setが作成されています。
このRunner Scale Set上で、有効なGitHub Actions Job(runnerScaleSetSession.Statistics.TotalAvailableJobs)が1つ以上、またはアサインされているGitHub Actions Job(runnerScaleSetSession.Statistics.TotalAssignedJobs)が1つ以上の場合に、client.GetAcquirableJobs関数でJobのlistを取得します。そして、Jobのlistをjson化して、初回のMessage(MessageIdが0)を作成します。
if runnerScaleSetSession.Statistics.TotalAvailableJobs > 0 || runnerScaleSetSession.Statistics.TotalAssignedJobs > 0 { acquirableJobs, err := client.GetAcquirableJobs(ctx, runnerScaleSetId) if err != nil { return nil, nil, fmt.Errorf("get acquirable jobs failed. %w", err) } acquirableJobsJson, err := json.Marshal(acquirableJobs.Jobs) if err != nil { return nil, nil, fmt.Errorf("marshal acquirable jobs failed. %w", err) } initialMessage := &actions.RunnerScaleSetMessage{ MessageId: 0, MessageType: "RunnerScaleSetJobMessages", Statistics: runnerScaleSetSession.Statistics, Body: string(acquirableJobsJson), } return runnerScaleSetSession, initialMessage, nil } initialMessage := &actions.RunnerScaleSetMessage{ MessageId: 0, MessageType: "RunnerScaleSetJobMessages", Statistics: runnerScaleSetSession.Statistics, Body: "", } return runnerScaleSetSession, initialMessage, nil
client.GetAcquirableJobs関数は以下のようになっており、まずはscaleSetEndpointとrunnerScaleSetIdでリクエストを送るAPIパスを作成します。
scaleSetEndpointは _apis/runtime/runnerscalesets になります。
次に、c.NewActionsServiceRequest関数でGetメソッドのRequestを作成して、Do関数を使用しSessionにRequestを送信して、Responseとして受け取ります。
その後、正常にResponseが返れば、Jobのlistをreturnします。
func (c *Client) GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (*AcquirableJobList, error) { path := fmt.Sprintf("/%s/%d/acquirablejobs", scaleSetEndpoint, runnerScaleSetId) req, err := c.NewActionsServiceRequest(ctx, http.MethodGet, path, nil) if err != nil { return nil, err } resp, err := c.Do(req) if err != nil { return nil, err } if resp.StatusCode == http.StatusNoContent { defer resp.Body.Close() return &AcquirableJobList{Count: 0, Jobs: []AcquirableJob{}}, nil } if resp.StatusCode != http.StatusOK { return nil, ParseActionsErrorFromResponse(resp) } var acquirableJobList *AcquirableJobList err = json.NewDecoder(resp.Body).Decode(&acquirableJobList) if err != nil { return nil, err } return acquirableJobList, nil }
なお、Jobのlistは以下の構造体で管理されます。
type AcquirableJobList struct { Count int `json:"count"` Jobs []AcquirableJob `json:"value"` } type AcquirableJob struct { AcquireJobUrl string `json:"acquireJobUrl"` MessageType string `json:"messageType"` RunnerRequestId int64 `json:"runnerRequestId"` RepositoryName string `json:"repositoryName"` OwnerName string `json:"ownerName"` JobWorkflowRef string `json:"jobWorkflowRef"` EventName string `json:"eventName"` RequestLabels []string `json:"requestLabels"` }
Sessionと初回Messageが取得できたら、NewAutoScalerClient関数にreturnされます。
Sessionと初回Messageを使用して、Listenerを作成します。
この時、SessionとGitHub用Clientを使用して、 newSessionClient関数でlistener.clientを作成します。
listener.lastMessageId = 0
listener.initialMessage = initialMessage
listener.client = newSessionClient(client, logger, session)
newSessionClient関数は引数をSessionRefreshingClient構造体に入れて、returnします。
なお、listener.clientはactions.SessionService interfaceになっているので、SessionRefreshingClient構造体も入れることができます。
また、SessionRefreshingClient構造体をレシーバとする関数も、interface越しに実行可能となります。
func newSessionClient(client actions.ActionsService, logger *logr.Logger, session *actions.RunnerScaleSetSession) *SessionRefreshingClient { return &SessionRefreshingClient{ client: client, session: session, logger: logger.WithName("refreshing_client"), } }
actions.SessionService interfaceは以下のメソッドが定義されています。
type SessionService interface { GetMessage(ctx context.Context, lastMessageId int64) (*RunnerScaleSetMessage, error) DeleteMessage(ctx context.Context, messageId int64) error AcquireJobs(ctx context.Context, requestIds []int64) ([]int64, error) io.Closer }
ここまでが、NewAutoScalerClient関数に関する一連の説明となります。
NewAutoScalerClient関数でListenerが取得できたら、次にEphemeralRunnerSetの.Spec.ReplicasやEphemeralRunnerの.Statusを更新するためのKubernetesのClientが同梱されたmanagerを作成します。
// Create kube manager and scale controller kubeManager, err := NewKubernetesManager(&logger) if err != nil { return fmt.Errorf("failed to create kubernetes manager: %w", err) }
KubernetesのClientは、rest.InClusterConfig関数でrest.Configを作成し、kubernetes.NewForConfig関数でClientsetとして構成します。
func NewKubernetesManager(logger *logr.Logger) (*AutoScalerKubernetesManager, error) { conf, err := rest.InClusterConfig() if err != nil { return nil, err } kubeClient, err := kubernetes.NewForConfig(conf) if err != nil { return nil, err } var manager = &AutoScalerKubernetesManager{ Clientset: kubeClient, logger: logger.WithName("KubernetesManager"), } return manager, nil }
ここまで作成した、ListenerとKubernetesのmanagerを基に、NewService関数でServiceとして作成します。
scaleSettings := &ScaleSettings{ Namespace: rc.EphemeralRunnerSetNamespace, ResourceName: rc.EphemeralRunnerSetName, MaxRunners: rc.MaxRunners, MinRunners: rc.MinRunners, } service, err := NewService(ctx, autoScalerClient, kubeManager, scaleSettings, opts.serviceOptions...) if err != nil { return fmt.Errorf("failed to create new service: %v", err) }
NewService関数では、Service構造体に、ListenerとKubernetesのmanagerなど各引数を入れてreturnします。
func NewService( ctx context.Context, rsClient RunnerScaleSetClient, manager KubernetesManager, settings *ScaleSettings, options ...func(*Service), ) (*Service, error) { s := &Service{ ctx: ctx, rsClient: rsClient, kubeManager: manager, settings: settings, currentRunnerCount: -1, // force patch on startup logger: logr.FromContextOrDiscard(ctx), } for _, option := range options { option(s) } if len(s.errs) > 0 { return nil, errors.Join(s.errs...) } return s, nil }
上記で作成したServiceをレシーバとして、service.Start関数を呼び出して、GitHubとのロングポーリングなどの処理を開始します。
// Start listening for messages if err = service.Start(); err != nil { return fmt.Errorf("failed to start message queue listener: %w", err) }
いよいよ、AutoScalingListener Podの主処理であるロングポーリングの開始です。
Start関数は非常にシンプルな実装となっています。
見ての通り、無限ループがあり、その中でcontextが閉じられたか、それ以外かを判定して、それ以外であれば、s.rsClient.GetRunnerScaleSetMessage関数を呼び出しています。
実はこれがロングポーリングを実装している箇所となります。無限ループで、s.rsClient.GetRunnerScaleSetMessage関数を呼び出すことで、メッセージの取得→メッセージの処理をずっと実行しています。
func (s *Service) Start() error { for { s.logger.Info("waiting for message...") select { case <-s.ctx.Done(): s.logger.Info("service is stopped.") return nil default: err := s.rsClient.GetRunnerScaleSetMessage(s.ctx, s.processMessage) if err != nil { return fmt.Errorf("could not get and process message. %w", err) } } } }
EphemeralRunnerSetのスケール
ここからは、以下の動作を一気に見ていきます。
4.When a workflow run is triggered from a repository, the Actions Service dispatches individual job runs to the runners or runner scalesets where the runs-on property matches the name of the runner scaleset or labels of self-hosted runners.
5.When the Runner ScaleSet Listener receives the Job Available message, it checks whether it can scale up to the desired count. If it can, the Runner ScaleSet Listener acknowledges the message.
6.The Runner ScaleSet Listener uses a Service Account and a Role bound to that account to make an HTTPS call through the Kubernetes APIs to patch the EphemeralRunner Set resource with the number of desired replicas count.
Messageの受信からEphemeralRunnerSetのスケールまでを見ていきましょう。
AutoScalingListener PodはGitHubとのロングポーリングを通して、Messageを受信します。
この時、s.rsClient.GetRunnerScaleSetMessage関数でMessageを取得します。
s.rsClient.GetRunnerScaleSetMessage関数呼び出し時には、contextの他にs.processMessage関数定義も引数に指定してるので、Message受信後にEphemeralRunnerSetのスケールを含めたMessageの処理が可能となるわけです。
err := s.rsClient.GetRunnerScaleSetMessage(s.ctx, s.processMessage)
GetRunnerScaleSetMessage関数では、初回のみMessage受信→Messageの処理の流れではなく、その逆のMessageの処理→Message受信を行っています。これは、初回のMessageはロングポーリング開始前に取得しているため、最初に処理する必要があるためです。その後は、Message受信→Messageの処理の流れで動いていきます。
以降でMessage受信(m.client.GetMessage関数)と、EphemeralRunnerSetリソースのスケールを含めたMessageの処理(handler引数(Service.processMessage関数))を見ていきます。
func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler func(msg *actions.RunnerScaleSetMessage) error) error { if m.initialMessage != nil { err := handler(m.initialMessage) if err != nil { return fmt.Errorf("fail to process initial message. %w", err) } m.initialMessage = nil return nil } for { message, err := m.client.GetMessage(ctx, m.lastMessageId) if err != nil { return fmt.Errorf("get message failed from refreshing client. %w", err) } if message == nil { continue } err = handler(message) if err != nil { return fmt.Errorf("handle message failed. %w", err) } m.lastMessageId = message.MessageId return m.deleteMessage(ctx, message.MessageId) } }
レシーバであるSessionRefreshingClient構造体は、actions.SessionService interfaceに格納されています。
そのため、ListenerのClientの型であるactions.SessionService interface越しにGetMessage関数が呼び出せます。
GetMessage関数は、m.client.GetMessage関数を更に呼び出し、最新のMessageを取得します。
この時、Sessionの情報が古い場合、m.client.RefreshMessageSession関数でSessionをRefreshして再度、Messageの取得を試みます。
なお、m.client.RefreshMessageSession関数では、ロングポーリング前にSessionを取得した際に呼び出した、doSessionRequest関数を再度呼び出し、Sessionを再取得します。
func (m *SessionRefreshingClient) GetMessage(ctx context.Context, lastMessageId int64) (*actions.RunnerScaleSetMessage, error) { message, err := m.client.GetMessage(ctx, m.session.MessageQueueUrl, m.session.MessageQueueAccessToken, lastMessageId) if err == nil { return message, nil } expiredError := &actions.MessageQueueTokenExpiredError{} if !errors.As(err, &expiredError) { return nil, fmt.Errorf("get message failed. %w", err) } m.logger.Info("message queue token is expired during GetNextMessage, refreshing...") session, err := m.client.RefreshMessageSession(ctx, m.session.RunnerScaleSet.Id, m.session.SessionId) if err != nil { return nil, fmt.Errorf("refresh message session failed. %w", err) } m.session = session message, err = m.client.GetMessage(ctx, m.session.MessageQueueUrl, m.session.MessageQueueAccessToken, lastMessageId) if err != nil { return nil, fmt.Errorf("delete message failed after refresh message session. %w", err) } return message, nil }
こちらのGetMessage関数では、URLをParseして、http.NewRequestWithContext関数でGetメソッドでRequestを作成、Do関数でRequestを送り、Reponseを取得します。
そしてReponseを json.NewDecoder(resp.Body).Decode(&message)関数
でMessageを取得し、returnします。
func (c *Client) GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64) (*RunnerScaleSetMessage, error) { u, err := url.Parse(messageQueueUrl) if err != nil { return nil, err } if lastMessageId > 0 { q := u.Query() q.Set("lastMessageId", strconv.FormatInt(lastMessageId, 10)) u.RawQuery = q.Encode() } req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { return nil, err } req.Header.Set("Accept", "application/json; api-version=6.0-preview") req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", messageQueueAccessToken)) if c.userAgent != "" { req.Header.Set("User-Agent", c.userAgent) } resp, err := c.Do(req) if err != nil { return nil, err } : var message *RunnerScaleSetMessage err = json.NewDecoder(resp.Body).Decode(&message) if err != nil { return nil, err } return message, nil }
Messageが取得できたら、Messageを処理して、必要に応じてEphemeralRunnerSetをスケールさせます。
それをやっているのが、以下のprocessMessage関数です。
なお、EphemeralRunnerSetのスケールに関わる箇所のみ解説します。
func (s *Service) processMessage(message *actions.RunnerScaleSetMessage) error { : return s.scaleForAssignedJobCount(message.Statistics.TotalAssignedJobs) }
scaleForAssignedJobCount関数は以下の定義です。
まず、targetRunnerCount := int(math.Max(math.Min(float64(s.settings.MaxRunners), float64(count)), float64(s.settings.MinRunners)))
でスケールすべき値を求めます。
この時、Jobがない場合は、0またはs.settings.MinRunnersがtargetRunnerCount変数に入ります。こうすることでWorkflow完了後にスケールインさせています。
スケールすべき値と現在のRunnerの数を比較して、異なればログを出力して、s.kubeManager.ScaleEphemeralRunnerSet関数でスケールさせます。
スケール後、現在のRunnerの数(s.currentRunnerCoun)にスケールすべき値(targetRunnerCount)を入れます。
func (s *Service) scaleForAssignedJobCount(count int) error { targetRunnerCount := int(math.Max(math.Min(float64(s.settings.MaxRunners), float64(count)), float64(s.settings.MinRunners))) s.metricsExporter.publishDesiredRunners(targetRunnerCount) if targetRunnerCount != s.currentRunnerCount { s.logger.Info("try scale runner request up/down base on assigned job count", "assigned job", count, "decision", targetRunnerCount, "min", s.settings.MinRunners, "max", s.settings.MaxRunners, "currentRunnerCount", s.currentRunnerCount, ) err := s.kubeManager.ScaleEphemeralRunnerSet(s.ctx, s.settings.Namespace, s.settings.ResourceName, targetRunnerCount) if err != nil { return fmt.Errorf("could not scale ephemeral runner set (%s/%s). %w", s.settings.Namespace, s.settings.ResourceName, err) } s.currentRunnerCount = targetRunnerCount } return nil }
ScaleEphemeralRunnerSet関数でEphemeralRunnerSetをスケールさせます。
以降で解説します。
func (k *AutoScalerKubernetesManager) ScaleEphemeralRunnerSet(ctx context.Context, namespace, resourceName string, runnerCount int) error { original := &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: -1, }, } originalJson, err := json.Marshal(original) if err != nil { k.logger.Error(err, "could not marshal empty ephemeral runner set") } patch := &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: runnerCount, }, } patchJson, err := json.Marshal(patch) if err != nil { k.logger.Error(err, "could not marshal patch ephemeral runner set") } mergePatch, err := jsonpatch.CreateMergePatch(originalJson, patchJson) if err != nil { k.logger.Error(err, "could not create merge patch json for ephemeral runner set") } k.logger.Info("Created merge patch json for EphemeralRunnerSet update", "json", string(mergePatch)) patchedEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{} err = k.RESTClient(). Patch(types.MergePatchType). Prefix("apis", "actions.github.com", "v1alpha1"). Namespace(namespace). Resource("EphemeralRunnerSets"). Name(resourceName). Body([]byte(mergePatch)). Do(ctx). Into(patchedEphemeralRunnerSet) if err != nil { return fmt.Errorf("could not patch ephemeral runner set , patch JSON: %s, error: %w", string(mergePatch), err) } k.logger.Info("Ephemeral runner set scaled.", "namespace", namespace, "name", resourceName, "replicas", patchedEphemeralRunnerSet.Spec.Replicas) return nil }
スケールはPatch関数で行います。
まず、パッチを当てる対象を作成して、json.Marshal関数で、jsonに変換します。
original := &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: -1, }, } originalJson, err := json.Marshal(original) if err != nil { k.logger.Error(err, "could not marshal empty ephemeral runner set") }
パッチを当てる対象を作成したら、パッチを作成します。
作成が終わったら、パッチ対象とパッチを jsonpatch.CreateMergePatch関数でマージします。
patch := &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: runnerCount, }, } patchJson, err := json.Marshal(patch) if err != nil { k.logger.Error(err, "could not marshal patch ephemeral runner set") } mergePatch, err := jsonpatch.CreateMergePatch(originalJson, patchJson) if err != nil { k.logger.Error(err, "could not create merge patch json for ephemeral runner set") }
パッチのマージが完了したら、Patch関数でパッチを当て、スケールさせます。
patchedEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{} err = k.RESTClient(). Patch(types.MergePatchType). Prefix("apis", "actions.github.com", "v1alpha1"). Namespace(namespace). Resource("EphemeralRunnerSets"). Name(resourceName). Body([]byte(mergePatch)). Do(ctx). Into(patchedEphemeralRunnerSet) if err != nil { return fmt.Errorf("could not patch ephemeral runner set , patch JSON: %s, error: %w", string(mergePatch), err) } k.logger.Info("Ephemeral runner set scaled.", "namespace", namespace, "name", resourceName, "replicas", patchedEphemeralRunnerSet.Spec.Replicas) return nil
スケールができたら、いよいよActions Runner ControllerのSelf-hosted runnerであるEphemeralRunnerリソースおよびEphemeralRunner Podが払い出されます。
以降で払い出しから、Workflow終了後のスケールダウンまで見ていきます。
EphemeralRunnerSet ControllerでのEphemeralRunnerリソースの作成および削除
ここから以下の動作を見ていきます。
なお、一旦EphemeralRunnerリソースの作成をまずは解説します。
7.The EphemeralRunner Set attempts to create new runners and the EphemeralRunner Controller requests a JIT configuration token to register these runners. The controller attempts to create runner pods. If the pod's status is failed, the controller retries up to 5 times. After 24 hours the Actions Service unassigns the job if no runner accepts it.
EphemeralRunnerリソースはEphemeralRunnerSet Controllerが作成します。
まず、EphemeralRunnerリソースのlistを取得して、categorizeEphemeralRunners関数でStatusごとにEphemeralRunnerリソースをカテゴライズします。
なお、初期段階ではEphemeralRunnerリソースは作成されていないため、カテゴライズされません。
// Find all EphemeralRunner with matching namespace and own by this EphemeralRunnerSet. ephemeralRunnerList := new(v1alpha1.EphemeralRunnerList) err := r.List( ctx, ephemeralRunnerList, client.InNamespace(req.Namespace), client.MatchingFields{resourceOwnerKey: req.Name}, ) if err != nil { log.Error(err, "Unable to list child ephemeral runners") return ctrl.Result{}, err } pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners := categorizeEphemeralRunners(ephemeralRunnerList)
categorizeEphemeralRunners関数は、switch-case文で、StatusごとにSliceに振り分けていきます。
func categorizeEphemeralRunners(ephemeralRunnerList *v1alpha1.EphemeralRunnerList) (pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners []*v1alpha1.EphemeralRunner) { for i := range ephemeralRunnerList.Items { r := &ephemeralRunnerList.Items[i] if !r.ObjectMeta.DeletionTimestamp.IsZero() { deletingEphemeralRunners = append(deletingEphemeralRunners, r) continue } switch r.Status.Phase { case corev1.PodRunning: runningEphemeralRunners = append(runningEphemeralRunners, r) case corev1.PodSucceeded: finishedEphemeralRunners = append(finishedEphemeralRunners, r) case corev1.PodFailed: failedEphemeralRunners = append(failedEphemeralRunners, r) default: // Pending or no phase should be considered as pending. // // If field is not set, that means that the EphemeralRunner // did not yet have chance to update the Status.Phase field. pendingEphemeralRunners = append(pendingEphemeralRunners, r) } } return }
Statusごとのカテゴリが済んだら、カテゴリごとにEphemeralRunnerリソースを捌いていきます。
Jobが完了したfinishedEphemeralRunnersカテゴリにあるEphemeralRunnerリソースはDelete関数で削除されます。
// cleanup finished runners and proceed var errs []error for i := range finishedEphemeralRunners { log.Info("Deleting finished ephemeral runner", "name", finishedEphemeralRunners[i].Name) if err := r.Delete(ctx, finishedEphemeralRunners[i]); err != nil { if !kerrors.IsNotFound(err) { errs = append(errs, err) } } }
次に各カテゴリの合計と、EphemeralRunnerSetリソースの.Spec.Replicasを比較して、.Spec.Replicasの方が大きければ、差分の数だけEphemeralRunnerリソースの作成を行います。
total := len(pendingEphemeralRunners) + len(runningEphemeralRunners) + len(failedEphemeralRunners) log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) switch { case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up count := ephemeralRunnerSet.Spec.Replicas - total log.Info("Creating new ephemeral runners (scale up)", "count", count) if err := r.createEphemeralRunners(ctx, ephemeralRunnerSet, count, log); err != nil { log.Error(err, "failed to make ephemeral runner") return ctrl.Result{}, err } :
まずは、EphemeralRunnerリソースの作成を見てみましょう。
作成は、createEphemeralRunners関数で行います。
EphemeralRunnerSetリソースの.Spec.Replicasから、現在のEphemeralRunnerリソースの合計を引いた差分だけEphemeralRunnerリソースを作成します。つまり、EphemeralRunnerSetリソースの.Spec.Replicasまでスケールさせます。
r.resourceBuilder.newEphemeralRunner関数で定義を作成しているので、以降で解説します。
定義作成後は、EphemeralRunnerSetリソースを親としてOwnerReferenceを設定し、r.Create関数でリソースを作成するだけです。
func (r *EphemeralRunnerSetReconciler) createEphemeralRunners(ctx context.Context, runnerSet *v1alpha1.EphemeralRunnerSet, count int, log logr.Logger) error { // Track multiple errors at once and return the bundle. errs := make([]error, 0) for i := 0; i < count; i++ { ephemeralRunner := r.resourceBuilder.newEphemeralRunner(runnerSet) if runnerSet.Spec.EphemeralRunnerSpec.Proxy != nil { ephemeralRunner.Spec.ProxySecretRef = proxyEphemeralRunnerSetSecretName(runnerSet) } // Make sure that we own the resource we create. if err := ctrl.SetControllerReference(runnerSet, ephemeralRunner, r.Scheme); err != nil { log.Error(err, "failed to set controller reference on ephemeral runner") errs = append(errs, err) continue } log.Info("Creating new ephemeral runner", "progress", i+1, "total", count) if err := r.Create(ctx, ephemeralRunner); err != nil { log.Error(err, "failed to make ephemeral runner") errs = append(errs, err) continue } log.Info("Created new ephemeral runner", "runner", ephemeralRunner.Name) } return multierr.Combine(errs...) }
newEphemeralRunner関数では、EphemeralRunnerリソースの定義を作成します。
その際、.SpecにはEphemeralRunnerSetリソースの.Spec.EphemeralRunnerSpecフィールドを入れます。
func (b *resourceBuilder) newEphemeralRunner(ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet) *v1alpha1.EphemeralRunner { labels := make(map[string]string) for _, key := range commonLabelKeys { switch key { case LabelKeyKubernetesComponent: labels[key] = "runner" default: v, ok := ephemeralRunnerSet.Labels[key] if !ok { continue } labels[key] = v } } annotations := make(map[string]string) for key, val := range ephemeralRunnerSet.Annotations { annotations[key] = val } return &v1alpha1.EphemeralRunner{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ GenerateName: ephemeralRunnerSet.Name + "-runner-", Namespace: ephemeralRunnerSet.Namespace, Labels: labels, Annotations: annotations, }, Spec: ephemeralRunnerSet.Spec.EphemeralRunnerSpec, } }
次に以下の動作の続きの部分、EphemeralRunner Podの作成を解説します。
このPodがSelft-hosted runnerとして動作します。
7.The EphemeralRunner Set attempts to create new runners and the EphemeralRunner Controller requests a JIT configuration token to register these runners. The controller attempts to create runner pods. If the pod's status is failed, the controller retries up to 5 times. After 24 hours the Actions Service unassigns the job if no runner accepts it.
EphemeralRunner PodはEphemeralRunner Controllerによって作成されます。
この時、Just-In-Time(以降JIT) Tokenを作成して、GitHub上にJobごとにRunnerを作成します。
まず、r.updateStatusWithRunnerConfig関数でJIT Configを取得します。
if ephemeralRunner.Status.RunnerId == 0 { log.Info("Creating new ephemeral runner registration and updating status with runner config") return r.updateStatusWithRunnerConfig(ctx, ephemeralRunner, log) }
r.updateStatusWithRunnerConfig関数では、actionsClient.GenerateJitRunnerConfig関数でJIT Configを作成して、EphemeralRunnerの.Status.RunnerJITConfigに設定します。
func (r *EphemeralRunnerReconciler) updateStatusWithRunnerConfig(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, log logr.Logger) (ctrl.Result, error) { // Runner is not registered with the service. We need to register it first log.Info("Creating ephemeral runner JIT config") actionsClient, err := r.actionsClientFor(ctx, ephemeralRunner) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to get actions client for generating JIT config: %v", err) } jitSettings := &actions.RunnerScaleSetJitRunnerSetting{ Name: ephemeralRunner.Name, } jitConfig, err := actionsClient.GenerateJitRunnerConfig(ctx, jitSettings, ephemeralRunner.Spec.RunnerScaleSetId) : log.Info("Created ephemeral runner JIT config", "runnerId", jitConfig.Runner.Id) log.Info("Updating ephemeral runner status with runnerId and runnerJITConfig") err = patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) { obj.Status.RunnerId = jitConfig.Runner.Id obj.Status.RunnerName = jitConfig.Runner.Name obj.Status.RunnerJITConfig = jitConfig.EncodedJITConfig }) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to update runner status for RunnerId/RunnerName/RunnerJITConfig: %v", err) } log.Info("Updated ephemeral runner status with runnerId and runnerJITConfig") return ctrl.Result{}, nil }
GenerateJitRunnerConfig関数では、scaleSetEndpointとscaleSetIdを使用し、APIのパスをまず作成します。
そのパスにPOSTメソッドでrequestを送り、json.NewDecoder(resp.Body).Decode(&runnerJitConfig)
でresponseからJIT Configを取得します。
func (c *Client) GenerateJitRunnerConfig(ctx context.Context, jitRunnerSetting *RunnerScaleSetJitRunnerSetting, scaleSetId int) (*RunnerScaleSetJitRunnerConfig, error) { path := fmt.Sprintf("/%s/%d/generatejitconfig", scaleSetEndpoint, scaleSetId) body, err := json.Marshal(jitRunnerSetting) if err != nil { return nil, err } req, err := c.NewActionsServiceRequest(ctx, http.MethodPost, path, bytes.NewBuffer(body)) if err != nil { return nil, err } resp, err := c.Do(req) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { return nil, ParseActionsErrorFromResponse(resp) } var runnerJitConfig *RunnerScaleSetJitRunnerConfig err = json.NewDecoder(resp.Body).Decode(&runnerJitConfig) if err != nil { return nil, err } return runnerJitConfig, nil }
ここまで、JIT Configの取得方法を解説してきましたが、JITに関しては以下の記事にすごく詳しく書いているので、併せてご確認ください。
なお、Runner Scale SetのJIT Configは以下の形式となっております。
{ ".runner": { "AgentId": "322", "AgentName": "arc-runner-set-v7nzm-runner-fmfcz", "DisableUpdate": "True", "Ephemeral": "True", "PoolId": "1", "PoolName": null, "ServerUrl": "https://pipelines.actions.githubusercontent.com/abKlCigKduPqALsnnIvHC16pYwPgN97mNIjD0UdG00l4ne31uP/", "RunnerScaleSetId": "12", "RunnerScaleSetName": "arc-runner-set", "WorkFolder": "_work" }, ".credentials": {xxxxxxxxxxxxxxxxxxxxxxx}, ".credentials_rsaparams": {xxxxxxxxxxxxxxxxxxxxxxx} }
JIT Configが取得できたら、これをSecretリソース化します。
EphemeralRunnerリソースと同じ名前のSecretリソースを取得します。
この時、存在しなければr.createSecret関数で作成します。
secret := new(corev1.Secret) if err := r.Get(ctx, req.NamespacedName, secret); err != nil { if !kerrors.IsNotFound(err) { log.Error(err, "Failed to fetch secret") return ctrl.Result{}, err } // create secret if not created log.Info("Creating new ephemeral runner secret for jitconfig.") return r.createSecret(ctx, ephemeralRunner, log) }
createSecret関数では、r.resourceBuilder.newEphemeralRunnerJitSecret関数で定義を作成し、EphemeralRunnerリソースを親としてOwnerReferenceを設定し、r.Create関数でSecretを作成します。
func (r *EphemeralRunnerReconciler) createSecret(ctx context.Context, runner *v1alpha1.EphemeralRunner, log logr.Logger) (ctrl.Result, error) { log.Info("Creating new secret for ephemeral runner") jitSecret := r.resourceBuilder.newEphemeralRunnerJitSecret(runner) if err := ctrl.SetControllerReference(runner, jitSecret, r.Scheme); err != nil { return ctrl.Result{}, fmt.Errorf("failed to set controller reference: %v", err) } log.Info("Created new secret spec for ephemeral runner") if err := r.Create(ctx, jitSecret); err != nil { return ctrl.Result{}, fmt.Errorf("failed to create jit secret: %v", err) } log.Info("Created ephemeral runner secret", "secretName", jitSecret.Name) return ctrl.Result{Requeue: true}, nil }
newEphemeralRunnerJitSecret関数では、先ほど作成したJIT Configを.Data.jitTokenKeyのValueとしてSecret化します。
func (b *resourceBuilder) newEphemeralRunnerJitSecret(ephemeralRunner *v1alpha1.EphemeralRunner) *corev1.Secret { return &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace, }, Data: map[string][]byte{ jitTokenKey: []byte(ephemeralRunner.Status.RunnerJITConfig), }, } }
次に、Selft-hosted runnerであるEphemeralRunner Podを作成します。
まず、Podを取得します。もし存在しないかつ、今までPodで5回以上Podで処理に失敗している場合は、r.markAsFailed関数でEphemeralRunnerリソースの.Status.PhaseをFailedに更新して、GitHub上のRunnerを削除します。
また、それ以外の場合にEphemeralRunner Podを作成します。
pod := new(corev1.Pod) if err := r.Get(ctx, req.NamespacedName, pod); err != nil { switch { case !kerrors.IsNotFound(err): log.Error(err, "Failed to fetch the pod") return ctrl.Result{}, err case len(ephemeralRunner.Status.Failures) > 5: log.Info("EphemeralRunner has failed more than 5 times. Marking it as failed") if err := r.markAsFailed(ctx, ephemeralRunner, log); err != nil { log.Error(err, "Failed to set ephemeral runner to phase Failed") return ctrl.Result{}, err } return ctrl.Result{}, nil default: // Pod was not found. Create if the pod has never been created log.Info("Creating new EphemeralRunner pod.") return r.createPod(ctx, ephemeralRunner, secret, log) } }
markAsFailed関数では、EphemeralRunnerリソースのStatusをFailedに更新します。
そして、r.deleteRunnerFromService関数でGitHub上のRunnerを削除します。
func (r *EphemeralRunnerReconciler) markAsFailed(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, log logr.Logger) error { log.Info("Updating ephemeral runner status to Failed") if err := patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) { obj.Status.Phase = corev1.PodFailed obj.Status.Reason = "TooManyPodFailures" obj.Status.Message = "Pod has failed to start more than 5 times" }); err != nil { return fmt.Errorf("failed to update ephemeral runner status Phase/Message: %v", err) } log.Info("Removing the runner from the service") if err := r.deleteRunnerFromService(ctx, ephemeralRunner, log); err != nil { return fmt.Errorf("failed to remove the runner from service: %v", err) } log.Info("EphemeralRunner is marked as Failed and deleted from the service") return nil }
deleteRunnerFromService関数では、RemoveRunner関数を呼び出します。
func (r *EphemeralRunnerReconciler) deleteRunnerFromService(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, log logr.Logger) error { client, err := r.actionsClientFor(ctx, ephemeralRunner) if err != nil { return fmt.Errorf("failed to get actions client for runner: %v", err) } log.Info("Removing runner from the service", "runnerId", ephemeralRunner.Status.RunnerId) err = client.RemoveRunner(ctx, int64(ephemeralRunner.Status.RunnerId)) if err != nil { return fmt.Errorf("failed to remove runner from the service: %w", err) } log.Info("Removed runner from the service", "runnerId", ephemeralRunner.Status.RunnerId) return nil }
RemoveRunner関数では、runnerEndpointとrunnerIdでAPIのパスを作成し、DeleteメソッドでRequestを作成、Do関数でDeleteメソッドのRequestを送り、Runnerを削除します
func (c *Client) RemoveRunner(ctx context.Context, runnerId int64) error { path := fmt.Sprintf("/%s/%d", runnerEndpoint, runnerId) req, err := c.NewActionsServiceRequest(ctx, http.MethodDelete, path, nil) if err != nil { return err } resp, err := c.Do(req) if err != nil { return err } if resp.StatusCode != http.StatusNoContent { return ParseActionsErrorFromResponse(resp) } defer resp.Body.Close() return nil }
次に、EphemeralRunner Podを作成する関数を見てみましょう。
Proxy設定箇所は割愛しています。
r.resourceBuilder.newEphemeralRunnerPod関数で定義を作成し、EphemeralRunnerリソースを親としてOwnerReferenceを設定後、r.Create関数でPodを作成します。
func (r *EphemeralRunnerReconciler) createPod(ctx context.Context, runner *v1alpha1.EphemeralRunner, secret *corev1.Secret, log logr.Logger) (ctrl.Result, error) { : log.Info("Creating new pod for ephemeral runner") newPod := r.resourceBuilder.newEphemeralRunnerPod(ctx, runner, secret, envs...) if err := ctrl.SetControllerReference(runner, newPod, r.Scheme); err != nil { log.Error(err, "Failed to set controller reference to a new pod") return ctrl.Result{}, err } log.Info("Created new pod spec for ephemeral runner") if err := r.Create(ctx, newPod); err != nil { log.Error(err, "Failed to create pod resource for ephemeral runner.") return ctrl.Result{}, err } log.Info("Created ephemeral runner pod", "runnerScaleSetId", runner.Spec.RunnerScaleSetId, "runnerName", runner.Status.RunnerName, "runnerId", runner.Status.RunnerId, "configUrl", runner.Spec.GitHubConfigUrl, "podName", newPod.Name) return ctrl.Result{}, nil }
newEphemeralRunnerPodでPod定義を作成します。
重要な箇所のみ抜粋しています。JIT ConfigのSecretで環境変数を設定しているのが分かるかと思います。
また、コンテナImageはautoscalingRunnerSetから継承されてきたghcr.io/actions/actions-runnerが使われます。
func (b *resourceBuilder) newEphemeralRunnerPod(ctx context.Context, runner *v1alpha1.EphemeralRunner, secret *corev1.Secret, envs ...corev1.EnvVar) *corev1.Pod { var newPod corev1.Pod : newPod.ObjectMeta = objectMeta newPod.Spec = runner.Spec.PodTemplateSpec.Spec newPod.Spec.Containers = make([]corev1.Container, 0, len(runner.Spec.PodTemplateSpec.Spec.Containers)) for _, c := range runner.Spec.PodTemplateSpec.Spec.Containers { if c.Name == EphemeralRunnerContainerName { c.Env = append( c.Env, corev1.EnvVar{ Name: EnvVarRunnerJITConfig, ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: secret.Name, }, Key: jitTokenKey, }, }, }, corev1.EnvVar{ Name: EnvVarRunnerExtraUserAgent, Value: fmt.Sprintf("actions-runner-controller/%s", build.Version), }, ) c.Env = append(c.Env, envs...) } newPod.Spec.Containers = append(newPod.Spec.Containers, c) } return &newPod }
EphemeralRunner PodはJIT Configを使用し、コンテナ内で自身をGitHubにRunnerとして登録して、Jobが開始されます。
これが以下の動作に当てはまります。
8.Once the runner pod is created, the runner application in the pod uses the JIT configuration token to register itself with the Actions Service. It then establishes another HTTPS long poll connection to receive the job details it needs to execute.
9.The Actions Service acknowledges the runner registration and dispatches the job run details.
次にJobの実行ステータスから処理を見ていきましょう、
10.Throughout the job run execution, the runner continuously communicates the logs and job run status back to the Actions Service.
以降でPodのStatusを捌いていきます。
まず、Statusがない=Pod起動直後の場合は、何もせずにreturnします。
Podが失敗かつEvicteされた場合は、r.deletePodAsFailed関数でPodを削除し、EphemeralRunnerリソースの.Status.Failuresをtrueで更新します。
Podは削除されてもEphemeralRunner Contollerによって自動再作成されます。なお、失敗は5回まで許可されます。
また、r.updateRunStatusFromPod関数で、Podの.Status.Phaseや.Status.Reason、.Status.Messageの値で、EphemeralRunnerリソースの同フィールドを更新します。
cs := runnerContainerStatus(pod) switch { case cs == nil: // starting, no container state yet log.Info("Waiting for runner container status to be available") return ctrl.Result{}, nil case cs.State.Terminated == nil: // still running or evicted if pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted" { log.Info("Pod set the termination phase, but container state is not terminated. Deleting pod", "PodPhase", pod.Status.Phase, "PodReason", pod.Status.Reason, "PodMessage", pod.Status.Message, ) if err := r.deletePodAsFailed(ctx, ephemeralRunner, pod, log); err != nil { log.Error(err, "failed to delete pod as failed on pod.Status.Phase: Failed") return ctrl.Result{}, err } return ctrl.Result{}, nil } log.Info("Ephemeral runner container is still running") if err := r.updateRunStatusFromPod(ctx, ephemeralRunner, pod, log); err != nil { log.Info("Failed to update ephemeral runner status. Requeue to not miss this event") return ctrl.Result{}, err } return ctrl.Result{}, nil
Podが終了した際の終了コードが0でない場合、r.deletePodAsFailed関数でPodを削除し、EphemeralRunnerリソースの.Status.Failuresをtrueで更新します。
Podは削除されてもEphemeralRunner Contollerが自動再作成されます。なお、失敗は5回まで許可されます。
case cs.State.Terminated.ExitCode != 0: // failed log.Info("Ephemeral runner container failed", "exitCode", cs.State.Terminated.ExitCode) if err := r.deletePodAsFailed(ctx, ephemeralRunner, pod, log); err != nil { log.Error(err, "Failed to delete runner pod on failure") return ctrl.Result{}, err } return ctrl.Result{}, nil
ここではデフォルトの終了動作を行います。
終了コードが0で終了した場合、通常はGitHub上にRunnerも一緒に削除されます。
しかし、なんらかの要因で削除されずに、残っている場合があります。
そのため、まずr.runnerRegisteredWithService関数で、GitHub上にRunnerが存在するか確認します。
存在しない場合は正常終了しているので、corev1.PodSucceededでEphemeralRunnerリソースの.Status.Phaseを更新します。
存在する場合は、一旦Podを再作成して、Runnerを再アサインします。
default: // pod succeeded. We double-check with the service if the runner exists. // The reason is that image can potentially finish with status 0, but not pick up the job. existsInService, err := r.runnerRegisteredWithService(ctx, ephemeralRunner.DeepCopy(), log) if err != nil { log.Error(err, "Failed to check if runner is registered with the service") return ctrl.Result{}, err } if !existsInService { // the runner does not exist in the service, so it must be done log.Info("Ephemeral runner has finished since it does not exist in the service anymore") if err := r.markAsFinished(ctx, ephemeralRunner, log); err != nil { log.Error(err, "Failed to mark ephemeral runner as finished") return ctrl.Result{}, err } return ctrl.Result{}, nil } // The runner still exists. This can happen if the pod exited with 0 but fails to start log.Info("Ephemeral runner pod has finished, but the runner still exists in the service. Deleting the pod to restart it.") if err := r.deletePodAsFailed(ctx, ephemeralRunner, pod, log); err != nil { log.Error(err, "failed to delete a pod that still exists in the service") return ctrl.Result{}, err } return ctrl.Result{}, nil }
WorkflowのJob終了
最後に、WorkflowのJob終了時の動作を見ていきましょう。
11.When the runner completes its job successfully, the EphemeralRunner Controller checks with the Actions Service to see if runner can be deleted. If it can, the Ephemeral RunnerSet deletes the runner.
Jobが終了すると以下の動作で、GitHub上のRunnerとEphemeralRunnerリソースを削除します。
なお、EphemeralRunnerリソースの子として、EphemeralRunner Podが設定されているので、EphemeralRunnerリソース削除前にEphemeralRunner Podが削除されます。
以下のように、各カテゴリの合計の方が大きければ、差分のEphemeralRunnerリソースの削除とGitHub上のRunnerも削除します。こちらは、AutoScalingRunnerSetのminRunnersが設定されている場合の動作となります。
total := len(pendingEphemeralRunners) + len(runningEphemeralRunners) + len(failedEphemeralRunners) log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) switch { : case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario. count := total - ephemeralRunnerSet.Spec.Replicas log.Info("Deleting ephemeral runners (scale down)", "count", count) if err := r.deleteIdleEphemeralRunners(ctx, ephemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners, count, log); err != nil { log.Error(err, "failed to delete idle runners") return ctrl.Result{}, err } }
EphemeralRunnerリソースと、GitHub上のRunnerの削除を行うr.deleteIdleEphemeralRunners関数を見てみましょう。
現在のEphemeralRunnerリソースの合計から、EphemeralRunnerSetリソースの.Spec.Replicasを引いた差分だけ削除します
for文でPendingとRunningのEphemeralRunnerリソースを読み出し、差分がなくなるまで削除していきます。
その際、実際に削除するのはr.deleteEphemeralRunnerWithActionsClient関数です。
func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners []*v1alpha1.EphemeralRunner, count int, log logr.Logger) error { runners := newEphemeralRunnerStepper(pendingEphemeralRunners, runningEphemeralRunners) if runners.len() == 0 { log.Info("No pending or running ephemeral runners running at this time for scale down") return nil } actionsClient, err := r.actionsClientFor(ctx, ephemeralRunnerSet) if err != nil { return fmt.Errorf("failed to create actions client for ephemeral runner replica set: %v", err) } var errs []error deletedCount := 0 for runners.next() { ephemeralRunner := runners.object() if ephemeralRunner.Status.RunnerId == 0 { log.Info("Skipping ephemeral runner since it is not registered yet", "name", ephemeralRunner.Name) continue } if ephemeralRunner.Status.JobRequestId > 0 { log.Info("Skipping ephemeral runner since it is running a job", "name", ephemeralRunner.Name, "jobRequestId", ephemeralRunner.Status.JobRequestId) continue } log.Info("Removing the idle ephemeral runner", "name", ephemeralRunner.Name) ok, err := r.deleteEphemeralRunnerWithActionsClient(ctx, ephemeralRunner, actionsClient, log) if err != nil { errs = append(errs, err) } if !ok { continue } deletedCount++ if deletedCount == count { break } } return multierr.Combine(errs...) }
deleteEphemeralRunnerWithActionsClient関数は、actionsClient.RemoveRunner関数でGitHub上のRunnerを削除し、r.Delete関数でEphemeralRunnerリソースを削除します。
func (r *EphemeralRunnerSetReconciler) deleteEphemeralRunnerWithActionsClient(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, actionsClient actions.ActionsService, log logr.Logger) (bool, error) { if err := actionsClient.RemoveRunner(ctx, int64(ephemeralRunner.Status.RunnerId)); err != nil { actionsError := &actions.ActionsError{} if errors.As(err, &actionsError) && actionsError.StatusCode == http.StatusBadRequest && strings.Contains(actionsError.ExceptionName, "JobStillRunningException") { // Runner is still running a job, proceed with the next one return false, nil } return false, err } log.Info("Deleting ephemeral runner after removing from the service", "name", ephemeralRunner.Name, "runnerId", ephemeralRunner.Status.RunnerId) if err := r.Delete(ctx, ephemeralRunner); err != nil && !kerrors.IsNotFound(err) { return false, err } log.Info("Deleted ephemeral runner", "name", ephemeralRunner.Name, "runnerId", ephemeralRunner.Status.RunnerId) return true, nil }
RemoveRunner関数は、runnerEndpointとrunnerIdを使用し、APIのパスを作成し、Deleteメソッドでrequestを作成後に、Do関数でRunnerを削除します。
func (c *Client) RemoveRunner(ctx context.Context, runnerId int64) error { path := fmt.Sprintf("/%s/%d", runnerEndpoint, runnerId) req, err := c.NewActionsServiceRequest(ctx, http.MethodDelete, path, nil) if err != nil { return err } resp, err := c.Do(req) if err != nil { return err } if resp.StatusCode != http.StatusNoContent { return ParseActionsErrorFromResponse(resp) } defer resp.Body.Close() return nil }
以上で、前編後編に分けて解説したARCのコードの解説は終了です!!!
さいごに
いやあ、ARCのコードってめちゃくちゃ複雑ですよね!
読んでて、何度も迷子になりましたが、完走できてよかったです。
今後はコントリビュートも視野に入れて、活動していきたいと思います!
是非皆さんも気合をいれて読んでみてください。
今回のコード解説が皆様の日頃のGitHub Actionsライフに少しでも役に立てたら嬉しいです!
次回はARCのメトリクス監視を見ていきたいと思いますので、お楽しみに!
ACS事業部のご紹介
私達ACS事業部はAzure・AKSなどのクラウドネイティブ技術を活用した内製化やGitHub Enterpriseの導入のご支援をしております。
www.ap-com.co.jp
www.ap-com.co.jp
また、一緒に働いていただける仲間も募集中です!
今年もまだまだ組織規模拡大中なので、ご興味持っていただけましたらぜひお声がけください。
www.ap-com.co.jp