こんにちは!ACS事業部の谷合です。 皆大好きGitHub Actionsにおける、GitHub社公式のSelf-hosted runnerであるActions Runner Controller(以降ARC)の紹介をシリーズでお送りしております。
- Actions Runner Controller Deep Dive!- アーキテクチャ編 - - APC 技術ブログ
- Actions Runner Controller Deep Dive!- 動作解説編 - - APC 技術ブログ
- Actions Runner Controller Deep Dive!- コード解説 前編 - - APC 技術ブログ
前回に引き続き、Actions Runner Controllerのコード解説をしていきます。
前回までは、AutoScalingRunnerSet、EphemeralRunnerSet、AutoScalingListenerリソースおよびAutoScalingListener Podの作成までを解説しました。 今回はAutoScalingListener PodでのGitHubとのロングポーリングなどの詳細動作や、Workflow実行時のEphemeralRunnerリソースやEphemeralRunner Pod作成動作を追っていきます。
- 解説すること
- ARCの起動からCI/CD Workflowの開始、終了までの動作
- 解説しないこと
- Proxy, TLSなどを使った接続設定
- metricsの設定
- コードリーディング時のcommit
- f1d7c52253b89f0beae60141f8465d9495cdc2cf
前回までで、AutoScalingRunnerSet、EphemeralRunnerSet、AutoScalingListenerリソースおよびAutoScalingListener Podの作成までできました。
今回は、AutoScalingListener PodでのGitHubとのロングポーリング確立までの解説から入っていきます。
AutoScalingListener Podの解説
まずは、前回の続きである以下の動作を解説します。なお、AutoScalingListener Podの作成は終わっているので、ロングポーリングを最初に解説していきます。
3.A Runner ScaleSet Listener pod is deployed by the AutoScaling Listener Controller. In this pod, the listener application connects to the Actions Service to authenticate and establish a long poll HTTPS connection. The listener stays idle until it receives a Job Available message from the Actions Service.
AutoScalingListener Podは、cmd/githubrunnerscalesetlistener の処理を実行します。
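まず、envconfig.Process関数で環境変数をRunnerScaleSetListenerConfig構造体に読み込みます。この時、環境変数名は大文字でも構いません(プレフィックス github と split_words タグにより、GITHUB_CONFIGURE_URL のような名前が対応します)。読み込まれる環境変数は、AutoScalingListener Pod作成時に設定したものや、TLS設定のものとなります。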
type RunnerScaleSetListenerConfig struct { ConfigureUrl string `split_words:"true"` AppID int64 `split_words:"true"` AppInstallationID int64 `split_words:"true"` AppPrivateKey string `split_words:"true"` Token string `split_words:"true"` EphemeralRunnerSetNamespace string `split_words:"true"` EphemeralRunnerSetName string `split_words:"true"` MaxRunners int `split_words:"true"` MinRunners int `split_words:"true"` RunnerScaleSetId int `split_words:"true"` RunnerScaleSetName string `split_words:"true"` ServerRootCA string `split_words:"true"` LogLevel string `split_words:"true"` LogFormat string `split_words:"true"` MetricsAddr string `split_words:"true"` MetricsEndpoint string `split_words:"true"` } func main() { var rc RunnerScaleSetListenerConfig if err := envconfig.Process("github", &rc); err != nil { fmt.Fprintf(os.Stderr, "Error: processing environment variables for RunnerScaleSetListenerConfig: %v\n", err) os.Exit(1) }
次に、SIGINT, SIGTERMがPod内で発生した場合にcontextをcancelさせるため、以下のsignal.NotifyContext関数でcontextを作成します。
続けて、errgroup.WithContext関数でGroupを作成します。AutoScalingListener PodはこのGroupを使用し、後続処理を非同期で並行処理しています。
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
g, ctx := errgroup.WithContext(ctx)
前述した並行処理とは、以下の箇所を指しており、ロングポーリングやEphemeralRunnerSetの更新などのAutoScalingListener Podの主処理や、Metrics Serverの起動を非同期に行っています。なお、今回はMetrics関連の解説はスコープ外としているので、解説は割愛します。
return run(ctx, rc, logger, opts)
の箇所でAutoScalingListener Podの主処理を起動します。
g.Go(func() error { opts := runOptions{ serviceOptions: []func(*Service){ WithLogger(logger), }, } opts.serviceOptions = append(opts.serviceOptions, WithPrometheusMetrics(rc)) return run(ctx, rc, logger, opts) }) if len(rc.MetricsAddr) != 0 { g.Go(func() error { metricsServer := metricsServer{ rc: rc, logger: logger, } g.Go(func() error { <-ctx.Done() return metricsServer.shutdown() }) return metricsServer.listenAndServe() }) }
func run(ctx context.Context, rc RunnerScaleSetListenerConfig, logger logr.Logger, opts runOptions) error { // Create root context and hook with sigint and sigterm creds := &actions.ActionsAuth{} if rc.Token != "" { creds.Token = rc.Token } else { creds.AppCreds = &actions.GitHubAppAuth{ AppID: rc.AppID, AppInstallationID: rc.AppInstallationID, AppPrivateKey: rc.AppPrivateKey, } } actionsServiceClient, err := newActionsClientFromConfig( rc, creds, actions.WithLogger(logger), actions.WithUserAgent(fmt.Sprintf("actions-runner-controller/%s", build.Version)), ) if err != nil { return fmt.Errorf("failed to create an Actions Service client: %w", err) } // Create message listener autoScalerClient, err := NewAutoScalerClient(ctx, actionsServiceClient, &logger, rc.RunnerScaleSetId) if err != nil { return fmt.Errorf("failed to create a message listener: %w", err) } defer autoScalerClient.Close() // Create kube manager and scale controller kubeManager, err := NewKubernetesManager(&logger) if err != nil { return fmt.Errorf("failed to create kubernetes manager: %w", err) } scaleSettings := &ScaleSettings{ Namespace: rc.EphemeralRunnerSetNamespace, ResourceName: rc.EphemeralRunnerSetName, MaxRunners: rc.MaxRunners, MinRunners: rc.MinRunners, } service, err := NewService(ctx, autoScalerClient, kubeManager, scaleSettings, opts.serviceOptions...) if err != nil { return fmt.Errorf("failed to create new service: %v", err) } // Start listening for messages if err = service.Start(); err != nil { return fmt.Errorf("failed to start message queue listener: %w", err) } return nil }
main関数内で、環境変数からToken(PAT)またはGitHub Appを構造体に入れていました。 構造体からCredentialとして、Token(PAT)またはGitHub Appを更にactions.ActionsAuth構造体に入れます。
creds := &actions.ActionsAuth{} if rc.Token != "" { creds.Token = rc.Token } else { creds.AppCreds = &actions.GitHubAppAuth{ AppID: rc.AppID, AppInstallationID: rc.AppInstallationID, AppPrivateKey: rc.AppPrivateKey, } }
actionsServiceClient, err := newActionsClientFromConfig( rc, creds, actions.WithLogger(logger), actions.WithUserAgent(fmt.Sprintf("actions-runner-controller/%s", build.Version)), ) if err != nil { return fmt.Errorf("failed to create an Actions Service client: %w", err) }
func newActionsClientFromConfig(config RunnerScaleSetListenerConfig, creds *actions.ActionsAuth, options ...actions.ClientOption) (*actions.Client, error) { : return actions.NewClient(config.ConfigureUrl, creds, options...) }
この関数で、CredentialやGitHubのURL(Enterprise or Organization or Repository)などを使用し、GitHub用のClientを作成します。
returnする直前に、ac.Client = retryClient.StandardClient() で、リトライ設定済みのretryablehttp ClientをGitHub用Clientに設定しています。
func NewClient(githubConfigURL string, creds *ActionsAuth, options ...ClientOption) (*Client, error) { config, err := ParseGitHubConfigFromURL(githubConfigURL) if err != nil { return nil, fmt.Errorf("failed to parse githubConfigURL: %w", err) } ac := &Client{ creds: creds, config: config, logger: logr.Discard(), // retryablehttp defaults retryMax: 4, retryWaitMax: 30 * time.Second, userAgent: UserAgentInfo{ Version: build.Version, CommitSHA: build.CommitSHA, ScaleSetID: 0, }, } for _, option := range options { option(ac) } retryClient := retryablehttp.NewClient() retryClient.Logger = log.New(io.Discard, "", log.LstdFlags) retryClient.RetryMax = ac.retryMax retryClient.RetryWaitMax = ac.retryWaitMax transport, ok := retryClient.HTTPClient.Transport.(*http.Transport) if !ok { // this should always be true, because retryablehttp.NewClient() uses // cleanhttp.DefaultPooledTransport() return nil, fmt.Errorf("failed to get http transport from retryablehttp client") } : retryClient.HTTPClient.Transport = transport ac.Client = retryClient.StandardClient() return ac, nil }
// Create message listener autoScalerClient, err := NewAutoScalerClient(ctx, actionsServiceClient, &logger, rc.RunnerScaleSetId) if err != nil { return fmt.Errorf("failed to create a message listener: %w", err) } defer autoScalerClient.Close()
この時、ARCインストール前にGitHub Actionsが動いているかもしれません。その場合Message内のBodyにそのJobのlistが、jsonとして入ってきます。
func NewAutoScalerClient( ctx context.Context, client actions.ActionsService, logger *logr.Logger, runnerScaleSetId int, options ...func(*AutoScalerClient), ) (*AutoScalerClient, error) { listener := AutoScalerClient{ logger: logger.WithName("auto_scaler"), } session, initialMessage, err := createSession(ctx, &listener.logger, client, runnerScaleSetId) if err != nil { return nil, fmt.Errorf("fail to create session. %w", err) } listener.lastMessageId = 0 listener.initialMessage = initialMessage listener.client = newSessionClient(client, logger, session) for _, option := range options { option(&listener) } return &listener, nil }
// RunnerScaleSetSession is the JSON payload exchanged when a message session
// is created for a runner scale set.
type RunnerScaleSetSession struct {
	// SessionId is passed to RefreshMessageSession when the queue token expires.
	SessionId *uuid.UUID `json:"sessionId,omitempty"`
	// OwnerName is the only field sent by CreateMessageSession; createSession
	// supplies the host name (or a random UUID string) for it.
	OwnerName string `json:"ownerName,omitempty"`
	// RunnerScaleSet is the scale set the session belongs to; its Id is used
	// when refreshing the session.
	RunnerScaleSet *RunnerScaleSet `json:"runnerScaleSet,omitempty"`
	// MessageQueueUrl and MessageQueueAccessToken are handed to GetMessage to
	// poll for messages.
	MessageQueueUrl         string `json:"messageQueueUrl,omitempty"`
	MessageQueueAccessToken string `json:"messageQueueAccessToken,omitempty"`
	// Statistics is read by createSession (TotalAvailableJobs /
	// TotalAssignedJobs) to decide whether to fetch acquirable jobs.
	Statistics *RunnerScaleSetStatistic `json:"statistics,omitempty"`
}
// RunnerScaleSetMessage is a single message decoded from the message queue
// response, or synthesized by createSession as the initial message.
type RunnerScaleSetMessage struct {
	// MessageId is recorded as the last processed ID and used to delete the
	// message; the synthesized initial message uses 0.
	MessageId int64 `json:"messageId"`
	// MessageType is "RunnerScaleSetJobMessages" for the initial message.
	MessageType string `json:"messageType"`
	// Body is a string payload; for the initial message it is the JSON-encoded
	// acquirable job list, or empty when there are no jobs.
	Body string `json:"body"`
	// Statistics supplies TotalAssignedJobs, which drives scaling.
	Statistics *RunnerScaleSetStatistic `json:"statistics"`
}
func createSession(ctx context.Context, logger *logr.Logger, client actions.ActionsService, runnerScaleSetId int) (*actions.RunnerScaleSetSession, *actions.RunnerScaleSetMessage, error) { hostName, err := os.Hostname() if err != nil { hostName = uuid.New().String() logger.Info("could not get hostname, fail back to a random string.", "fallback", hostName) } var runnerScaleSetSession *actions.RunnerScaleSetSession var retryCount int for { runnerScaleSetSession, err = client.CreateMessageSession(ctx, runnerScaleSetId, hostName) if err == nil { break } clientSideError := &actions.HttpClientSideError{} if errors.As(err, &clientSideError) && clientSideError.Code != http.StatusConflict { logger.Info("unable to create message session. The error indicates something is wrong on the client side, won't make any retry.") return nil, nil, fmt.Errorf("create message session http request failed. %w", err) } retryCount++ if retryCount >= sessionCreationMaxRetryCount { return nil, nil, fmt.Errorf("create message session failed since it exceed %d retry limit. %w", sessionCreationMaxRetryCount, err) } logger.Info("unable to create message session. Will try again in 30 seconds", "error", err.Error()) if ok := ctx.Value(testIgnoreSleep); ok == nil { time.Sleep(getRandomDuration(30, 45)) } } statistics, _ := json.Marshal(runnerScaleSetSession.Statistics) logger.Info("current runner scale set statistics.", "statistics", string(statistics)) if runnerScaleSetSession.Statistics.TotalAvailableJobs > 0 || runnerScaleSetSession.Statistics.TotalAssignedJobs > 0 { acquirableJobs, err := client.GetAcquirableJobs(ctx, runnerScaleSetId) if err != nil { return nil, nil, fmt.Errorf("get acquirable jobs failed. %w", err) } acquirableJobsJson, err := json.Marshal(acquirableJobs.Jobs) if err != nil { return nil, nil, fmt.Errorf("marshal acquirable jobs failed. 
%w", err) } initialMessage := &actions.RunnerScaleSetMessage{ MessageId: 0, MessageType: "RunnerScaleSetJobMessages", Statistics: runnerScaleSetSession.Statistics, Body: string(acquirableJobsJson), } return runnerScaleSetSession, initialMessage, nil } initialMessage := &actions.RunnerScaleSetMessage{ MessageId: 0, MessageType: "RunnerScaleSetJobMessages", Statistics: runnerScaleSetSession.Statistics, Body: "", } return runnerScaleSetSession, initialMessage, nil }
hostName, err := os.Hostname() if err != nil { hostName = uuid.New().String() logger.Info("could not get hostname, fail back to a random string.", "fallback", hostName) }
var runnerScaleSetSession *actions.RunnerScaleSetSession var retryCount int for { runnerScaleSetSession, err = client.CreateMessageSession(ctx, runnerScaleSetId, hostName) if err == nil { break } clientSideError := &actions.HttpClientSideError{} if errors.As(err, &clientSideError) && clientSideError.Code != http.StatusConflict { logger.Info("unable to create message session. The error indicates something is wrong on the client side, won't make any retry.") return nil, nil, fmt.Errorf("create message session http request failed. %w", err) } retryCount++ if retryCount >= sessionCreationMaxRetryCount { return nil, nil, fmt.Errorf("create message session failed since it exceed %d retry limit. %w", sessionCreationMaxRetryCount, err) } logger.Info("unable to create message session. Will try again in 30 seconds", "error", err.Error()) if ok := ctx.Value(testIgnoreSleep); ok == nil { time.Sleep(getRandomDuration(30, 45)) } }
scaleSetEndpointは _apis/runtime/runnerscalesets になります。
func (c *Client) CreateMessageSession(ctx context.Context, runnerScaleSetId int, owner string) (*RunnerScaleSetSession, error) { path := fmt.Sprintf("/%s/%d/sessions", scaleSetEndpoint, runnerScaleSetId) newSession := &RunnerScaleSetSession{ OwnerName: owner, } requestData, err := json.Marshal(newSession) if err != nil { return nil, err } createdSession := &RunnerScaleSetSession{} err = c.doSessionRequest(ctx, http.MethodPost, path, bytes.NewBuffer(requestData), http.StatusOK, createdSession) return createdSession, err }
func (c *Client) doSessionRequest(ctx context.Context, method, path string, requestData io.Reader, expectedResponseStatusCode int, responseUnmarshalTarget any) error { req, err := c.NewActionsServiceRequest(ctx, method, path, requestData) if err != nil { return err } resp, err := c.Do(req) if err != nil { return err } if resp.StatusCode == expectedResponseStatusCode && responseUnmarshalTarget != nil { return json.NewDecoder(resp.Body).Decode(responseUnmarshalTarget) } if resp.StatusCode >= 400 && resp.StatusCode < 500 { return ParseActionsErrorFromResponse(resp) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) body = trimByteOrderMark(body) if err != nil { return err } return fmt.Errorf("unexpected status code: %d - body: %s", resp.StatusCode, string(body)) }
AutoScalingRunnerSetリソースデプロイ段階で、GitHub上にはRunner Scale Setが作成されています。
このRunner Scale Set上で、有効なGitHub Actions Job(runnerScaleSetSession.Statistics.TotalAvailableJobs)が1つ以上、またはアサインされているGitHub Actions Job(runnerScaleSetSession.Statistics.TotalAssignedJobs)が1つ以上の場合に、client.GetAcquirableJobs関数でJobのlistを取得します。そして、Jobのlistをjson化して、初回のMessage(MessageIdが0)を作成します。
if runnerScaleSetSession.Statistics.TotalAvailableJobs > 0 || runnerScaleSetSession.Statistics.TotalAssignedJobs > 0 { acquirableJobs, err := client.GetAcquirableJobs(ctx, runnerScaleSetId) if err != nil { return nil, nil, fmt.Errorf("get acquirable jobs failed. %w", err) } acquirableJobsJson, err := json.Marshal(acquirableJobs.Jobs) if err != nil { return nil, nil, fmt.Errorf("marshal acquirable jobs failed. %w", err) } initialMessage := &actions.RunnerScaleSetMessage{ MessageId: 0, MessageType: "RunnerScaleSetJobMessages", Statistics: runnerScaleSetSession.Statistics, Body: string(acquirableJobsJson), } return runnerScaleSetSession, initialMessage, nil } initialMessage := &actions.RunnerScaleSetMessage{ MessageId: 0, MessageType: "RunnerScaleSetJobMessages", Statistics: runnerScaleSetSession.Statistics, Body: "", } return runnerScaleSetSession, initialMessage, nil
scaleSetEndpointは _apis/runtime/runnerscalesets になります。
func (c *Client) GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (*AcquirableJobList, error) { path := fmt.Sprintf("/%s/%d/acquirablejobs", scaleSetEndpoint, runnerScaleSetId) req, err := c.NewActionsServiceRequest(ctx, http.MethodGet, path, nil) if err != nil { return nil, err } resp, err := c.Do(req) if err != nil { return nil, err } if resp.StatusCode == http.StatusNoContent { defer resp.Body.Close() return &AcquirableJobList{Count: 0, Jobs: []AcquirableJob{}}, nil } if resp.StatusCode != http.StatusOK { return nil, ParseActionsErrorFromResponse(resp) } var acquirableJobList *AcquirableJobList err = json.NewDecoder(resp.Body).Decode(&acquirableJobList) if err != nil { return nil, err } return acquirableJobList, nil }
// AcquirableJobList is the payload decoded from the acquirablejobs endpoint.
type AcquirableJobList struct {
	Count int `json:"count"`
	// Jobs is carried under the JSON key "value".
	Jobs []AcquirableJob `json:"value"`
}

// AcquirableJob is one entry of an AcquirableJobList.
// NOTE(review): field meanings are inferred from their names only — confirm
// against the Actions Service API before relying on them.
type AcquirableJob struct {
	AcquireJobUrl   string   `json:"acquireJobUrl"`
	MessageType     string   `json:"messageType"`
	RunnerRequestId int64    `json:"runnerRequestId"`
	RepositoryName  string   `json:"repositoryName"`
	OwnerName       string   `json:"ownerName"`
	JobWorkflowRef  string   `json:"jobWorkflowRef"`
	EventName       string   `json:"eventName"`
	RequestLabels   []string `json:"requestLabels"`
}
この時、SessionとGitHub用Clientを使用して、 newSessionClient関数でlistener.clientを作成します。
listener.lastMessageId = 0
listener.initialMessage = initialMessage
listener.client = newSessionClient(client, logger, session)
なお、listener.clientはactions.SessionService interfaceになっているので、SessionRefreshingClient構造体も入れることができます。
// newSessionClient wraps an Actions Service client and an established session
// in a SessionRefreshingClient whose logger is named "refreshing_client".
func newSessionClient(client actions.ActionsService, logger *logr.Logger, session *actions.RunnerScaleSetSession) *SessionRefreshingClient {
	refreshing := new(SessionRefreshingClient)
	refreshing.client = client
	refreshing.session = session
	refreshing.logger = logger.WithName("refreshing_client")
	return refreshing
}
actions.SessionService interfaceは以下のメソッドが定義されています。
// SessionService is the session-scoped view of the message queue used by the
// listener. SessionRefreshingClient is stored behind this interface.
type SessionService interface {
	// GetMessage is called with the ID of the last processed message.
	GetMessage(ctx context.Context, lastMessageId int64) (*RunnerScaleSetMessage, error)
	// DeleteMessage takes the ID of the message to delete.
	DeleteMessage(ctx context.Context, messageId int64) error
	// AcquireJobs takes request IDs and returns a list of IDs.
	// NOTE(review): presumably the returned IDs are the acquired ones — confirm.
	AcquireJobs(ctx context.Context, requestIds []int64) ([]int64, error)
	io.Closer
}
// Create kube manager and scale controller kubeManager, err := NewKubernetesManager(&logger) if err != nil { return fmt.Errorf("failed to create kubernetes manager: %w", err) }
func NewKubernetesManager(logger *logr.Logger) (*AutoScalerKubernetesManager, error) { conf, err := rest.InClusterConfig() if err != nil { return nil, err } kubeClient, err := kubernetes.NewForConfig(conf) if err != nil { return nil, err } var manager = &AutoScalerKubernetesManager{ Clientset: kubeClient, logger: logger.WithName("KubernetesManager"), } return manager, nil }
scaleSettings := &ScaleSettings{ Namespace: rc.EphemeralRunnerSetNamespace, ResourceName: rc.EphemeralRunnerSetName, MaxRunners: rc.MaxRunners, MinRunners: rc.MinRunners, } service, err := NewService(ctx, autoScalerClient, kubeManager, scaleSettings, opts.serviceOptions...) if err != nil { return fmt.Errorf("failed to create new service: %v", err) }
func NewService( ctx context.Context, rsClient RunnerScaleSetClient, manager KubernetesManager, settings *ScaleSettings, options ...func(*Service), ) (*Service, error) { s := &Service{ ctx: ctx, rsClient: rsClient, kubeManager: manager, settings: settings, currentRunnerCount: -1, // force patch on startup logger: logr.FromContextOrDiscard(ctx), } for _, option := range options { option(s) } if len(s.errs) > 0 { return nil, errors.Join(s.errs...) } return s, nil }
// Start listening for messages if err = service.Start(); err != nil { return fmt.Errorf("failed to start message queue listener: %w", err) }
いよいよ、AutoScalingListener Podの主処理であるロングポーリングの開始です。
func (s *Service) Start() error { for { s.logger.Info("waiting for message...") select { case <-s.ctx.Done(): s.logger.Info("service is stopped.") return nil default: err := s.rsClient.GetRunnerScaleSetMessage(s.ctx, s.processMessage) if err != nil { return fmt.Errorf("could not get and process message. %w", err) } } } }
4.When a workflow run is triggered from a repository, the Actions Service dispatches individual job runs to the runners or runner scalesets where the runs-on property matches the name of the runner scaleset or labels of self-hosted runners.
5.When the Runner ScaleSet Listener receives the Job Available message, it checks whether it can scale up to the desired count. If it can, the Runner ScaleSet Listener acknowledges the message.
6.The Runner ScaleSet Listener uses a Service Account and a Role bound to that account to make an HTTPS call through the Kubernetes APIs to patch the EphemeralRunner Set resource with the number of desired replicas count.
AutoScalingListener PodはGitHubとのロングポーリングを通して、Messageを受信します。
err := s.rsClient.GetRunnerScaleSetMessage(s.ctx, s.processMessage)
func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler func(msg *actions.RunnerScaleSetMessage) error) error { if m.initialMessage != nil { err := handler(m.initialMessage) if err != nil { return fmt.Errorf("fail to process initial message. %w", err) } m.initialMessage = nil return nil } for { message, err := m.client.GetMessage(ctx, m.lastMessageId) if err != nil { return fmt.Errorf("get message failed from refreshing client. %w", err) } if message == nil { continue } err = handler(message) if err != nil { return fmt.Errorf("handle message failed. %w", err) } m.lastMessageId = message.MessageId return m.deleteMessage(ctx, message.MessageId) } }
レシーバであるSessionRefreshingClient構造体は、actions.SessionService interfaceに格納されています。
そのため、ListenerのClientの型であるactions.SessionService interface越しにGetMessage関数が呼び出せます。
func (m *SessionRefreshingClient) GetMessage(ctx context.Context, lastMessageId int64) (*actions.RunnerScaleSetMessage, error) { message, err := m.client.GetMessage(ctx, m.session.MessageQueueUrl, m.session.MessageQueueAccessToken, lastMessageId) if err == nil { return message, nil } expiredError := &actions.MessageQueueTokenExpiredError{} if !errors.As(err, &expiredError) { return nil, fmt.Errorf("get message failed. %w", err) } m.logger.Info("message queue token is expired during GetNextMessage, refreshing...") session, err := m.client.RefreshMessageSession(ctx, m.session.RunnerScaleSet.Id, m.session.SessionId) if err != nil { return nil, fmt.Errorf("refresh message session failed. %w", err) } m.session = session message, err = m.client.GetMessage(ctx, m.session.MessageQueueUrl, m.session.MessageQueueAccessToken, lastMessageId) if err != nil { return nil, fmt.Errorf("delete message failed after refresh message session. %w", err) } return message, nil }
そしてResponseを json.NewDecoder(resp.Body).Decode(&message)関数でデコードし、RunnerScaleSetMessage構造体として返します。
func (c *Client) GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64) (*RunnerScaleSetMessage, error) { u, err := url.Parse(messageQueueUrl) if err != nil { return nil, err } if lastMessageId > 0 { q := u.Query() q.Set("lastMessageId", strconv.FormatInt(lastMessageId, 10)) u.RawQuery = q.Encode() } req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { return nil, err } req.Header.Set("Accept", "application/json; api-version=6.0-preview") req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", messageQueueAccessToken)) if c.userAgent != "" { req.Header.Set("User-Agent", c.userAgent) } resp, err := c.Do(req) if err != nil { return nil, err } : var message *RunnerScaleSetMessage err = json.NewDecoder(resp.Body).Decode(&message) if err != nil { return nil, err } return message, nil }
func (s *Service) processMessage(message *actions.RunnerScaleSetMessage) error { : return s.scaleForAssignedJobCount(message.Statistics.TotalAssignedJobs) }
まず、targetRunnerCount := int(math.Max(math.Min(float64(s.settings.MaxRunners), float64(count)), float64(s.settings.MinRunners))) で、アサインされたJob数をMaxRunnersを上限、MinRunnersを下限として丸め、目標とするRunner数を算出します。
func (s *Service) scaleForAssignedJobCount(count int) error { targetRunnerCount := int(math.Max(math.Min(float64(s.settings.MaxRunners), float64(count)), float64(s.settings.MinRunners))) s.metricsExporter.publishDesiredRunners(targetRunnerCount) if targetRunnerCount != s.currentRunnerCount { s.logger.Info("try scale runner request up/down base on assigned job count", "assigned job", count, "decision", targetRunnerCount, "min", s.settings.MinRunners, "max", s.settings.MaxRunners, "currentRunnerCount", s.currentRunnerCount, ) err := s.kubeManager.ScaleEphemeralRunnerSet(s.ctx, s.settings.Namespace, s.settings.ResourceName, targetRunnerCount) if err != nil { return fmt.Errorf("could not scale ephemeral runner set (%s/%s). %w", s.settings.Namespace, s.settings.ResourceName, err) } s.currentRunnerCount = targetRunnerCount } return nil }
func (k *AutoScalerKubernetesManager) ScaleEphemeralRunnerSet(ctx context.Context, namespace, resourceName string, runnerCount int) error { original := &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: -1, }, } originalJson, err := json.Marshal(original) if err != nil { k.logger.Error(err, "could not marshal empty ephemeral runner set") } patch := &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: runnerCount, }, } patchJson, err := json.Marshal(patch) if err != nil { k.logger.Error(err, "could not marshal patch ephemeral runner set") } mergePatch, err := jsonpatch.CreateMergePatch(originalJson, patchJson) if err != nil { k.logger.Error(err, "could not create merge patch json for ephemeral runner set") } k.logger.Info("Created merge patch json for EphemeralRunnerSet update", "json", string(mergePatch)) patchedEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{} err = k.RESTClient(). Patch(types.MergePatchType). Prefix("apis", "actions.github.com", "v1alpha1"). Namespace(namespace). Resource("EphemeralRunnerSets"). Name(resourceName). Body([]byte(mergePatch)). Do(ctx). Into(patchedEphemeralRunnerSet) if err != nil { return fmt.Errorf("could not patch ephemeral runner set , patch JSON: %s, error: %w", string(mergePatch), err) } k.logger.Info("Ephemeral runner set scaled.", "namespace", namespace, "name", resourceName, "replicas", patchedEphemeralRunnerSet.Spec.Replicas) return nil }
original := &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: -1, }, } originalJson, err := json.Marshal(original) if err != nil { k.logger.Error(err, "could not marshal empty ephemeral runner set") }
作成が終わったら、パッチ対象(original)とパッチ後の内容(patch)を jsonpatch.CreateMergePatch関数に渡し、両者の差分からマージパッチ(JSON Merge Patch)を生成します。
patch := &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: runnerCount, }, } patchJson, err := json.Marshal(patch) if err != nil { k.logger.Error(err, "could not marshal patch ephemeral runner set") } mergePatch, err := jsonpatch.CreateMergePatch(originalJson, patchJson) if err != nil { k.logger.Error(err, "could not create merge patch json for ephemeral runner set") }
patchedEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{} err = k.RESTClient(). Patch(types.MergePatchType). Prefix("apis", "actions.github.com", "v1alpha1"). Namespace(namespace). Resource("EphemeralRunnerSets"). Name(resourceName). Body([]byte(mergePatch)). Do(ctx). Into(patchedEphemeralRunnerSet) if err != nil { return fmt.Errorf("could not patch ephemeral runner set , patch JSON: %s, error: %w", string(mergePatch), err) } k.logger.Info("Ephemeral runner set scaled.", "namespace", namespace, "name", resourceName, "replicas", patchedEphemeralRunnerSet.Spec.Replicas) return nil
スケールができたら、いよいよActions Runner ControllerのSelf-hosted runnerであるEphemeralRunnerリソースおよびEphemeralRunner Podが払い出されます。
EphemeralRunnerSet ControllerでのEphemeralRunnerリソースの作成および削除
7.The EphemeralRunner Set attempts to create new runners and the EphemeralRunner Controller requests a JIT configuration token to register these runners. The controller attempts to create runner pods. If the pod's status is failed, the controller retries up to 5 times. After 24 hours the Actions Service unassigns the job if no runner accepts it.
EphemeralRunnerリソースはEphemeralRunnerSet Controllerが作成します。
// Find all EphemeralRunner with matching namespace and own by this EphemeralRunnerSet. ephemeralRunnerList := new(v1alpha1.EphemeralRunnerList) err := r.List( ctx, ephemeralRunnerList, client.InNamespace(req.Namespace), client.MatchingFields{resourceOwnerKey: req.Name}, ) if err != nil { log.Error(err, "Unable to list child ephemeral runners") return ctrl.Result{}, err } pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners := categorizeEphemeralRunners(ephemeralRunnerList)
func categorizeEphemeralRunners(ephemeralRunnerList *v1alpha1.EphemeralRunnerList) (pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners []*v1alpha1.EphemeralRunner) { for i := range ephemeralRunnerList.Items { r := &ephemeralRunnerList.Items[i] if !r.ObjectMeta.DeletionTimestamp.IsZero() { deletingEphemeralRunners = append(deletingEphemeralRunners, r) continue } switch r.Status.Phase { case corev1.PodRunning: runningEphemeralRunners = append(runningEphemeralRunners, r) case corev1.PodSucceeded: finishedEphemeralRunners = append(finishedEphemeralRunners, r) case corev1.PodFailed: failedEphemeralRunners = append(failedEphemeralRunners, r) default: // Pending or no phase should be considered as pending. // // If field is not set, that means that the EphemeralRunner // did not yet have chance to update the Status.Phase field. pendingEphemeralRunners = append(pendingEphemeralRunners, r) } } return }
// cleanup finished runners and proceed var errs []error for i := range finishedEphemeralRunners { log.Info("Deleting finished ephemeral runner", "name", finishedEphemeralRunners[i].Name) if err := r.Delete(ctx, finishedEphemeralRunners[i]); err != nil { if !kerrors.IsNotFound(err) { errs = append(errs, err) } } }
total := len(pendingEphemeralRunners) + len(runningEphemeralRunners) + len(failedEphemeralRunners) log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) switch { case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up count := ephemeralRunnerSet.Spec.Replicas - total log.Info("Creating new ephemeral runners (scale up)", "count", count) if err := r.createEphemeralRunners(ctx, ephemeralRunnerSet, count, log); err != nil { log.Error(err, "failed to make ephemeral runner") return ctrl.Result{}, err } :
func (r *EphemeralRunnerSetReconciler) createEphemeralRunners(ctx context.Context, runnerSet *v1alpha1.EphemeralRunnerSet, count int, log logr.Logger) error { // Track multiple errors at once and return the bundle. errs := make([]error, 0) for i := 0; i < count; i++ { ephemeralRunner := r.resourceBuilder.newEphemeralRunner(runnerSet) if runnerSet.Spec.EphemeralRunnerSpec.Proxy != nil { ephemeralRunner.Spec.ProxySecretRef = proxyEphemeralRunnerSetSecretName(runnerSet) } // Make sure that we own the resource we create. if err := ctrl.SetControllerReference(runnerSet, ephemeralRunner, r.Scheme); err != nil { log.Error(err, "failed to set controller reference on ephemeral runner") errs = append(errs, err) continue } log.Info("Creating new ephemeral runner", "progress", i+1, "total", count) if err := r.Create(ctx, ephemeralRunner); err != nil { log.Error(err, "failed to make ephemeral runner") errs = append(errs, err) continue } log.Info("Created new ephemeral runner", "runner", ephemeralRunner.Name) } return multierr.Combine(errs...) }
func (b *resourceBuilder) newEphemeralRunner(ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet) *v1alpha1.EphemeralRunner { labels := make(map[string]string) for _, key := range commonLabelKeys { switch key { case LabelKeyKubernetesComponent: labels[key] = "runner" default: v, ok := ephemeralRunnerSet.Labels[key] if !ok { continue } labels[key] = v } } annotations := make(map[string]string) for key, val := range ephemeralRunnerSet.Annotations { annotations[key] = val } return &v1alpha1.EphemeralRunner{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ GenerateName: ephemeralRunnerSet.Name + "-runner-", Namespace: ephemeralRunnerSet.Namespace, Labels: labels, Annotations: annotations, }, Spec: ephemeralRunnerSet.Spec.EphemeralRunnerSpec, } }
次に以下の動作の続きの部分、EphemeralRunner Podの作成を解説します。
このPodがSelf-hosted runnerとして動作します。
7.The EphemeralRunner Set attempts to create new runners and the EphemeralRunner Controller requests a JIT configuration token to register these runners. The controller attempts to create runner pods. If the pod's status is failed, the controller retries up to 5 times. After 24 hours the Actions Service unassigns the job if no runner accepts it.
EphemeralRunner PodはEphemeralRunner Controllerによって作成されます。
この時、Just-In-Time(以降JIT) Tokenを作成して、GitHub上にJobごとにRunnerを作成します。
まず、r.updateStatusWithRunnerConfig関数でJIT Configを取得します。
if ephemeralRunner.Status.RunnerId == 0 { log.Info("Creating new ephemeral runner registration and updating status with runner config") return r.updateStatusWithRunnerConfig(ctx, ephemeralRunner, log) }
r.updateStatusWithRunnerConfig関数では、actionsClient.GenerateJitRunnerConfig関数でJIT Configを作成して、EphemeralRunnerの.Status.RunnerJITConfigに設定します。
// updateStatusWithRunnerConfig generates a JIT runner config for
// ephemeralRunner through the actions client and patches the runner's status
// sub-resource with the resulting RunnerId, RunnerName and RunnerJITConfig.
//
// It returns an empty ctrl.Result in every visible path; a non-nil error is
// returned when the actions client cannot be obtained or the status patch
// fails.
func (r *EphemeralRunnerReconciler) updateStatusWithRunnerConfig(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, log logr.Logger) (ctrl.Result, error) {
	// Runner is not registered with the service. We need to register it first
	log.Info("Creating ephemeral runner JIT config")
	actionsClient, err := r.actionsClientFor(ctx, ephemeralRunner)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to get actions client for generating JIT config: %v", err)
	}

	// The JIT config is requested under the EphemeralRunner's own name.
	jitSettings := &actions.RunnerScaleSetJitRunnerSetting{
		Name: ephemeralRunner.Name,
	}
	jitConfig, err := actionsClient.GenerateJitRunnerConfig(ctx, jitSettings, ephemeralRunner.Spec.RunnerScaleSetId)
	: // Elision marker from the article: code is omitted here (presumably the handling of err from the call above — confirm against the upstream source).
	log.Info("Created ephemeral runner JIT config", "runnerId", jitConfig.Runner.Id)

	log.Info("Updating ephemeral runner status with runnerId and runnerJITConfig")
	// Only the status sub-resource is patched, via r.Status().
	err = patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) {
		obj.Status.RunnerId = jitConfig.Runner.Id
		obj.Status.RunnerName = jitConfig.Runner.Name
		obj.Status.RunnerJITConfig = jitConfig.EncodedJITConfig
	})
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to update runner status for RunnerId/RunnerName/RunnerJITConfig: %v", err)
	}

	log.Info("Updated ephemeral runner status with runnerId and runnerJITConfig")
	return ctrl.Result{}, nil
}
actionsClient.GenerateJitRunnerConfig関数では、Actions ServiceにPOSTリクエストを送信し、そのresponseからJIT Configを取得します。
func (c *Client) GenerateJitRunnerConfig(ctx context.Context, jitRunnerSetting *RunnerScaleSetJitRunnerSetting, scaleSetId int) (*RunnerScaleSetJitRunnerConfig, error) { path := fmt.Sprintf("/%s/%d/generatejitconfig", scaleSetEndpoint, scaleSetId) body, err := json.Marshal(jitRunnerSetting) if err != nil { return nil, err } req, err := c.NewActionsServiceRequest(ctx, http.MethodPost, path, bytes.NewBuffer(body)) if err != nil { return nil, err } resp, err := c.Do(req) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { return nil, ParseActionsErrorFromResponse(resp) } var runnerJitConfig *RunnerScaleSetJitRunnerConfig err = json.NewDecoder(resp.Body).Decode(&runnerJitConfig) if err != nil { return nil, err } return runnerJitConfig, nil }
ここまで、JIT Configの取得方法を解説してきましたが、JITに関しては以下の記事にすごく詳しく書いているので、併せてご確認ください。
なお、Runner Scale SetのJIT Configは以下の形式となっております。
{ ".runner": { "AgentId": "322", "AgentName": "arc-runner-set-v7nzm-runner-fmfcz", "DisableUpdate": "True", "Ephemeral": "True", "PoolId": "1", "PoolName": null, "ServerUrl": "https://pipelines.actions.githubusercontent.com/abKlCigKduPqALsnnIvHC16pYwPgN97mNIjD0UdG00l4ne31uP/", "RunnerScaleSetId": "12", "RunnerScaleSetName": "arc-runner-set", "WorkFolder": "_work" }, ".credentials": {xxxxxxxxxxxxxxxxxxxxxxx}, ".credentials_rsaparams": {xxxxxxxxxxxxxxxxxxxxxxx} }
JIT Configが取得できたら、これをSecretリソース化します。
secret := new(corev1.Secret) if err := r.Get(ctx, req.NamespacedName, secret); err != nil { if !kerrors.IsNotFound(err) { log.Error(err, "Failed to fetch secret") return ctrl.Result{}, err } // create secret if not created log.Info("Creating new ephemeral runner secret for jitconfig.") return r.createSecret(ctx, ephemeralRunner, log) }
func (r *EphemeralRunnerReconciler) createSecret(ctx context.Context, runner *v1alpha1.EphemeralRunner, log logr.Logger) (ctrl.Result, error) { log.Info("Creating new secret for ephemeral runner") jitSecret := r.resourceBuilder.newEphemeralRunnerJitSecret(runner) if err := ctrl.SetControllerReference(runner, jitSecret, r.Scheme); err != nil { return ctrl.Result{}, fmt.Errorf("failed to set controller reference: %v", err) } log.Info("Created new secret spec for ephemeral runner") if err := r.Create(ctx, jitSecret); err != nil { return ctrl.Result{}, fmt.Errorf("failed to create jit secret: %v", err) } log.Info("Created ephemeral runner secret", "secretName", jitSecret.Name) return ctrl.Result{Requeue: true}, nil }
newEphemeralRunnerJitSecret関数では、先ほど作成したJIT Configを.Data.jitTokenKeyのValueとしてSecret化します。
// newEphemeralRunnerJitSecret builds (but does not create) the Secret holding
// the runner's JIT config. The Secret has the same name and namespace as
// ephemeralRunner and stores Status.RunnerJITConfig under jitTokenKey.
func (b *resourceBuilder) newEphemeralRunnerJitSecret(ephemeralRunner *v1alpha1.EphemeralRunner) *corev1.Secret {
	jitConfig := []byte(ephemeralRunner.Status.RunnerJITConfig)

	secret := new(corev1.Secret)
	secret.ObjectMeta = metav1.ObjectMeta{
		Name:      ephemeralRunner.Name,
		Namespace: ephemeralRunner.Namespace,
	}
	secret.Data = map[string][]byte{jitTokenKey: jitConfig}
	return secret
}
次に、Self-hosted runnerであるEphemeralRunner Podを作成します。
EphemeralRunner Podが存在せず、かつ失敗回数が5回を超えている場合は、EphemeralRunnerをFailedとしてマークします。また、それ以外の場合にEphemeralRunner Podを作成します。
pod := new(corev1.Pod) if err := r.Get(ctx, req.NamespacedName, pod); err != nil { switch { case !kerrors.IsNotFound(err): log.Error(err, "Failed to fetch the pod") return ctrl.Result{}, err case len(ephemeralRunner.Status.Failures) > 5: log.Info("EphemeralRunner has failed more than 5 times. Marking it as failed") if err := r.markAsFailed(ctx, ephemeralRunner, log); err != nil { log.Error(err, "Failed to set ephemeral runner to phase Failed") return ctrl.Result{}, err } return ctrl.Result{}, nil default: // Pod was not found. Create if the pod has never been created log.Info("Creating new EphemeralRunner pod.") return r.createPod(ctx, ephemeralRunner, secret, log) } }
func (r *EphemeralRunnerReconciler) markAsFailed(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, log logr.Logger) error { log.Info("Updating ephemeral runner status to Failed") if err := patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) { obj.Status.Phase = corev1.PodFailed obj.Status.Reason = "TooManyPodFailures" obj.Status.Message = "Pod has failed to start more than 5 times" }); err != nil { return fmt.Errorf("failed to update ephemeral runner status Phase/Message: %v", err) } log.Info("Removing the runner from the service") if err := r.deleteRunnerFromService(ctx, ephemeralRunner, log); err != nil { return fmt.Errorf("failed to remove the runner from service: %v", err) } log.Info("EphemeralRunner is marked as Failed and deleted from the service") return nil }
func (r *EphemeralRunnerReconciler) deleteRunnerFromService(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, log logr.Logger) error { client, err := r.actionsClientFor(ctx, ephemeralRunner) if err != nil { return fmt.Errorf("failed to get actions client for runner: %v", err) } log.Info("Removing runner from the service", "runnerId", ephemeralRunner.Status.RunnerId) err = client.RemoveRunner(ctx, int64(ephemeralRunner.Status.RunnerId)) if err != nil { return fmt.Errorf("failed to remove runner from the service: %w", err) } log.Info("Removed runner from the service", "runnerId", ephemeralRunner.Status.RunnerId) return nil }
func (c *Client) RemoveRunner(ctx context.Context, runnerId int64) error { path := fmt.Sprintf("/%s/%d", runnerEndpoint, runnerId) req, err := c.NewActionsServiceRequest(ctx, http.MethodDelete, path, nil) if err != nil { return err } resp, err := c.Do(req) if err != nil { return err } if resp.StatusCode != http.StatusNoContent { return ParseActionsErrorFromResponse(resp) } defer resp.Body.Close() return nil }
次に、EphemeralRunner Podを作成する関数を見てみましょう。
// createPod builds the pod for runner with the resource builder, makes runner
// the controller owner of that pod, and creates it through the client.
//
// It returns an empty ctrl.Result in every visible path; the error from
// SetControllerReference or Create is logged and returned unchanged.
func (r *EphemeralRunnerReconciler) createPod(ctx context.Context, runner *v1alpha1.EphemeralRunner, secret *corev1.Secret, log logr.Logger) (ctrl.Result, error) {
	: // Elision marker from the article: code is omitted here (envs, used below, is defined in the omitted part).
	log.Info("Creating new pod for ephemeral runner")
	newPod := r.resourceBuilder.newEphemeralRunnerPod(ctx, runner, secret, envs...)

	if err := ctrl.SetControllerReference(runner, newPod, r.Scheme); err != nil {
		log.Error(err, "Failed to set controller reference to a new pod")
		return ctrl.Result{}, err
	}

	log.Info("Created new pod spec for ephemeral runner")
	if err := r.Create(ctx, newPod); err != nil {
		log.Error(err, "Failed to create pod resource for ephemeral runner.")
		return ctrl.Result{}, err
	}

	log.Info("Created ephemeral runner pod",
		"runnerScaleSetId", runner.Spec.RunnerScaleSetId,
		"runnerName", runner.Status.RunnerName,
		"runnerId", runner.Status.RunnerId,
		"configUrl", runner.Spec.GitHubConfigUrl,
		"podName", newPod.Name)

	return ctrl.Result{}, nil
}
重要な箇所のみ抜粋しています。JIT ConfigのSecretで環境変数を設定しているのが分かるかと思います。
// newEphemeralRunnerPod builds (but does not create) the pod for runner.
//
// The pod spec starts from runner.Spec.PodTemplateSpec.Spec. The container
// list is rebuilt: the container named EphemeralRunnerContainerName gets
// additional environment variables appended (the JIT config read from secret
// under jitTokenKey, an extra user-agent string, then envs); every other
// container is carried over as it is in the template.
func (b *resourceBuilder) newEphemeralRunnerPod(ctx context.Context, runner *v1alpha1.EphemeralRunner, secret *corev1.Secret, envs ...corev1.EnvVar) *corev1.Pod {
	var newPod corev1.Pod
	: // Elision marker from the article: code is omitted here (objectMeta, used below, is defined in the omitted part).
	newPod.ObjectMeta = objectMeta
	newPod.Spec = runner.Spec.PodTemplateSpec.Spec
	// Start from an empty container list with capacity for every template container.
	newPod.Spec.Containers = make([]corev1.Container, 0, len(runner.Spec.PodTemplateSpec.Spec.Containers))

	for _, c := range runner.Spec.PodTemplateSpec.Spec.Containers {
		if c.Name == EphemeralRunnerContainerName {
			c.Env = append(
				c.Env,
				// The JIT config is referenced from the Secret rather than
				// written into the pod spec as a literal value.
				corev1.EnvVar{
					Name: EnvVarRunnerJITConfig,
					ValueFrom: &corev1.EnvVarSource{
						SecretKeyRef: &corev1.SecretKeySelector{
							LocalObjectReference: corev1.LocalObjectReference{
								Name: secret.Name,
							},
							Key: jitTokenKey,
						},
					},
				},
				corev1.EnvVar{
					Name:  EnvVarRunnerExtraUserAgent,
					Value: fmt.Sprintf("actions-runner-controller/%s", build.Version),
				},
			)
			// Caller-supplied variables come after the built-in ones.
			c.Env = append(c.Env, envs...)
		}

		newPod.Spec.Containers = append(newPod.Spec.Containers, c)
	}

	return &newPod
}
EphemeralRunner PodはJIT Configを使用し、コンテナ内で自身をGitHubにRunnerとして登録して、Jobが開始されます。
8.Once the runner pod is created, the runner application in the pod uses the JIT configuration token to register itself with the Actions Service. It then establishes another HTTPS long poll connection to receive the job details it needs to execute.
9.The Actions Service acknowledges the runner registration and dispatches the job run details.
10.Throughout the job run execution, the runner continuously communicates the logs and job run status back to the Actions Service.
Podは削除されてもEphemeralRunner Controllerによって自動再作成されます。なお、失敗は5回まで許可されます。
cs := runnerContainerStatus(pod) switch { case cs == nil: // starting, no container state yet log.Info("Waiting for runner container status to be available") return ctrl.Result{}, nil case cs.State.Terminated == nil: // still running or evicted if pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted" { log.Info("Pod set the termination phase, but container state is not terminated. Deleting pod", "PodPhase", pod.Status.Phase, "PodReason", pod.Status.Reason, "PodMessage", pod.Status.Message, ) if err := r.deletePodAsFailed(ctx, ephemeralRunner, pod, log); err != nil { log.Error(err, "failed to delete pod as failed on pod.Status.Phase: Failed") return ctrl.Result{}, err } return ctrl.Result{}, nil } log.Info("Ephemeral runner container is still running") if err := r.updateRunStatusFromPod(ctx, ephemeralRunner, pod, log); err != nil { log.Info("Failed to update ephemeral runner status. Requeue to not miss this event") return ctrl.Result{}, err } return ctrl.Result{}, nil
Podは削除されてもEphemeralRunner Controllerによって自動再作成されます。なお、失敗は5回まで許可されます。
case cs.State.Terminated.ExitCode != 0: // failed log.Info("Ephemeral runner container failed", "exitCode", cs.State.Terminated.ExitCode) if err := r.deletePodAsFailed(ctx, ephemeralRunner, pod, log); err != nil { log.Error(err, "Failed to delete runner pod on failure") return ctrl.Result{}, err } return ctrl.Result{}, nil
default: // pod succeeded. We double-check with the service if the runner exists. // The reason is that image can potentially finish with status 0, but not pick up the job. existsInService, err := r.runnerRegisteredWithService(ctx, ephemeralRunner.DeepCopy(), log) if err != nil { log.Error(err, "Failed to check if runner is registered with the service") return ctrl.Result{}, err } if !existsInService { // the runner does not exist in the service, so it must be done log.Info("Ephemeral runner has finished since it does not exist in the service anymore") if err := r.markAsFinished(ctx, ephemeralRunner, log); err != nil { log.Error(err, "Failed to mark ephemeral runner as finished") return ctrl.Result{}, err } return ctrl.Result{}, nil } // The runner still exists. This can happen if the pod exited with 0 but fails to start log.Info("Ephemeral runner pod has finished, but the runner still exists in the service. Deleting the pod to restart it.") if err := r.deletePodAsFailed(ctx, ephemeralRunner, pod, log); err != nil { log.Error(err, "failed to delete a pod that still exists in the service") return ctrl.Result{}, err } return ctrl.Result{}, nil }
11.When the runner completes its job successfully, the EphemeralRunner Controller checks with the Actions Service to see if runner can be deleted. If it can, the Ephemeral RunnerSet deletes the runner.
なお、EphemeralRunnerリソースの子として、EphemeralRunner Podが設定されているので、EphemeralRunnerリソース削除前にEphemeralRunner Podが削除されます。
total := len(pendingEphemeralRunners) + len(runningEphemeralRunners) + len(failedEphemeralRunners) log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) switch { : case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario. count := total - ephemeralRunnerSet.Spec.Replicas log.Info("Deleting ephemeral runners (scale down)", "count", count) if err := r.deleteIdleEphemeralRunners(ctx, ephemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners, count, log); err != nil { log.Error(err, "failed to delete idle runners") return ctrl.Result{}, err } }
func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners []*v1alpha1.EphemeralRunner, count int, log logr.Logger) error { runners := newEphemeralRunnerStepper(pendingEphemeralRunners, runningEphemeralRunners) if runners.len() == 0 { log.Info("No pending or running ephemeral runners running at this time for scale down") return nil } actionsClient, err := r.actionsClientFor(ctx, ephemeralRunnerSet) if err != nil { return fmt.Errorf("failed to create actions client for ephemeral runner replica set: %v", err) } var errs []error deletedCount := 0 for runners.next() { ephemeralRunner := runners.object() if ephemeralRunner.Status.RunnerId == 0 { log.Info("Skipping ephemeral runner since it is not registered yet", "name", ephemeralRunner.Name) continue } if ephemeralRunner.Status.JobRequestId > 0 { log.Info("Skipping ephemeral runner since it is running a job", "name", ephemeralRunner.Name, "jobRequestId", ephemeralRunner.Status.JobRequestId) continue } log.Info("Removing the idle ephemeral runner", "name", ephemeralRunner.Name) ok, err := r.deleteEphemeralRunnerWithActionsClient(ctx, ephemeralRunner, actionsClient, log) if err != nil { errs = append(errs, err) } if !ok { continue } deletedCount++ if deletedCount == count { break } } return multierr.Combine(errs...) }
func (r *EphemeralRunnerSetReconciler) deleteEphemeralRunnerWithActionsClient(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, actionsClient actions.ActionsService, log logr.Logger) (bool, error) { if err := actionsClient.RemoveRunner(ctx, int64(ephemeralRunner.Status.RunnerId)); err != nil { actionsError := &actions.ActionsError{} if errors.As(err, &actionsError) && actionsError.StatusCode == http.StatusBadRequest && strings.Contains(actionsError.ExceptionName, "JobStillRunningException") { // Runner is still running a job, proceed with the next one return false, nil } return false, err } log.Info("Deleting ephemeral runner after removing from the service", "name", ephemeralRunner.Name, "runnerId", ephemeralRunner.Status.RunnerId) if err := r.Delete(ctx, ephemeralRunner); err != nil && !kerrors.IsNotFound(err) { return false, err } log.Info("Deleted ephemeral runner", "name", ephemeralRunner.Name, "runnerId", ephemeralRunner.Status.RunnerId) return true, nil }
func (c *Client) RemoveRunner(ctx context.Context, runnerId int64) error { path := fmt.Sprintf("/%s/%d", runnerEndpoint, runnerId) req, err := c.NewActionsServiceRequest(ctx, http.MethodDelete, path, nil) if err != nil { return err } resp, err := c.Do(req) if err != nil { return err } if resp.StatusCode != http.StatusNoContent { return ParseActionsErrorFromResponse(resp) } defer resp.Body.Close() return nil }
今回のコード解説が皆様の日頃のGitHub Actionsライフに少しでも役に立てたら嬉しいです!
私達ACS事業部はAzure・AKSなどのクラウドネイティブ技術を活用した内製化やGitHub Enterpriseの導入のご支援をしております。