Introduce ActionRunAttempt to represent each execution of a run (#37119)
This PR introduces a new `ActionRunAttempt` model and makes Actions
execution attempt-scoped.
**Main Changes**
- Each workflow run trigger generates a new `ActionRunAttempt`. The
triggered jobs are then associated with this new `ActionRunAttempt`
record.
- Each rerun now creates:
- a new `ActionRunAttempt` record for the workflow run
- a full new set of `ActionRunJob` records for the new
`ActionRunAttempt`
- For jobs that need to be rerun, the new job records are created as
runnable jobs in the new attempt.
- For jobs that do not need to be rerun, new job records are still
created in the new attempt, but they reuse the result of the previous
attempt instead of executing again.
- Introduce `rerunPlan` to manage each rerun and refactored rerun flow
into a two-phase plan-based model:
- `buildRerunPlan`
- `execRerunPlan`
- `RerunFailedWorkflowRun` and `RerunFailed` no longer directly derives
all jobs that need to be rerun; this step is now handled by
`buildRerunPlan`.
- Converted artifacts from run-scoped to attempt-scoped:
- uploads are now associated with `RunAttemptID`
- listing, download, and deletion resolve against the current attempt
- Added attempt-aware web Actions views:
- the default run page shows the latest attempt
(`/actions/runs/{run_id}`)
- previous attempt pages show jobs and artifacts for that attempt
(`/actions/runs/{run_id}/attempts/{attempt_num}`)
- New APIs:
- `/repos/{owner}/{repo}/actions/runs/{run}/attempts/{attempt}`
- `/repos/{owner}/{repo}/actions/runs/{run}/attempts/{attempt}/jobs`
- New configuration `MAX_RERUN_ATTEMPTS`
- https://gitea.com/gitea/docs/pulls/383
**Compatibility**
- Existing legacy runs use `LatestAttemptID = 0` and legacy jobs use
`RunAttemptID = 0`. Therefore, these fields can be used to identify
legacy runs and jobs and provide backward compatibility.
- If a legacy run is rerun, an `ActionRunAttempt` with `attempt=1` will
be created to represent the original execution. Then a new
`ActionRunAttempt` with `attempt=2` will be created for the real rerun.
- Existing artifact records are not backfilled; legacy artifacts
continue to use `RunAttemptID = 0`.
**Improvements**
- It is now easier to inspect and download logs from previous attempts.
-
[`run_attempt`](https://docs.github.com/en/actions/reference/workflows-and-actions/contexts#github-context)
semantics are now aligned with GitHub.
- > A unique number for each attempt of a particular workflow run in a
repository. This number begins at 1 for the workflow run's first
attempt, and increments with each re-run.
- Rerun behavior is now clearer and more explicit.
- Instead of mutating the status of previous jobs in place, each rerun
creates a new attempt with a full new set of job records.
- Artifacts produced by different reruns can now be listed separately.
Signed-off-by: Zettat123 <zettat123@gmail.com>
Co-authored-by: silverwind <me@silverwind.io>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Co-authored-by: Giteabot <teabot@gitea.io>
This commit is contained in:
69
services/actions/approve.go
Normal file
69
services/actions/approve.go
Normal file
@@ -0,0 +1,69 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/models/db"
|
||||
repo_model "code.gitea.io/gitea/models/repo"
|
||||
user_model "code.gitea.io/gitea/models/user"
|
||||
)
|
||||
|
||||
func ApproveRuns(ctx context.Context, repo *repo_model.Repository, doer *user_model.User, runIDs []int64) error {
|
||||
updatedJobs := make([]*actions_model.ActionRunJob, 0)
|
||||
cancelledConcurrencyJobs := make([]*actions_model.ActionRunJob, 0)
|
||||
|
||||
err := db.WithTx(ctx, func(ctx context.Context) (err error) {
|
||||
for _, runID := range runIDs {
|
||||
run, err := actions_model.GetRunByRepoAndID(ctx, repo.ID, runID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
run.NeedApproval = false
|
||||
run.ApprovedBy = doer.ID
|
||||
if err := actions_model.UpdateRun(ctx, run, "need_approval", "approved_by"); err != nil {
|
||||
return err
|
||||
}
|
||||
jobs, err := actions_model.GetLatestAttemptJobsByRepoAndRunID(ctx, repo.ID, run.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, job := range jobs {
|
||||
// Skip jobs with `needs`: they stay blocked until their dependencies finish,
|
||||
// at which point job_emitter will evaluate and start them.
|
||||
if len(job.Needs) > 0 {
|
||||
continue
|
||||
}
|
||||
var jobsToCancel []*actions_model.ActionRunJob
|
||||
job.Status, jobsToCancel, err = PrepareToStartJobWithConcurrency(ctx, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...)
|
||||
if job.Status == actions_model.StatusWaiting {
|
||||
n, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n > 0 {
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, updatedJobs)
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, cancelledConcurrencyJobs)
|
||||
|
||||
EmitJobsIfReadyByJobs(cancelledConcurrencyJobs)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -179,7 +179,7 @@ func DeleteRun(ctx context.Context, run *actions_model.ActionRun) error {
|
||||
|
||||
repoID := run.RepoID
|
||||
|
||||
jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID)
|
||||
jobs, err := actions_model.GetAllRunJobsByRepoAndRunID(ctx, run.RepoID, run.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -207,6 +207,10 @@ func DeleteRun(ctx context.Context, run *actions_model.ActionRun) error {
|
||||
RepoID: repoID,
|
||||
ID: run.ID,
|
||||
})
|
||||
recordsToDelete = append(recordsToDelete, &actions_model.ActionRunAttempt{
|
||||
RepoID: repoID,
|
||||
RunID: run.ID,
|
||||
})
|
||||
recordsToDelete = append(recordsToDelete, &actions_model.ActionRunJob{
|
||||
RepoID: repoID,
|
||||
RunID: run.ID,
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
"code.gitea.io/gitea/modules/timeutil"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
webhook_module "code.gitea.io/gitea/modules/webhook"
|
||||
notify_service "code.gitea.io/gitea/services/notify"
|
||||
)
|
||||
|
||||
// StopZombieTasks stops the task which have running status, but haven't been updated for a long time
|
||||
@@ -36,39 +35,16 @@ func StopEndlessTasks(ctx context.Context) error {
|
||||
})
|
||||
}
|
||||
|
||||
func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.ActionRunJob) {
|
||||
if len(jobs) == 0 {
|
||||
return
|
||||
}
|
||||
// The input jobs may belong to different runs, so track each affected run.
|
||||
runs := make(map[int64]*actions_model.ActionRun, len(jobs))
|
||||
for _, job := range jobs {
|
||||
if err := job.LoadAttributes(ctx); err != nil {
|
||||
log.Error("Failed to load job attributes: %v", err)
|
||||
continue
|
||||
}
|
||||
CreateCommitStatusForRunJobs(ctx, job.Run, job)
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
|
||||
if _, ok := runs[job.RunID]; !ok {
|
||||
runs[job.RunID] = job.Run
|
||||
}
|
||||
}
|
||||
|
||||
for _, run := range runs {
|
||||
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
|
||||
}
|
||||
}
|
||||
|
||||
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
|
||||
jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event)
|
||||
notifyWorkflowJobStatusUpdate(ctx, jobs)
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, jobs)
|
||||
EmitJobsIfReadyByJobs(jobs)
|
||||
return err
|
||||
}
|
||||
|
||||
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error {
|
||||
jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo)
|
||||
notifyWorkflowJobStatusUpdate(ctx, jobs)
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, jobs)
|
||||
EmitJobsIfReadyByJobs(jobs)
|
||||
return err
|
||||
}
|
||||
@@ -83,61 +59,59 @@ func shouldBlockJobByConcurrency(ctx context.Context, job *actions_model.ActionR
|
||||
return false, nil
|
||||
}
|
||||
|
||||
runs, jobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning})
|
||||
attempts, jobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("GetConcurrentRunsAndJobs: %w", err)
|
||||
return false, fmt.Errorf("GetConcurrentRunAttemptsAndJobs: %w", err)
|
||||
}
|
||||
|
||||
return len(runs) > 0 || len(jobs) > 0, nil
|
||||
return len(attempts) > 0 || len(jobs) > 0, nil
|
||||
}
|
||||
|
||||
// PrepareToStartJobWithConcurrency prepares a job to start by its evaluated concurrency group and cancelling previous jobs if necessary.
|
||||
// It returns the new status of the job (either StatusBlocked or StatusWaiting) and any error encountered during the process.
|
||||
func PrepareToStartJobWithConcurrency(ctx context.Context, job *actions_model.ActionRunJob) (actions_model.Status, error) {
|
||||
// It returns the new status of the job (either StatusBlocked or StatusWaiting), any cancelled jobs, and any error encountered during the process.
|
||||
func PrepareToStartJobWithConcurrency(ctx context.Context, job *actions_model.ActionRunJob) (actions_model.Status, []*actions_model.ActionRunJob, error) {
|
||||
shouldBlock, err := shouldBlockJobByConcurrency(ctx, job)
|
||||
if err != nil {
|
||||
return actions_model.StatusBlocked, err
|
||||
return actions_model.StatusBlocked, nil, err
|
||||
}
|
||||
|
||||
// even if the current job is blocked, we still need to cancel previous "waiting/blocked" jobs in the same concurrency group
|
||||
jobs, err := actions_model.CancelPreviousJobsByJobConcurrency(ctx, job)
|
||||
if err != nil {
|
||||
return actions_model.StatusBlocked, fmt.Errorf("CancelPreviousJobsByJobConcurrency: %w", err)
|
||||
return actions_model.StatusBlocked, nil, fmt.Errorf("CancelPreviousJobsByJobConcurrency: %w", err)
|
||||
}
|
||||
notifyWorkflowJobStatusUpdate(ctx, jobs)
|
||||
|
||||
return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), nil
|
||||
return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), jobs, nil
|
||||
}
|
||||
|
||||
func shouldBlockRunByConcurrency(ctx context.Context, actionRun *actions_model.ActionRun) (bool, error) {
|
||||
if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel {
|
||||
func shouldBlockRunByConcurrency(ctx context.Context, attempt *actions_model.ActionRunAttempt) (bool, error) {
|
||||
if attempt.ConcurrencyGroup == "" || attempt.ConcurrencyCancel {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
runs, jobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning})
|
||||
attempts, jobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, attempt.RepoID, attempt.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("find concurrent runs and jobs: %w", err)
|
||||
}
|
||||
|
||||
return len(runs) > 0 || len(jobs) > 0, nil
|
||||
return len(attempts) > 0 || len(jobs) > 0, nil
|
||||
}
|
||||
|
||||
// PrepareToStartRunWithConcurrency prepares a run to start by its evaluated concurrency group and cancelling previous jobs if necessary.
|
||||
// It returns the new status of the run (either StatusBlocked or StatusWaiting) and any error encountered during the process.
|
||||
func PrepareToStartRunWithConcurrency(ctx context.Context, run *actions_model.ActionRun) (actions_model.Status, error) {
|
||||
shouldBlock, err := shouldBlockRunByConcurrency(ctx, run)
|
||||
// PrepareToStartRunWithConcurrency prepares a run attempt to start by its evaluated concurrency group and cancelling previous jobs if necessary.
|
||||
// It returns the new status of the run attempt (either StatusBlocked or StatusWaiting), any cancelled jobs, and any error encountered during the process.
|
||||
func PrepareToStartRunWithConcurrency(ctx context.Context, attempt *actions_model.ActionRunAttempt) (actions_model.Status, []*actions_model.ActionRunJob, error) {
|
||||
shouldBlock, err := shouldBlockRunByConcurrency(ctx, attempt)
|
||||
if err != nil {
|
||||
return actions_model.StatusBlocked, err
|
||||
return actions_model.StatusBlocked, nil, err
|
||||
}
|
||||
|
||||
// even if the current run is blocked, we still need to cancel previous "waiting/blocked" jobs in the same concurrency group
|
||||
jobs, err := actions_model.CancelPreviousJobsByRunConcurrency(ctx, run)
|
||||
jobs, err := actions_model.CancelPreviousJobsByRunConcurrency(ctx, attempt)
|
||||
if err != nil {
|
||||
return actions_model.StatusBlocked, fmt.Errorf("CancelPreviousJobsByRunConcurrency: %w", err)
|
||||
return actions_model.StatusBlocked, nil, fmt.Errorf("CancelPreviousJobsByRunConcurrency: %w", err)
|
||||
}
|
||||
notifyWorkflowJobStatusUpdate(ctx, jobs)
|
||||
|
||||
return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), nil
|
||||
return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), jobs, nil
|
||||
}
|
||||
|
||||
func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
|
||||
@@ -175,7 +149,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
|
||||
remove()
|
||||
}
|
||||
|
||||
notifyWorkflowJobStatusUpdate(ctx, jobs)
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, jobs)
|
||||
EmitJobsIfReadyByJobs(jobs)
|
||||
|
||||
return nil
|
||||
@@ -194,8 +168,6 @@ func CancelAbandonedJobs(ctx context.Context) error {
|
||||
|
||||
now := timeutil.TimeStampNow()
|
||||
|
||||
// Collect one job per run to send workflow run status update
|
||||
updatedRuns := map[int64]*actions_model.ActionRunJob{}
|
||||
updatedJobs := []*actions_model.ActionRunJob{}
|
||||
|
||||
for _, job := range jobs {
|
||||
@@ -211,9 +183,6 @@ func CancelAbandonedJobs(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
updated = n > 0
|
||||
if updated && job.Run.Status.IsDone() {
|
||||
updatedRuns[job.RunID] = job
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.Warn("cancel abandoned job %v: %v", job.ID, err)
|
||||
@@ -222,16 +191,13 @@ func CancelAbandonedJobs(ctx context.Context) error {
|
||||
if job.Run == nil || job.Run.Repo == nil {
|
||||
continue // error occurs during loading attributes, the following code that depends on "Run.Repo" will fail, so ignore and skip
|
||||
}
|
||||
CreateCommitStatusForRunJobs(ctx, job.Run, job)
|
||||
if updated {
|
||||
CreateCommitStatusForRunJobs(ctx, job.Run, job)
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
|
||||
}
|
||||
}
|
||||
|
||||
for _, job := range updatedRuns {
|
||||
notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run)
|
||||
}
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, updatedJobs)
|
||||
EmitJobsIfReadyByJobs(updatedJobs)
|
||||
|
||||
return nil
|
||||
|
||||
@@ -17,15 +17,15 @@ import (
|
||||
)
|
||||
|
||||
// EvaluateRunConcurrencyFillModel evaluates the expressions in a run-level (workflow) concurrency,
|
||||
// and fills the run's model fields with `concurrency.group` and `concurrency.cancel-in-progress`.
|
||||
// and fills the run attempt model with the evaluated `concurrency.group` and `concurrency.cancel-in-progress` values.
|
||||
// Workflow-level concurrency doesn't depend on the job outputs, so it can always be evaluated if there is no syntax error.
|
||||
// See https://docs.github.com/en/actions/reference/workflows-and-actions/workflow-syntax#concurrency
|
||||
func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.ActionRun, wfRawConcurrency *act_model.RawConcurrency, vars map[string]string, inputs map[string]any) error {
|
||||
func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.ActionRunAttempt, wfRawConcurrency *act_model.RawConcurrency, vars map[string]string, inputs map[string]any) error {
|
||||
if err := run.LoadAttributes(ctx); err != nil {
|
||||
return fmt.Errorf("run LoadAttributes: %w", err)
|
||||
}
|
||||
|
||||
actionsRunCtx := GenerateGiteaContext(run, nil)
|
||||
actionsRunCtx := GenerateGiteaContext(ctx, run, attempt, nil)
|
||||
jobResults := map[string]*jobparser.JobResult{"": {}}
|
||||
if inputs == nil {
|
||||
var err error
|
||||
@@ -35,12 +35,8 @@ func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.Act
|
||||
}
|
||||
}
|
||||
|
||||
rawConcurrency, err := yaml.Marshal(wfRawConcurrency)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal raw concurrency: %w", err)
|
||||
}
|
||||
run.RawConcurrency = string(rawConcurrency)
|
||||
run.ConcurrencyGroup, run.ConcurrencyCancel, err = jobparser.EvaluateConcurrency(wfRawConcurrency, "", nil, actionsRunCtx, jobResults, vars, inputs)
|
||||
var err error
|
||||
attempt.ConcurrencyGroup, attempt.ConcurrencyCancel, err = jobparser.EvaluateConcurrency(wfRawConcurrency, "", nil, actionsRunCtx, jobResults, vars, inputs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("evaluate concurrency: %w", err)
|
||||
}
|
||||
@@ -71,7 +67,7 @@ func findJobNeedsAndFillJobResults(ctx context.Context, job *actions_model.Actio
|
||||
// Job-level concurrency may depend on other job's outputs (via `needs`): `concurrency.group: my-group-${{ needs.job1.outputs.out1 }}`
|
||||
// If the needed jobs haven't been executed yet, this evaluation will also fail.
|
||||
// See https://docs.github.com/en/actions/reference/workflows-and-actions/workflow-syntax#jobsjob_idconcurrency
|
||||
func EvaluateJobConcurrencyFillModel(ctx context.Context, run *actions_model.ActionRun, actionRunJob *actions_model.ActionRunJob, vars map[string]string, inputs map[string]any) error {
|
||||
func EvaluateJobConcurrencyFillModel(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.ActionRunAttempt, actionRunJob *actions_model.ActionRunJob, vars map[string]string, inputs map[string]any) error {
|
||||
if err := actionRunJob.LoadAttributes(ctx); err != nil {
|
||||
return fmt.Errorf("job LoadAttributes: %w", err)
|
||||
}
|
||||
@@ -81,7 +77,7 @@ func EvaluateJobConcurrencyFillModel(ctx context.Context, run *actions_model.Act
|
||||
return fmt.Errorf("unmarshal raw concurrency: %w", err)
|
||||
}
|
||||
|
||||
actionsJobCtx := GenerateGiteaContext(run, actionRunJob)
|
||||
actionsJobCtx := GenerateGiteaContext(ctx, run, attempt, actionRunJob)
|
||||
|
||||
jobResults, err := findJobNeedsAndFillJobResults(ctx, actionRunJob)
|
||||
if err != nil {
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"code.gitea.io/gitea/modules/container"
|
||||
"code.gitea.io/gitea/modules/git"
|
||||
"code.gitea.io/gitea/modules/json"
|
||||
"code.gitea.io/gitea/modules/optional"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
|
||||
@@ -22,9 +23,14 @@ import (
|
||||
|
||||
type GiteaContext map[string]any
|
||||
|
||||
// GenerateGiteaContext generate the gitea context without token and gitea_runtime_token
|
||||
// job can be nil when generating a context for parsing workflow-level expressions
|
||||
func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.ActionRunJob) GiteaContext {
|
||||
// GenerateGiteaContext generate the gitea context without token and gitea_runtime_token.
|
||||
// attempt and job can be nil when generating a context for parsing workflow-level expressions.
|
||||
//
|
||||
// The run_attempt value is resolved with the following precedence:
|
||||
// 1. attempt.Attempt - the explicit attempt argument, or run.GetLatestAttempt() as a fallback
|
||||
// 2. job.Attempt - only used when neither an explicit nor latest attempt is available
|
||||
// 3. "1" - when none of the above apply (first-run parse time, before the first attempt exists)
|
||||
func GenerateGiteaContext(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.ActionRunAttempt, job *actions_model.ActionRunJob) GiteaContext {
|
||||
event := map[string]any{}
|
||||
_ = json.Unmarshal([]byte(run.EventPayload), &event)
|
||||
|
||||
@@ -89,10 +95,28 @@ func GenerateGiteaContext(run *actions_model.ActionRun, job *actions_model.Actio
|
||||
|
||||
if job != nil {
|
||||
gitContext["job"] = job.JobID
|
||||
gitContext["run_id"] = strconv.FormatInt(job.RunID, 10)
|
||||
gitContext["run_attempt"] = strconv.FormatInt(job.Attempt, 10)
|
||||
}
|
||||
|
||||
if attempt == nil {
|
||||
if latestAttempt, has, err := run.GetLatestAttempt(ctx); err == nil && has {
|
||||
attempt = latestAttempt
|
||||
}
|
||||
}
|
||||
|
||||
if attempt != nil {
|
||||
gitContext["run_attempt"] = strconv.FormatInt(attempt.Attempt, 10)
|
||||
if err := attempt.LoadAttributes(ctx); err == nil {
|
||||
gitContext["triggering_actor"] = attempt.TriggerUser.Name
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback for first-run parse time: no job, no attempt (LatestAttemptID==0). github.run_attempt
|
||||
// is 1-based per the documented contract, so emit "1" rather than leaving it empty.
|
||||
if gitContext["run_attempt"] == "" {
|
||||
gitContext["run_attempt"] = "1"
|
||||
}
|
||||
|
||||
return gitContext
|
||||
}
|
||||
|
||||
@@ -108,7 +132,13 @@ func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[st
|
||||
}
|
||||
needs := container.SetOf(job.Needs...)
|
||||
|
||||
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: job.RunID})
|
||||
// Scope to the same attempt. For legacy jobs RunAttemptID==0, which matches all other legacy jobs in the same run.
|
||||
findOpts := actions_model.FindRunJobOptions{
|
||||
RunID: job.RunID,
|
||||
RunAttemptID: optional.Some(job.RunAttemptID),
|
||||
}
|
||||
|
||||
jobs, err := db.Find[actions_model.ActionRunJob](ctx, findOpts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("FindRunJobs: %w", err)
|
||||
}
|
||||
@@ -125,11 +155,12 @@ func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[st
|
||||
}
|
||||
var jobOutputs map[string]string
|
||||
for _, job := range jobsWithSameID {
|
||||
if job.TaskID == 0 || !job.Status.IsDone() {
|
||||
// it shouldn't happen, or the job has been rerun
|
||||
taskID := job.EffectiveTaskID()
|
||||
if taskID == 0 || !job.Status.IsDone() {
|
||||
// it shouldn't happen
|
||||
continue
|
||||
}
|
||||
got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
|
||||
got, err := actions_model.FindTaskOutputByTaskID(ctx, taskID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
|
||||
}
|
||||
|
||||
@@ -26,17 +26,20 @@ func TestEvaluateRunConcurrency_RunIDFallback(t *testing.T) {
|
||||
runA := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 791})
|
||||
runB := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 792})
|
||||
|
||||
attemptA := &actions_model.ActionRunAttempt{RepoID: runA.RepoID, RunID: runA.ID, Attempt: 1}
|
||||
attemptB := &actions_model.ActionRunAttempt{RepoID: runB.RepoID, RunID: runB.ID, Attempt: 1}
|
||||
|
||||
expr := &act_model.RawConcurrency{
|
||||
Group: "${{ github.workflow }}-${{ github.head_ref || github.run_id }}",
|
||||
CancelInProgress: "true",
|
||||
}
|
||||
|
||||
assert.NoError(t, EvaluateRunConcurrencyFillModel(ctx, runA, expr, nil, nil))
|
||||
assert.NoError(t, EvaluateRunConcurrencyFillModel(ctx, runB, expr, nil, nil))
|
||||
assert.NoError(t, EvaluateRunConcurrencyFillModel(ctx, runA, attemptA, expr, nil, nil))
|
||||
assert.NoError(t, EvaluateRunConcurrencyFillModel(ctx, runB, attemptB, expr, nil, nil))
|
||||
|
||||
assert.Contains(t, runA.ConcurrencyGroup, "791")
|
||||
assert.Contains(t, runB.ConcurrencyGroup, "792")
|
||||
assert.NotEqual(t, runA.ConcurrencyGroup, runB.ConcurrencyGroup)
|
||||
assert.Contains(t, attemptA.ConcurrencyGroup, "791")
|
||||
assert.Contains(t, attemptB.ConcurrencyGroup, "792")
|
||||
assert.NotEqual(t, attemptA.ConcurrencyGroup, attemptB.ConcurrencyGroup)
|
||||
}
|
||||
|
||||
func TestPrepareRunAndInsert_ExpressionsSeeRunID(t *testing.T) {
|
||||
@@ -78,7 +81,10 @@ jobs:
|
||||
persisted := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run.ID})
|
||||
runIDStr := strconv.FormatInt(run.ID, 10)
|
||||
assert.Equal(t, "Run "+runIDStr, persisted.Title)
|
||||
assert.Equal(t, "group-"+runIDStr, persisted.ConcurrencyGroup)
|
||||
// ConcurrencyGroup lives on the latest attempt after migration v331.
|
||||
require.Positive(t, persisted.LatestAttemptID)
|
||||
attempt := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunAttempt{ID: persisted.LatestAttemptID})
|
||||
assert.Equal(t, "group-"+runIDStr, attempt.ConcurrencyGroup)
|
||||
// Rerun reads raw_concurrency from the DB to re-evaluate the group;
|
||||
// see services/actions/rerun.go. Must survive the insert.
|
||||
assert.NotEmpty(t, persisted.RawConcurrency)
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
"code.gitea.io/gitea/modules/queue"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
notify_service "code.gitea.io/gitea/services/notify"
|
||||
|
||||
"xorm.io/builder"
|
||||
)
|
||||
@@ -70,30 +69,33 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("get action run: %w", err)
|
||||
}
|
||||
var jobs, updatedJobs []*actions_model.ActionRunJob
|
||||
var jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
// check jobs of the current run
|
||||
if js, ujs, err := checkJobsOfRun(ctx, run); err != nil {
|
||||
if js, ujs, cjs, err := checkJobsOfCurrentRunAttempt(ctx, run); err != nil {
|
||||
return err
|
||||
} else {
|
||||
jobs = append(jobs, js...)
|
||||
updatedJobs = append(updatedJobs, ujs...)
|
||||
cancelledJobs = append(cancelledJobs, cjs...)
|
||||
}
|
||||
if js, ujs, err := checkRunConcurrency(ctx, run); err != nil {
|
||||
if js, ujs, cjs, err := checkRunConcurrency(ctx, run); err != nil {
|
||||
return err
|
||||
} else {
|
||||
jobs = append(jobs, js...)
|
||||
updatedJobs = append(updatedJobs, ujs...)
|
||||
cancelledJobs = append(cancelledJobs, cjs...)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
CreateCommitStatusForRunJobs(ctx, run, jobs...)
|
||||
for _, job := range updatedJobs {
|
||||
_ = job.LoadAttributes(ctx)
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, cancelledJobs)
|
||||
EmitJobsIfReadyByJobs(cancelledJobs)
|
||||
if err := createCommitStatusesForJobsByRun(ctx, jobs); err != nil {
|
||||
return err
|
||||
}
|
||||
NotifyWorkflowJobsStatusUpdate(ctx, updatedJobs...)
|
||||
runJobs := make(map[int64][]*actions_model.ActionRunJob)
|
||||
for _, job := range jobs {
|
||||
runJobs[job.RunID] = append(runJobs[job.RunID], job)
|
||||
@@ -114,71 +116,97 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
|
||||
}
|
||||
}
|
||||
if runUpdated {
|
||||
NotifyWorkflowRunStatusUpdateWithReload(ctx, js[0])
|
||||
NotifyWorkflowRunStatusUpdateWithReload(ctx, js[0].RepoID, js[0].RunID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// findBlockedRunByConcurrency finds the blocked concurrent run in a repo and returns `nil, nil` when there is no blocked run.
|
||||
func findBlockedRunByConcurrency(ctx context.Context, repoID int64, concurrencyGroup string) (*actions_model.ActionRun, error) {
|
||||
if concurrencyGroup == "" {
|
||||
return nil, nil //nolint:nilnil // return nil to indicate that no blocked run exists
|
||||
}
|
||||
cRuns, cJobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, repoID, concurrencyGroup, []actions_model.Status{actions_model.StatusBlocked})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
|
||||
func createCommitStatusesForJobsByRun(ctx context.Context, jobs []*actions_model.ActionRunJob) error {
|
||||
runJobs := make(map[int64][]*actions_model.ActionRunJob)
|
||||
for _, job := range jobs {
|
||||
runJobs[job.RunID] = append(runJobs[job.RunID], job)
|
||||
}
|
||||
|
||||
// There can be at most one blocked run or job
|
||||
var concurrentRun *actions_model.ActionRun
|
||||
if len(cRuns) > 0 {
|
||||
concurrentRun = cRuns[0]
|
||||
} else if len(cJobs) > 0 {
|
||||
jobRun, exist, err := db.GetByID[actions_model.ActionRun](ctx, cJobs[0].RunID)
|
||||
if !exist {
|
||||
return nil, fmt.Errorf("run %d does not exist", cJobs[0].RunID)
|
||||
}
|
||||
for jobRunID, jobList := range runJobs {
|
||||
run, err := actions_model.GetRunByRepoAndID(ctx, jobList[0].RepoID, jobRunID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get run by job %d: %w", cJobs[0].ID, err)
|
||||
return fmt.Errorf("get action run %d: %w", jobRunID, err)
|
||||
}
|
||||
concurrentRun = jobRun
|
||||
CreateCommitStatusForRunJobs(ctx, run, jobList...)
|
||||
}
|
||||
|
||||
return concurrentRun, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) {
|
||||
// findBlockedRunIDByConcurrency finds a blocked concurrent run in a repo and returns 0 when there is no blocked run.
|
||||
func findBlockedRunIDByConcurrency(ctx context.Context, repoID int64, concurrencyGroup string) (int64, error) {
|
||||
if concurrencyGroup == "" {
|
||||
return 0, nil
|
||||
}
|
||||
cAttempts, cJobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, repoID, concurrencyGroup, []actions_model.Status{actions_model.StatusBlocked})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("find concurrent runs and jobs: %w", err)
|
||||
}
|
||||
|
||||
if len(cAttempts) > 0 {
|
||||
return cAttempts[0].RunID, nil
|
||||
}
|
||||
if len(cJobs) > 0 {
|
||||
return cJobs[0].RunID, nil
|
||||
}
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func checkBlockedConcurrentRun(ctx context.Context, repoID, runID int64) (jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob, err error) {
|
||||
concurrentRun, err := actions_model.GetRunByRepoAndID(ctx, repoID, runID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("get run %d: %w", runID, err)
|
||||
}
|
||||
if concurrentRun.NeedApproval {
|
||||
return nil, nil, nil, nil
|
||||
}
|
||||
|
||||
return checkJobsOfCurrentRunAttempt(ctx, concurrentRun)
|
||||
}
|
||||
|
||||
// checkRunConcurrency rechecks runs blocked by concurrency that may become unblocked after the current run releases a workflow-level or job-level concurrency group.
|
||||
func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob, err error) {
|
||||
checkedConcurrencyGroup := make(container.Set[string])
|
||||
|
||||
collect := func(concurrencyGroup string) error {
|
||||
concurrentRun, err := findBlockedRunByConcurrency(ctx, run.RepoID, concurrencyGroup)
|
||||
concurrentRunID, err := findBlockedRunIDByConcurrency(ctx, run.RepoID, concurrencyGroup)
|
||||
if err != nil {
|
||||
return fmt.Errorf("find blocked run by concurrency: %w", err)
|
||||
}
|
||||
if concurrentRun != nil && !concurrentRun.NeedApproval {
|
||||
js, ujs, err := checkJobsOfRun(ctx, concurrentRun)
|
||||
if concurrentRunID > 0 {
|
||||
js, ujs, cjs, err := checkBlockedConcurrentRun(ctx, run.RepoID, concurrentRunID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
jobs = append(jobs, js...)
|
||||
updatedJobs = append(updatedJobs, ujs...)
|
||||
cancelledJobs = append(cancelledJobs, cjs...)
|
||||
}
|
||||
checkedConcurrencyGroup.Add(concurrencyGroup)
|
||||
return nil
|
||||
}
|
||||
|
||||
// check run (workflow-level) concurrency
|
||||
if run.ConcurrencyGroup != "" {
|
||||
if err := collect(run.ConcurrencyGroup); err != nil {
|
||||
return nil, nil, err
|
||||
runConcurrencyGroup, _, err := run.GetEffectiveConcurrency(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("GetEffectiveConcurrency: %w", err)
|
||||
}
|
||||
if runConcurrencyGroup != "" {
|
||||
if err := collect(runConcurrencyGroup); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// check job concurrency
|
||||
runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
|
||||
runJobs, err := actions_model.GetLatestAttemptJobsByRepoAndRunID(ctx, run.RepoID, run.ID)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
|
||||
return nil, nil, nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
|
||||
}
|
||||
for _, job := range runJobs {
|
||||
if !job.Status.IsDone() {
|
||||
@@ -188,28 +216,30 @@ func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (job
|
||||
continue
|
||||
}
|
||||
if err := collect(job.ConcurrencyGroup); err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
}
|
||||
return jobs, updatedJobs, nil
|
||||
return jobs, updatedJobs, cancelledJobs, nil
|
||||
}
|
||||
|
||||
func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) {
|
||||
jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
|
||||
// checkJobsOfCurrentRunAttempt resolves blocked jobs of the run's latest attempt.
|
||||
func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob, err error) {
|
||||
jobs, err = actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, run.LatestAttemptID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
resolver := newJobStatusResolver(jobs, vars)
|
||||
|
||||
if err = db.WithTx(ctx, func(ctx context.Context) error {
|
||||
for _, job := range jobs {
|
||||
job.Run = run
|
||||
}
|
||||
|
||||
updates := newJobStatusResolver(jobs, vars).Resolve(ctx)
|
||||
updates := resolver.Resolve(ctx)
|
||||
for _, job := range jobs {
|
||||
if status, ok := updates[job.ID]; ok {
|
||||
job.Status = status
|
||||
@@ -223,26 +253,18 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return jobs, updatedJobs, nil
|
||||
}
|
||||
|
||||
func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, job *actions_model.ActionRunJob) {
|
||||
job.Run = nil
|
||||
if err := job.LoadAttributes(ctx); err != nil {
|
||||
log.Error("LoadAttributes: %v", err)
|
||||
return
|
||||
}
|
||||
notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run)
|
||||
return jobs, updatedJobs, resolver.cancelledJobs, nil
|
||||
}
|
||||
|
||||
type jobStatusResolver struct {
|
||||
statuses map[int64]actions_model.Status
|
||||
needs map[int64][]int64
|
||||
jobMap map[int64]*actions_model.ActionRunJob
|
||||
vars map[string]string
|
||||
statuses map[int64]actions_model.Status
|
||||
needs map[int64][]int64
|
||||
jobMap map[int64]*actions_model.ActionRunJob
|
||||
vars map[string]string
|
||||
cancelledJobs []*actions_model.ActionRunJob
|
||||
}
|
||||
|
||||
func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver {
|
||||
@@ -341,9 +363,12 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
|
||||
|
||||
newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped)
|
||||
if newStatus == actions_model.StatusWaiting {
|
||||
newStatus, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob)
|
||||
var cancelledJobs []*actions_model.ActionRunJob
|
||||
newStatus, cancelledJobs, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob)
|
||||
if err != nil {
|
||||
log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err)
|
||||
} else {
|
||||
r.cancelledJobs = append(r.cancelledJobs, cancelledJobs...)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -359,8 +384,16 @@ func updateConcurrencyEvaluationForJobWithNeeds(ctx context.Context, actionRunJo
|
||||
return nil // for testing purpose only, no repo, no evaluation
|
||||
}
|
||||
|
||||
err := EvaluateJobConcurrencyFillModel(ctx, actionRunJob.Run, actionRunJob, vars, nil)
|
||||
if err != nil {
|
||||
// Legacy jobs (created before migration v331) have RunAttemptID=0 and no attempt record.
|
||||
var attempt *actions_model.ActionRunAttempt
|
||||
if actionRunJob.RunAttemptID > 0 {
|
||||
var err error
|
||||
attempt, err = actions_model.GetRunAttemptByRepoAndID(ctx, actionRunJob.RepoID, actionRunJob.RunAttemptID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("GetRunAttemptByRepoAndID: %w", err)
|
||||
}
|
||||
}
|
||||
if err := EvaluateJobConcurrencyFillModel(ctx, actionRunJob.Run, attempt, actionRunJob, vars, nil); err != nil {
|
||||
return fmt.Errorf("evaluate job concurrency: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -144,23 +144,36 @@ func Test_checkRunConcurrency_NoDuplicateConcurrencyGroupCheck(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := t.Context()
|
||||
|
||||
// Run A: the triggering run with a concurrency group.
|
||||
// Run A: the triggering run of attempt A
|
||||
runA := &actions_model.ActionRun{
|
||||
RepoID: 4,
|
||||
OwnerID: 1,
|
||||
TriggerUserID: 1,
|
||||
WorkflowID: "test.yml",
|
||||
Index: 9901,
|
||||
Ref: "refs/heads/main",
|
||||
Status: actions_model.StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, runA))
|
||||
|
||||
// Attempt A: an attempt of run A with concurrency group "test-cg"
|
||||
runAAttempt := &actions_model.ActionRunAttempt{
|
||||
RepoID: 4,
|
||||
OwnerID: 1,
|
||||
TriggerUserID: 1,
|
||||
WorkflowID: "test.yml",
|
||||
Index: 9901,
|
||||
Ref: "refs/heads/main",
|
||||
RunID: runA.ID,
|
||||
Attempt: 1,
|
||||
Status: actions_model.StatusRunning,
|
||||
ConcurrencyGroup: "test-cg",
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, runA))
|
||||
assert.NoError(t, db.Insert(ctx, runAAttempt))
|
||||
_, err := db.Exec(t.Context(), "UPDATE `action_run` SET latest_attempt_id = ? WHERE id = ?", runAAttempt.ID, runA.ID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// A done job for run A with the same ConcurrencyGroup.
|
||||
// This triggers the job-level concurrency check in checkRunConcurrency.
|
||||
jobADone := &actions_model.ActionRunJob{
|
||||
RunID: runA.ID,
|
||||
RunAttemptID: runAAttempt.ID,
|
||||
AttemptJobID: 1,
|
||||
RepoID: 4,
|
||||
OwnerID: 1,
|
||||
JobID: "job1",
|
||||
@@ -170,31 +183,45 @@ func Test_checkRunConcurrency_NoDuplicateConcurrencyGroupCheck(t *testing.T) {
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, jobADone))
|
||||
|
||||
// Blocked run B competing for the same concurrency group.
|
||||
// Run B: a run blocked by concurrency
|
||||
runB := &actions_model.ActionRun{
|
||||
RepoID: 4,
|
||||
OwnerID: 1,
|
||||
TriggerUserID: 1,
|
||||
WorkflowID: "test.yml",
|
||||
Index: 9902,
|
||||
Ref: "refs/heads/main",
|
||||
Status: actions_model.StatusBlocked,
|
||||
ConcurrencyGroup: "test-cg",
|
||||
RepoID: 4,
|
||||
OwnerID: 1,
|
||||
TriggerUserID: 1,
|
||||
WorkflowID: "test.yml",
|
||||
Index: 9902,
|
||||
Ref: "refs/heads/main",
|
||||
Status: actions_model.StatusBlocked,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, runB))
|
||||
|
||||
// Attempt B: an blocked attempt of run B
|
||||
runBAttempt := &actions_model.ActionRunAttempt{
|
||||
RepoID: 4,
|
||||
RunID: runB.ID,
|
||||
Attempt: 1,
|
||||
Status: actions_model.StatusBlocked,
|
||||
ConcurrencyGroup: "test-cg",
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, runBAttempt))
|
||||
_, err = db.Exec(t.Context(), "UPDATE `action_run` SET latest_attempt_id = ? WHERE id = ?", runBAttempt.ID, runB.ID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// A blocked job belonging to run B (no job-level concurrency group).
|
||||
jobBBlocked := &actions_model.ActionRunJob{
|
||||
RunID: runB.ID,
|
||||
RepoID: 4,
|
||||
OwnerID: 1,
|
||||
JobID: "job1",
|
||||
Name: "job1",
|
||||
Status: actions_model.StatusBlocked,
|
||||
RunID: runB.ID,
|
||||
RunAttemptID: runBAttempt.ID,
|
||||
AttemptJobID: 1,
|
||||
RepoID: 4,
|
||||
OwnerID: 1,
|
||||
JobID: "job1",
|
||||
Name: "job1",
|
||||
Status: actions_model.StatusBlocked,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, jobBBlocked))
|
||||
|
||||
jobs, _, err := checkRunConcurrency(ctx, runA)
|
||||
runA, _, _ = db.GetByID[actions_model.ActionRun](t.Context(), runA.ID)
|
||||
jobs, _, _, err := checkRunConcurrency(ctx, runA)
|
||||
assert.NoError(t, err)
|
||||
|
||||
if assert.Len(t, jobs, 1) {
|
||||
|
||||
@@ -815,7 +815,7 @@ func (n *actionsNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *rep
|
||||
log.Error("GetActionWorkflow: %v", err)
|
||||
return
|
||||
}
|
||||
convertedRun, err := convert.ToActionWorkflowRun(ctx, repo, run)
|
||||
convertedRun, err := convert.ToActionWorkflowRun(ctx, repo, run, nil)
|
||||
if err != nil {
|
||||
log.Error("ToActionWorkflowRun: %v", err)
|
||||
return
|
||||
|
||||
144
services/actions/notify.go
Normal file
144
services/actions/notify.go
Normal file
@@ -0,0 +1,144 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
notify_service "code.gitea.io/gitea/services/notify"
|
||||
)
|
||||
|
||||
// NotifyWorkflowJobsAndRunsStatusUpdate notifies status changes for a batch of jobs and the runs they affect.
|
||||
// Use it when a workflow operation updates multiple jobs and runs.
|
||||
func NotifyWorkflowJobsAndRunsStatusUpdate(ctx context.Context, jobs []*actions_model.ActionRunJob) {
|
||||
if len(jobs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// The input jobs may belong to different runs, so track each affected run.
|
||||
runs := make(map[int64]*actions_model.ActionRun, len(jobs))
|
||||
jobsByRunID := make(map[int64][]*actions_model.ActionRunJob)
|
||||
|
||||
for _, job := range jobs {
|
||||
if err := job.LoadAttributes(ctx); err != nil {
|
||||
log.Error("Failed to load job attributes: %v", err)
|
||||
continue
|
||||
}
|
||||
CreateCommitStatusForRunJobs(ctx, job.Run, job)
|
||||
|
||||
if _, ok := runs[job.RunID]; !ok {
|
||||
runs[job.RunID] = job.Run
|
||||
}
|
||||
if _, ok := jobsByRunID[job.RunID]; !ok {
|
||||
jobsByRunID[job.RunID] = make([]*actions_model.ActionRunJob, 0)
|
||||
}
|
||||
jobsByRunID[job.RunID] = append(jobsByRunID[job.RunID], job)
|
||||
}
|
||||
|
||||
for _, run := range runs {
|
||||
NotifyWorkflowRunStatusUpdate(ctx, run)
|
||||
}
|
||||
|
||||
for _, jobs := range jobsByRunID {
|
||||
NotifyWorkflowJobsStatusUpdate(ctx, jobs...)
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyWorkflowRunStatusUpdateWithReload reloads the run before notifying its status update.
|
||||
// Use it when only repo/run IDs are available or when the in-memory run may be stale after job updates.
|
||||
func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, repoID, runID int64) {
|
||||
run, err := actions_model.GetRunByRepoAndID(ctx, repoID, runID)
|
||||
if err != nil {
|
||||
log.Error("GetRunByRepoAndID: %v", err)
|
||||
return
|
||||
}
|
||||
NotifyWorkflowRunStatusUpdate(ctx, run)
|
||||
}
|
||||
|
||||
// NotifyWorkflowRunStatusUpdate notifies a run status update using the latest attempt trigger user when available.
|
||||
// Use it for run-level notifications when the caller already has the run model loaded.
|
||||
func NotifyWorkflowRunStatusUpdate(ctx context.Context, run *actions_model.ActionRun) {
|
||||
if err := run.LoadAttributes(ctx); err != nil {
|
||||
log.Error("run.LoadAttributes: %v", err)
|
||||
return
|
||||
}
|
||||
triggerUser := run.TriggerUser
|
||||
if run.LatestAttemptID > 0 {
|
||||
attempt, err := actions_model.GetRunAttemptByRepoAndID(ctx, run.RepoID, run.LatestAttemptID)
|
||||
if err != nil {
|
||||
log.Error("GetRunAttemptByRepoAndID: %v", err)
|
||||
return
|
||||
}
|
||||
if err := attempt.LoadAttributes(ctx); err != nil {
|
||||
log.Error("attempt.LoadAttributes: %v", err)
|
||||
return
|
||||
}
|
||||
triggerUser = attempt.TriggerUser
|
||||
}
|
||||
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, triggerUser, run)
|
||||
}
|
||||
|
||||
// NotifyWorkflowJobsStatusUpdate notifies status updates for jobs without task.
|
||||
// Use it for batch or single-job notifications after state changes.
|
||||
func NotifyWorkflowJobsStatusUpdate(ctx context.Context, jobs ...*actions_model.ActionRunJob) {
|
||||
jobsByAttempt := make(map[int64][]*actions_model.ActionRunJob)
|
||||
for _, job := range jobs {
|
||||
if _, ok := jobsByAttempt[job.RunAttemptID]; !ok {
|
||||
jobsByAttempt[job.RunAttemptID] = make([]*actions_model.ActionRunJob, 0)
|
||||
}
|
||||
jobsByAttempt[job.RunAttemptID] = append(jobsByAttempt[job.RunAttemptID], job)
|
||||
}
|
||||
|
||||
for attemptID, js := range jobsByAttempt {
|
||||
if attemptID == 0 {
|
||||
for _, job := range js {
|
||||
if err := job.LoadAttributes(ctx); err != nil {
|
||||
log.Error("job.LoadAttributes: %v", err)
|
||||
continue
|
||||
}
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
attempt, err := actions_model.GetRunAttemptByRepoAndID(ctx, js[0].RepoID, attemptID)
|
||||
if err != nil {
|
||||
log.Error("GetRunAttemptByRepoAndID: %v", err)
|
||||
continue
|
||||
}
|
||||
if err := attempt.LoadAttributes(ctx); err != nil {
|
||||
log.Error("attempt.LoadAttributes: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, job := range js {
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, attempt.Run.Repo, attempt.TriggerUser, job, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyWorkflowJobStatusUpdateWithTask notifies a single job status update when a concrete task is available.
|
||||
// Use it for runner/task lifecycle callbacks so the notification includes the originating task context.
|
||||
func NotifyWorkflowJobStatusUpdateWithTask(ctx context.Context, job *actions_model.ActionRunJob, task *actions_model.ActionTask) {
|
||||
if job.RunAttemptID == 0 {
|
||||
if err := job.LoadAttributes(ctx); err != nil {
|
||||
log.Error("job.LoadAttributes: %v", err)
|
||||
return
|
||||
}
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, task)
|
||||
return
|
||||
}
|
||||
|
||||
attempt, err := actions_model.GetRunAttemptByRepoAndID(ctx, job.RepoID, job.RunAttemptID)
|
||||
if err != nil {
|
||||
log.Error("GetRunAttemptByRepoAndID: %v", err)
|
||||
return
|
||||
}
|
||||
if err := attempt.LoadAttributes(ctx); err != nil {
|
||||
log.Error("attempt.LoadAttributes: %v", err)
|
||||
return
|
||||
}
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, attempt.Run.Repo, attempt.TriggerUser, job, task)
|
||||
}
|
||||
@@ -6,57 +6,312 @@ package actions
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/models/db"
|
||||
repo_model "code.gitea.io/gitea/models/repo"
|
||||
"code.gitea.io/gitea/models/unit"
|
||||
user_model "code.gitea.io/gitea/models/user"
|
||||
"code.gitea.io/gitea/modules/container"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
notify_service "code.gitea.io/gitea/services/notify"
|
||||
|
||||
"github.com/nektos/act/pkg/model"
|
||||
"go.yaml.in/yaml/v4"
|
||||
"xorm.io/builder"
|
||||
)
|
||||
|
||||
// GetFailedRerunJobs returns all failed jobs and their downstream dependent jobs that need to be rerun
|
||||
func GetFailedRerunJobs(allJobs []*actions_model.ActionRunJob) []*actions_model.ActionRunJob {
|
||||
rerunJobIDSet := make(container.Set[int64])
|
||||
// GetFailedJobsForRerun returns the failed or cancelled jobs in a run.
|
||||
func GetFailedJobsForRerun(allJobs []*actions_model.ActionRunJob) []*actions_model.ActionRunJob {
|
||||
var jobsToRerun []*actions_model.ActionRunJob
|
||||
|
||||
for _, job := range allJobs {
|
||||
if job.Status == actions_model.StatusFailure || job.Status == actions_model.StatusCancelled {
|
||||
for _, j := range GetAllRerunJobs(job, allJobs) {
|
||||
if !rerunJobIDSet.Contains(j.ID) {
|
||||
rerunJobIDSet.Add(j.ID)
|
||||
jobsToRerun = append(jobsToRerun, j)
|
||||
}
|
||||
}
|
||||
jobsToRerun = append(jobsToRerun, job)
|
||||
}
|
||||
}
|
||||
|
||||
return jobsToRerun
|
||||
}
|
||||
|
||||
// GetAllRerunJobs returns the target job and all jobs that transitively depend on it.
|
||||
// Downstream jobs are included regardless of their current status.
|
||||
func GetAllRerunJobs(job *actions_model.ActionRunJob, allJobs []*actions_model.ActionRunJob) []*actions_model.ActionRunJob {
|
||||
rerunJobs := []*actions_model.ActionRunJob{job}
|
||||
rerunJobsIDSet := make(container.Set[string])
|
||||
rerunJobsIDSet.Add(job.JobID)
|
||||
// RerunWorkflowRunJobs reruns the given jobs of a workflow run.
|
||||
// An empty jobsToRerun means rerunning the whole run. Otherwise jobsToRerun contains only the user-requested target jobs;
|
||||
// downstream dependent jobs are expanded internally while building the rerun plan.
|
||||
//
|
||||
// The three stages below (legacy backfill, plan build, plan exec) deliberately run in separate DB transactions
|
||||
// rather than one big outer transaction:
|
||||
// - execRerunPlan performs slow work (loading variables, YAML unmarshal, concurrency expression evaluation)
|
||||
// before opening its own transaction, so the tx stays focused on inserts/updates.
|
||||
// - The legacy backfill is idempotent-friendly: if it succeeds but a later stage fails, a subsequent rerun
|
||||
// will observe run.LatestAttemptID != 0 and skip the backfill, continuing naturally. No data corruption
|
||||
// or stuck state results from partial progress.
|
||||
//
|
||||
// Fast validations that can catch failures early (workflow disabled, run not done, etc.) are therefore
|
||||
// pushed into validateRerun so we rarely enter createOriginalAttemptForLegacyRun only to fail afterwards.
|
||||
func RerunWorkflowRunJobs(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, triggerUser *user_model.User, jobsToRerun []*actions_model.ActionRunJob) (*actions_model.ActionRunAttempt, error) {
|
||||
if err := validateRerun(ctx, run, repo, triggerUser, jobsToRerun); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if run.LatestAttemptID == 0 {
|
||||
if err := createOriginalAttemptForLegacyRun(ctx, run); err != nil {
|
||||
return nil, fmt.Errorf("create attempt for legacy run: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
plan, err := buildRerunPlan(ctx, run, triggerUser, jobsToRerun)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return execRerunPlan(ctx, plan)
|
||||
}
|
||||
|
||||
func validateRerun(ctx context.Context, run *actions_model.ActionRun, repo *repo_model.Repository, triggerUser *user_model.User, jobsToRerun []*actions_model.ActionRunJob) error {
|
||||
if !run.Status.IsDone() {
|
||||
return util.NewInvalidArgumentErrorf("this workflow run is not done")
|
||||
}
|
||||
if repo == nil {
|
||||
return util.NewInvalidArgumentErrorf("repo is required")
|
||||
}
|
||||
if run.RepoID != repo.ID {
|
||||
return util.NewInvalidArgumentErrorf("run %d does not belong to repo %d", run.ID, repo.ID)
|
||||
}
|
||||
for _, job := range jobsToRerun {
|
||||
if job.RunID != run.ID {
|
||||
return util.NewInvalidArgumentErrorf("job %d does not belong to workflow run %d", job.ID, run.ID)
|
||||
}
|
||||
}
|
||||
if triggerUser == nil {
|
||||
return util.NewInvalidArgumentErrorf("trigger user is required")
|
||||
}
|
||||
cfgUnit := repo.MustGetUnit(ctx, unit.TypeActions)
|
||||
cfg := cfgUnit.ActionsConfig()
|
||||
if cfg.IsWorkflowDisabled(run.WorkflowID) {
|
||||
return util.NewInvalidArgumentErrorf("workflow %s is disabled", run.WorkflowID)
|
||||
}
|
||||
|
||||
// Legacy runs (LatestAttemptID == 0) conceptually have only attempt 1, so they can never be at the cap.
|
||||
// For non-legacy runs, look up the latest attempt and reject when its number is already at the configured cap.
|
||||
if run.LatestAttemptID > 0 {
|
||||
latestAttempt, has, err := run.GetLatestAttempt(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("GetLatestAttempt: %w", err)
|
||||
}
|
||||
if has && latestAttempt.Attempt >= setting.Actions.MaxRerunAttempts {
|
||||
return util.NewInvalidArgumentErrorf("workflow run has reached the maximum of %d attempts", setting.Actions.MaxRerunAttempts)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// rerunPlan is a read-only snapshot of the inputs needed to execute a rerun.
|
||||
// It holds no to-be-persisted entities and no intermediate evaluation results;
|
||||
// execRerunPlan constructs and evaluates the new ActionRunAttempt itself.
|
||||
type rerunPlan struct {
|
||||
run *actions_model.ActionRun
|
||||
templateAttempt *actions_model.ActionRunAttempt
|
||||
templateJobs actions_model.ActionJobList
|
||||
rerunJobIDs container.Set[string]
|
||||
triggerUser *user_model.User
|
||||
}
|
||||
|
||||
// buildRerunPlan constructs a rerunPlan for the given workflow run without writing to the database.
|
||||
// jobsToRerun contains only the user-requested target jobs. An empty jobsToRerun means the entire run should be rerun.
|
||||
// It loads the latest attempt as a template and expands jobsToRerun to include all transitive downstream dependents.
|
||||
// The construction of new-attempt and concurrency evaluation are deferred to execRerunPlan so that the plan remains a pure input snapshot.
|
||||
func buildRerunPlan(ctx context.Context, run *actions_model.ActionRun, triggerUser *user_model.User, jobsToRerun []*actions_model.ActionRunJob) (*rerunPlan, error) {
|
||||
if err := run.LoadAttributes(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
templateAttempt, hasTemplateAttempt, err := run.GetLatestAttempt(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !hasTemplateAttempt {
|
||||
return nil, util.NewNotExistErrorf("latest attempt not found")
|
||||
}
|
||||
|
||||
templateJobs, err := actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, templateAttempt.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load template jobs: %w", err)
|
||||
}
|
||||
if len(templateJobs) == 0 {
|
||||
return nil, util.NewNotExistErrorf("no template jobs")
|
||||
}
|
||||
|
||||
plan := &rerunPlan{
|
||||
run: run,
|
||||
templateAttempt: templateAttempt,
|
||||
templateJobs: templateJobs,
|
||||
triggerUser: triggerUser,
|
||||
}
|
||||
|
||||
if err := plan.expandRerunJobIDs(jobsToRerun); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return plan, nil
|
||||
}
|
||||
|
||||
// execRerunPlan executes the rerun plan built by buildRerunPlan.
|
||||
// It loads run variables, constructs the new ActionRunAttempt and evaluates run-level concurrency (all outside the transaction to keep the tx short).
|
||||
// Inside a single database transaction it then inserts the new attempt, clones all template jobs, evaluates job-level concurrency for rerun jobs,
|
||||
// and updates the run's latest_attempt_id.
|
||||
// Jobs not in the rerun set are cloned as pass-through: their status is preserved and SourceTaskID points to the original task so the UI can still display their results.
|
||||
// The attempt's final status is derived only from the rerun jobs, not the pass-through jobs.
|
||||
// Notifications and commit statuses are sent after the transaction commits.
|
||||
func execRerunPlan(ctx context.Context, plan *rerunPlan) (*actions_model.ActionRunAttempt, error) {
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, plan.run)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get run %d variables: %w", plan.run.ID, err)
|
||||
}
|
||||
|
||||
newAttempt := &actions_model.ActionRunAttempt{
|
||||
RepoID: plan.run.RepoID,
|
||||
RunID: plan.run.ID,
|
||||
Attempt: plan.templateAttempt.Attempt + 1,
|
||||
TriggerUserID: plan.triggerUser.ID,
|
||||
Status: actions_model.StatusWaiting,
|
||||
}
|
||||
|
||||
if plan.run.RawConcurrency != "" {
|
||||
var rawConcurrency model.RawConcurrency
|
||||
if err := yaml.Unmarshal([]byte(plan.run.RawConcurrency), &rawConcurrency); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal raw concurrency: %w", err)
|
||||
}
|
||||
if err := EvaluateRunConcurrencyFillModel(ctx, plan.run, newAttempt, &rawConcurrency, vars, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var newJobs, newJobsToRerun actions_model.ActionJobList
|
||||
var cancelledConcurrencyJobs []*actions_model.ActionRunJob
|
||||
|
||||
err = db.WithTx(ctx, func(ctx context.Context) error {
|
||||
newAttemptStatus, jobsToCancel, err := PrepareToStartRunWithConcurrency(ctx, newAttempt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...)
|
||||
newAttempt.Status = newAttemptStatus
|
||||
shouldBlock := newAttemptStatus == actions_model.StatusBlocked
|
||||
|
||||
if err := db.Insert(ctx, newAttempt); err != nil {
|
||||
if _, getErr := actions_model.GetRunAttemptByRunIDAndAttemptNum(ctx, plan.run.ID, newAttempt.Attempt); getErr == nil {
|
||||
return util.NewAlreadyExistErrorf("workflow run attempt %d for run %d already exists", newAttempt.Attempt, plan.run.ID)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
plan.run.LatestAttemptID = newAttempt.ID
|
||||
if err := actions_model.UpdateRun(ctx, plan.run, "latest_attempt_id"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hasWaitingJobs := false
|
||||
newJobs = make(actions_model.ActionJobList, 0, len(plan.templateJobs))
|
||||
newJobsToRerun = make(actions_model.ActionJobList, 0, len(plan.rerunJobIDs))
|
||||
for _, templateJob := range plan.templateJobs {
|
||||
newJob := cloneRunJobForAttempt(templateJob, newAttempt)
|
||||
if plan.rerunJobIDs.Contains(templateJob.JobID) {
|
||||
shouldBlockJob := shouldBlock || plan.hasRerunDependency(templateJob)
|
||||
|
||||
newJob.Status = util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting)
|
||||
newJob.TaskID = 0
|
||||
newJob.SourceTaskID = 0
|
||||
newJob.Started = 0
|
||||
newJob.Stopped = 0
|
||||
newJob.ConcurrencyGroup = ""
|
||||
newJob.ConcurrencyCancel = false
|
||||
newJob.IsConcurrencyEvaluated = false
|
||||
|
||||
if newJob.RawConcurrency != "" && !shouldBlockJob {
|
||||
if err := EvaluateJobConcurrencyFillModel(ctx, plan.run, newAttempt, newJob, vars, nil); err != nil {
|
||||
return fmt.Errorf("evaluate job concurrency: %w", err)
|
||||
}
|
||||
newJob.Status, jobsToCancel, err = PrepareToStartJobWithConcurrency(ctx, newJob)
|
||||
if err != nil {
|
||||
return fmt.Errorf("prepare to start job with concurrency: %w", err)
|
||||
}
|
||||
cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...)
|
||||
}
|
||||
|
||||
newJobsToRerun = append(newJobsToRerun, newJob)
|
||||
} else {
|
||||
newJob.TaskID = 0
|
||||
newJob.SourceTaskID = templateJob.EffectiveTaskID()
|
||||
newJob.Started = templateJob.Started
|
||||
newJob.Stopped = templateJob.Stopped
|
||||
}
|
||||
|
||||
if err := db.Insert(ctx, newJob); err != nil {
|
||||
return err
|
||||
}
|
||||
hasWaitingJobs = hasWaitingJobs || newJob.Status == actions_model.StatusWaiting
|
||||
newJobs = append(newJobs, newJob)
|
||||
}
|
||||
|
||||
newAttempt.Status = actions_model.AggregateJobStatus(newJobsToRerun)
|
||||
if err := actions_model.UpdateRunAttempt(ctx, newAttempt, "status"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if hasWaitingJobs {
|
||||
if err := actions_model.IncreaseTaskVersion(ctx, plan.run.OwnerID, plan.run.RepoID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := plan.run.LoadAttributes(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, cancelledConcurrencyJobs)
|
||||
EmitJobsIfReadyByJobs(cancelledConcurrencyJobs)
|
||||
|
||||
CreateCommitStatusForRunJobs(ctx, plan.run, newJobs...)
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, newJobsToRerun)
|
||||
|
||||
return newAttempt, nil
|
||||
}
|
||||
|
||||
func (p *rerunPlan) expandRerunJobIDs(jobsToRerun []*actions_model.ActionRunJob) error {
|
||||
templateJobIDs := make(container.Set[string])
|
||||
for _, job := range p.templateJobs {
|
||||
templateJobIDs.Add(job.JobID)
|
||||
}
|
||||
|
||||
if len(jobsToRerun) == 0 {
|
||||
p.rerunJobIDs = templateJobIDs
|
||||
return nil
|
||||
}
|
||||
|
||||
rerunJobIDs := make(container.Set[string])
|
||||
for _, job := range jobsToRerun {
|
||||
if !templateJobIDs.Contains(job.JobID) {
|
||||
return util.NewInvalidArgumentErrorf("job %q does not exist in the latest attempt", job.JobID)
|
||||
}
|
||||
rerunJobIDs.Add(job.JobID)
|
||||
}
|
||||
|
||||
for {
|
||||
found := false
|
||||
for _, j := range allJobs {
|
||||
if rerunJobsIDSet.Contains(j.JobID) {
|
||||
for _, job := range p.templateJobs {
|
||||
if rerunJobIDs.Contains(job.JobID) {
|
||||
continue
|
||||
}
|
||||
for _, need := range j.Needs {
|
||||
if rerunJobsIDSet.Contains(need) {
|
||||
for _, need := range job.Needs {
|
||||
if rerunJobIDs.Contains(need) {
|
||||
found = true
|
||||
rerunJobs = append(rerunJobs, j)
|
||||
rerunJobsIDSet.Add(j.JobID)
|
||||
rerunJobIDs.Add(job.JobID)
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -66,152 +321,100 @@ func GetAllRerunJobs(job *actions_model.ActionRunJob, allJobs []*actions_model.A
|
||||
}
|
||||
}
|
||||
|
||||
return rerunJobs
|
||||
p.rerunJobIDs = rerunJobIDs
|
||||
return nil
|
||||
}
|
||||
|
||||
// prepareRunRerun validates the run, resets its state, handles concurrency, persists the
|
||||
// updated run, and fires a status-update notification.
|
||||
// It returns isRunBlocked (true when the run itself is held by a concurrency group).
|
||||
func prepareRunRerun(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, jobs []*actions_model.ActionRunJob) (isRunBlocked bool, err error) {
|
||||
if !run.Status.IsDone() {
|
||||
return false, util.NewInvalidArgumentErrorf("this workflow run is not done")
|
||||
}
|
||||
|
||||
cfgUnit := repo.MustGetUnit(ctx, unit.TypeActions)
|
||||
|
||||
// Rerun is not allowed when workflow is disabled.
|
||||
cfg := cfgUnit.ActionsConfig()
|
||||
if cfg.IsWorkflowDisabled(run.WorkflowID) {
|
||||
return false, util.NewInvalidArgumentErrorf("workflow %s is disabled", run.WorkflowID)
|
||||
}
|
||||
|
||||
// Reset run's timestamps and status.
|
||||
run.PreviousDuration = run.Duration()
|
||||
run.Started = 0
|
||||
run.Stopped = 0
|
||||
run.Status = actions_model.StatusWaiting
|
||||
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("get run %d variables: %w", run.ID, err)
|
||||
}
|
||||
|
||||
if run.RawConcurrency != "" {
|
||||
var rawConcurrency model.RawConcurrency
|
||||
if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil {
|
||||
return false, fmt.Errorf("unmarshal raw concurrency: %w", err)
|
||||
func (p *rerunPlan) hasRerunDependency(job *actions_model.ActionRunJob) bool {
|
||||
for _, need := range job.Needs {
|
||||
if p.rerunJobIDs.Contains(need) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
if err := EvaluateRunConcurrencyFillModel(ctx, run, &rawConcurrency, vars, nil); err != nil {
|
||||
return false, err
|
||||
}
|
||||
func cloneRunJobForAttempt(templateJob *actions_model.ActionRunJob, attempt *actions_model.ActionRunAttempt) *actions_model.ActionRunJob {
|
||||
return &actions_model.ActionRunJob{
|
||||
RunID: templateJob.RunID,
|
||||
RunAttemptID: attempt.ID,
|
||||
RepoID: templateJob.RepoID,
|
||||
OwnerID: templateJob.OwnerID,
|
||||
CommitSHA: templateJob.CommitSHA,
|
||||
IsForkPullRequest: templateJob.IsForkPullRequest,
|
||||
Name: templateJob.Name,
|
||||
Attempt: attempt.Attempt,
|
||||
WorkflowPayload: slices.Clone(templateJob.WorkflowPayload),
|
||||
JobID: templateJob.JobID,
|
||||
AttemptJobID: templateJob.AttemptJobID,
|
||||
Needs: slices.Clone(templateJob.Needs),
|
||||
RunsOn: slices.Clone(templateJob.RunsOn),
|
||||
Status: templateJob.Status,
|
||||
RawConcurrency: templateJob.RawConcurrency,
|
||||
IsConcurrencyEvaluated: templateJob.IsConcurrencyEvaluated,
|
||||
ConcurrencyGroup: templateJob.ConcurrencyGroup,
|
||||
ConcurrencyCancel: templateJob.ConcurrencyCancel,
|
||||
TokenPermissions: templateJob.TokenPermissions,
|
||||
}
|
||||
}
|
||||
|
||||
run.Status, err = PrepareToStartRunWithConcurrency(ctx, run)
|
||||
// createOriginalAttemptForLegacyRun creates a real attempt=1 for a legacy run and updates the existing legacy jobs and artifacts in place
|
||||
// so the original execution becomes attempt-aware before the rerun plan is built and all subsequent logic can use real attempts.
|
||||
// Tasks are not modified: they reference jobs by JobID, so updating jobs implicitly carries the new attempt linkage.
|
||||
func createOriginalAttemptForLegacyRun(ctx context.Context, run *actions_model.ActionRun) error {
|
||||
return db.WithTx(ctx, func(ctx context.Context) error {
|
||||
jobs, err := actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, 0)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return fmt.Errorf("load legacy run jobs: %w", err)
|
||||
}
|
||||
if len(jobs) == 0 {
|
||||
return fmt.Errorf("run %d has no jobs", run.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil {
|
||||
return false, err
|
||||
}
|
||||
originalAttempt := &actions_model.ActionRunAttempt{
|
||||
RepoID: run.RepoID,
|
||||
RunID: run.ID,
|
||||
Attempt: 1,
|
||||
TriggerUserID: run.TriggerUserID,
|
||||
|
||||
if err := run.LoadAttributes(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Legacy concurrency fields on ActionRun are intentionally NOT backfilled onto this original attempt.
|
||||
// They only matter while a run is actively being scheduled, and backfilling them for completed legacy runs
|
||||
// would add migration/runtime cost without changing any future concurrency behavior.
|
||||
|
||||
for _, job := range jobs {
|
||||
job.Run = run
|
||||
}
|
||||
Status: run.Status,
|
||||
Created: run.Created,
|
||||
Started: run.Started,
|
||||
Stopped: run.Stopped,
|
||||
}
|
||||
|
||||
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
|
||||
// Use NoAutoTime so xorm does not overwrite Created with the current time on insert.
|
||||
if _, err := db.GetEngine(ctx).NoAutoTime().Insert(originalAttempt); err != nil {
|
||||
if _, getErr := actions_model.GetRunAttemptByRunIDAndAttemptNum(ctx, run.ID, originalAttempt.Attempt); getErr == nil {
|
||||
return util.NewAlreadyExistErrorf("workflow run attempt %d for run %d already exists", originalAttempt.Attempt, run.ID)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return run.Status == actions_model.StatusBlocked, nil
|
||||
}
|
||||
|
||||
// RerunWorkflowRunJobs reruns the given jobs of a workflow run.
|
||||
// jobsToRerun must include all jobs to be rerun (the target job and its transitively dependent jobs).
|
||||
// A job is blocked (waiting for dependencies) if the run itself is blocked or if any of its
|
||||
// needs are also being rerun.
|
||||
func RerunWorkflowRunJobs(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, jobsToRerun []*actions_model.ActionRunJob) error {
|
||||
if len(jobsToRerun) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
isRunBlocked, err := prepareRunRerun(ctx, repo, run, jobsToRerun)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rerunJobIDs := make(container.Set[string])
|
||||
for _, j := range jobsToRerun {
|
||||
rerunJobIDs.Add(j.JobID)
|
||||
}
|
||||
|
||||
for _, job := range jobsToRerun {
|
||||
shouldBlockJob := isRunBlocked
|
||||
if !shouldBlockJob {
|
||||
for _, need := range job.Needs {
|
||||
if rerunJobIDs.Contains(need) {
|
||||
shouldBlockJob = true
|
||||
break
|
||||
}
|
||||
// backfill attempt related fields for jobs
|
||||
for i, job := range jobs {
|
||||
job.RunAttemptID = originalAttempt.ID
|
||||
job.Attempt = originalAttempt.Attempt
|
||||
job.AttemptJobID = int64(i + 1)
|
||||
if _, err := db.GetEngine(ctx).ID(job.ID).Cols("run_attempt_id", "attempt", "attempt_job_id").Update(job); err != nil {
|
||||
return fmt.Errorf("backfill legacy run jobs: %w", err)
|
||||
}
|
||||
}
|
||||
if err := rerunWorkflowJob(ctx, job, shouldBlockJob); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func rerunWorkflowJob(ctx context.Context, job *actions_model.ActionRunJob, shouldBlock bool) error {
|
||||
status := job.Status
|
||||
if !status.IsDone() {
|
||||
return nil
|
||||
}
|
||||
|
||||
job.TaskID = 0
|
||||
job.Status = util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting)
|
||||
job.Started = 0
|
||||
job.Stopped = 0
|
||||
job.ConcurrencyGroup = ""
|
||||
job.ConcurrencyCancel = false
|
||||
job.IsConcurrencyEvaluated = false
|
||||
|
||||
if err := job.LoadRun(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := job.Run.LoadAttributes(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, job.Run)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get run %d variables: %w", job.Run.ID, err)
|
||||
}
|
||||
|
||||
if job.RawConcurrency != "" && !shouldBlock {
|
||||
if err := EvaluateJobConcurrencyFillModel(ctx, job.Run, job, vars, nil); err != nil {
|
||||
return fmt.Errorf("evaluate job concurrency: %w", err)
|
||||
}
|
||||
|
||||
job.Status, err = PrepareToStartJobWithConcurrency(ctx, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
updateCols := []string{"task_id", "status", "started", "stopped", "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"}
|
||||
_, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, updateCols...)
|
||||
return err
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
CreateCommitStatusForRunJobs(ctx, job.Run, job)
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
|
||||
return nil
|
||||
// backfill "run_attempt_id" field for artifacts
|
||||
if _, err := db.GetEngine(ctx).
|
||||
Where("run_id=? AND run_attempt_id=0", run.ID).
|
||||
Cols("run_attempt_id").
|
||||
Update(&actions_model.ActionArtifact{RunAttemptID: originalAttempt.ID}); err != nil {
|
||||
return fmt.Errorf("backfill legacy artifacts: %w", err)
|
||||
}
|
||||
|
||||
// update "latest_attempt_id" for the run
|
||||
run.LatestAttemptID = originalAttempt.ID
|
||||
return actions_model.UpdateRun(ctx, run, "latest_attempt_id")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,54 +4,17 @@
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
user_model "code.gitea.io/gitea/models/user"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetAllRerunJobs(t *testing.T) {
|
||||
job1 := &actions_model.ActionRunJob{JobID: "job1"}
|
||||
job2 := &actions_model.ActionRunJob{JobID: "job2", Needs: []string{"job1"}}
|
||||
job3 := &actions_model.ActionRunJob{JobID: "job3", Needs: []string{"job2"}}
|
||||
job4 := &actions_model.ActionRunJob{JobID: "job4", Needs: []string{"job2", "job3"}}
|
||||
|
||||
jobs := []*actions_model.ActionRunJob{job1, job2, job3, job4}
|
||||
|
||||
testCases := []struct {
|
||||
job *actions_model.ActionRunJob
|
||||
rerunJobs []*actions_model.ActionRunJob
|
||||
}{
|
||||
{
|
||||
job1,
|
||||
[]*actions_model.ActionRunJob{job1, job2, job3, job4},
|
||||
},
|
||||
{
|
||||
job2,
|
||||
[]*actions_model.ActionRunJob{job2, job3, job4},
|
||||
},
|
||||
{
|
||||
job3,
|
||||
[]*actions_model.ActionRunJob{job3, job4},
|
||||
},
|
||||
{
|
||||
job4,
|
||||
[]*actions_model.ActionRunJob{job4},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
rerunJobs := GetAllRerunJobs(tc.job, jobs)
|
||||
assert.ElementsMatch(t, tc.rerunJobs, rerunJobs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetFailedRerunJobs(t *testing.T) {
|
||||
// IDs must be non-zero to distinguish jobs in the dedup set.
|
||||
func TestGetFailedJobsForRerun(t *testing.T) {
|
||||
makeJob := func(id int64, jobID string, status actions_model.Status, needs ...string) *actions_model.ActionRunJob {
|
||||
return &actions_model.ActionRunJob{ID: id, JobID: jobID, Status: status, Needs: needs}
|
||||
}
|
||||
@@ -61,7 +24,7 @@ func TestGetFailedRerunJobs(t *testing.T) {
|
||||
makeJob(1, "job1", actions_model.StatusSuccess),
|
||||
makeJob(2, "job2", actions_model.StatusSkipped, "job1"),
|
||||
}
|
||||
assert.Empty(t, GetFailedRerunJobs(jobs))
|
||||
assert.Empty(t, GetFailedJobsForRerun(jobs))
|
||||
})
|
||||
|
||||
t.Run("single failed job with no dependents", func(t *testing.T) {
|
||||
@@ -69,56 +32,50 @@ func TestGetFailedRerunJobs(t *testing.T) {
|
||||
job2 := makeJob(2, "job2", actions_model.StatusSuccess)
|
||||
jobs := []*actions_model.ActionRunJob{job1, job2}
|
||||
|
||||
result := GetFailedRerunJobs(jobs)
|
||||
result := GetFailedJobsForRerun(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1}, result)
|
||||
})
|
||||
|
||||
t.Run("failed job pulls in downstream dependents", func(t *testing.T) {
|
||||
// job1 failed; job2 depends on job1 (skipped); job3 depends on job2 (skipped)
|
||||
t.Run("failed job does not pull in downstream dependents", func(t *testing.T) {
|
||||
job1 := makeJob(1, "job1", actions_model.StatusFailure)
|
||||
job2 := makeJob(2, "job2", actions_model.StatusSkipped, "job1")
|
||||
job3 := makeJob(3, "job3", actions_model.StatusSkipped, "job2")
|
||||
job4 := makeJob(4, "job4", actions_model.StatusSuccess) // unrelated, must not appear
|
||||
jobs := []*actions_model.ActionRunJob{job1, job2, job3, job4}
|
||||
|
||||
result := GetFailedRerunJobs(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2, job3}, result)
|
||||
result := GetFailedJobsForRerun(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1}, result)
|
||||
})
|
||||
|
||||
t.Run("multiple independent failed jobs each pull in their own dependents", func(t *testing.T) {
|
||||
// job1 failed -> job3 depends on job1
|
||||
// job2 failed -> job4 depends on job2
|
||||
t.Run("multiple failed jobs are returned directly", func(t *testing.T) {
|
||||
job1 := makeJob(1, "job1", actions_model.StatusFailure)
|
||||
job2 := makeJob(2, "job2", actions_model.StatusFailure)
|
||||
job3 := makeJob(3, "job3", actions_model.StatusSkipped, "job1")
|
||||
job4 := makeJob(4, "job4", actions_model.StatusSkipped, "job2")
|
||||
jobs := []*actions_model.ActionRunJob{job1, job2, job3, job4}
|
||||
|
||||
result := GetFailedRerunJobs(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2, job3, job4}, result)
|
||||
result := GetFailedJobsForRerun(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2}, result)
|
||||
})
|
||||
|
||||
t.Run("shared downstream dependent is not duplicated", func(t *testing.T) {
|
||||
// job1 and job2 both failed; job3 depends on both
|
||||
t.Run("shared downstream dependent is not included", func(t *testing.T) {
|
||||
job1 := makeJob(1, "job1", actions_model.StatusFailure)
|
||||
job2 := makeJob(2, "job2", actions_model.StatusFailure)
|
||||
job3 := makeJob(3, "job3", actions_model.StatusSkipped, "job1", "job2")
|
||||
jobs := []*actions_model.ActionRunJob{job1, job2, job3}
|
||||
|
||||
result := GetFailedRerunJobs(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2, job3}, result)
|
||||
assert.Len(t, result, 3) // job3 must appear exactly once
|
||||
result := GetFailedJobsForRerun(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2}, result)
|
||||
assert.Len(t, result, 2)
|
||||
})
|
||||
|
||||
t.Run("successful downstream job of a failed job is still included", func(t *testing.T) {
|
||||
// job1 failed; job2 succeeded but depends on job1 — downstream is always rerun
|
||||
// regardless of its own status (GetAllRerunJobs includes all transitive dependents)
|
||||
t.Run("successful downstream job of a failed job is not included", func(t *testing.T) {
|
||||
job1 := makeJob(1, "job1", actions_model.StatusFailure)
|
||||
job2 := makeJob(2, "job2", actions_model.StatusSuccess, "job1")
|
||||
jobs := []*actions_model.ActionRunJob{job1, job2}
|
||||
|
||||
result := GetFailedRerunJobs(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1, job2}, result)
|
||||
result := GetFailedJobsForRerun(jobs)
|
||||
assert.ElementsMatch(t, []*actions_model.ActionRunJob{job1}, result)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -129,7 +86,7 @@ func TestRerunValidation(t *testing.T) {
|
||||
jobs := []*actions_model.ActionRunJob{
|
||||
{ID: 1, JobID: "job1"},
|
||||
}
|
||||
err := RerunWorkflowRunJobs(context.Background(), nil, runningRun, jobs)
|
||||
_, err := RerunWorkflowRunJobs(t.Context(), nil, runningRun, &user_model.User{ID: 1}, jobs)
|
||||
require.Error(t, err)
|
||||
assert.ErrorIs(t, err, util.ErrInvalidArgument)
|
||||
})
|
||||
@@ -138,7 +95,7 @@ func TestRerunValidation(t *testing.T) {
|
||||
jobs := []*actions_model.ActionRunJob{
|
||||
{ID: 1, JobID: "job1", Status: actions_model.StatusFailure},
|
||||
}
|
||||
err := RerunWorkflowRunJobs(context.Background(), nil, runningRun, GetFailedRerunJobs(jobs))
|
||||
_, err := RerunWorkflowRunJobs(t.Context(), nil, runningRun, &user_model.User{ID: 1}, GetFailedJobsForRerun(jobs))
|
||||
require.Error(t, err)
|
||||
assert.ErrorIs(t, err, util.ErrInvalidArgument)
|
||||
})
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"code.gitea.io/gitea/models/db"
|
||||
"code.gitea.io/gitea/modules/actions/jobparser"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
notify_service "code.gitea.io/gitea/services/notify"
|
||||
|
||||
act_model "github.com/nektos/act/pkg/model"
|
||||
"go.yaml.in/yaml/v4"
|
||||
@@ -47,10 +46,7 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model
|
||||
|
||||
CreateCommitStatusForRunJobs(ctx, run, allJobs...)
|
||||
|
||||
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
|
||||
for _, job := range allJobs {
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, run.Repo, run.TriggerUser, job, nil)
|
||||
}
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, allJobs)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -58,7 +54,8 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model
|
||||
// InsertRun inserts a run
|
||||
// The title will be cut off at 255 characters if it's longer than 255 characters.
|
||||
func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte, vars map[string]string, inputs map[string]any, wfRawConcurrency *act_model.RawConcurrency) error {
|
||||
return db.WithTx(ctx, func(ctx context.Context) error {
|
||||
var cancelledConcurrencyJobs []*actions_model.ActionRunJob
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -67,6 +64,14 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
run.Title = util.EllipsisDisplayString(run.Title, 255)
|
||||
run.Status = actions_model.StatusWaiting
|
||||
|
||||
if wfRawConcurrency != nil {
|
||||
rawConcurrency, err := yaml.Marshal(wfRawConcurrency)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal raw concurrency: %w", err)
|
||||
}
|
||||
run.RawConcurrency = string(rawConcurrency)
|
||||
}
|
||||
|
||||
// Insert before parsing jobs or evaluating workflow-level concurrency
|
||||
// so that run.ID is populated. Expressions referencing github.run_id —
|
||||
// in run-name, job names, runs-on, or a workflow-level concurrency
|
||||
@@ -76,31 +81,54 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
return err
|
||||
}
|
||||
|
||||
giteaCtx := GenerateGiteaContext(run, nil)
|
||||
runAttempt := &actions_model.ActionRunAttempt{
|
||||
RepoID: run.RepoID,
|
||||
RunID: run.ID,
|
||||
Attempt: 1,
|
||||
TriggerUserID: run.TriggerUserID,
|
||||
Status: actions_model.StatusWaiting,
|
||||
}
|
||||
|
||||
if wfRawConcurrency != nil {
|
||||
if err := EvaluateRunConcurrencyFillModel(ctx, run, runAttempt, wfRawConcurrency, vars, inputs); err != nil {
|
||||
return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err)
|
||||
}
|
||||
// check run (workflow-level) concurrency
|
||||
var jobsToCancel []*actions_model.ActionRunJob
|
||||
runAttempt.Status, jobsToCancel, err = PrepareToStartRunWithConcurrency(ctx, runAttempt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...)
|
||||
}
|
||||
|
||||
if err := db.Insert(ctx, runAttempt); err != nil {
|
||||
return err
|
||||
}
|
||||
run.LatestAttemptID = runAttempt.ID
|
||||
|
||||
giteaCtx := GenerateGiteaContext(ctx, run, runAttempt, nil)
|
||||
jobs, err := jobparser.Parse(content, jobparser.WithVars(vars), jobparser.WithGitContext(giteaCtx.ToGitHubContext()), jobparser.WithInputs(inputs))
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse workflow: %w", err)
|
||||
}
|
||||
|
||||
titleChanged := len(jobs) > 0 && jobs[0].RunName != ""
|
||||
if titleChanged {
|
||||
run.Title = util.EllipsisDisplayString(jobs[0].RunName, 255)
|
||||
}
|
||||
|
||||
if wfRawConcurrency != nil {
|
||||
if err := EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars, inputs); err != nil {
|
||||
return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err)
|
||||
}
|
||||
run.Status, err = PrepareToStartRunWithConcurrency(ctx, run)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cols := []string{"latest_attempt_id"}
|
||||
if titleChanged {
|
||||
cols = append(cols, "title")
|
||||
}
|
||||
if err := actions_model.UpdateRun(ctx, run, cols...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
|
||||
var hasWaitingJobs bool
|
||||
|
||||
for _, v := range jobs {
|
||||
for i, v := range jobs {
|
||||
id, job := v.Job()
|
||||
needs := job.Needs()
|
||||
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
|
||||
@@ -108,18 +136,21 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
}
|
||||
payload, _ := v.Marshal()
|
||||
|
||||
shouldBlockJob := len(needs) > 0 || run.NeedApproval || run.Status == actions_model.StatusBlocked
|
||||
shouldBlockJob := runAttempt.Status == actions_model.StatusBlocked || len(needs) > 0 || run.NeedApproval
|
||||
|
||||
job.Name = util.EllipsisDisplayString(job.Name, 255)
|
||||
runJob := &actions_model.ActionRunJob{
|
||||
RunID: run.ID,
|
||||
RunAttemptID: runAttempt.ID,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: run.CommitSHA,
|
||||
IsForkPullRequest: run.IsForkPullRequest,
|
||||
Name: job.Name,
|
||||
Attempt: runAttempt.Attempt,
|
||||
WorkflowPayload: payload,
|
||||
JobID: id,
|
||||
AttemptJobID: int64(i + 1),
|
||||
Needs: needs,
|
||||
RunsOn: job.RunsOn(),
|
||||
Status: util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting),
|
||||
@@ -139,7 +170,7 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
|
||||
// do not evaluate job concurrency when it requires `needs`, the jobs with `needs` will be evaluated later by job emitter
|
||||
if len(needs) == 0 {
|
||||
err = EvaluateJobConcurrencyFillModel(ctx, run, runJob, vars, inputs)
|
||||
err = EvaluateJobConcurrencyFillModel(ctx, run, runAttempt, runJob, vars, inputs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("evaluate job concurrency: %w", err)
|
||||
}
|
||||
@@ -148,10 +179,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
// If a job needs other jobs ("needs" is not empty), its status is set to StatusBlocked at the entry of the loop
|
||||
// No need to check job concurrency for a blocked job (it will be checked by job emitter later)
|
||||
if runJob.Status == actions_model.StatusWaiting {
|
||||
runJob.Status, err = PrepareToStartJobWithConcurrency(ctx, runJob)
|
||||
var jobsToCancel []*actions_model.ActionRunJob
|
||||
runJob.Status, jobsToCancel, err = PrepareToStartJobWithConcurrency(ctx, runJob)
|
||||
if err != nil {
|
||||
return fmt.Errorf("prepare to start job with concurrency: %w", err)
|
||||
}
|
||||
cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -163,15 +196,8 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
runJobs = append(runJobs, runJob)
|
||||
}
|
||||
|
||||
run.Status = actions_model.AggregateJobStatus(runJobs)
|
||||
cols := []string{"status"}
|
||||
if titleChanged {
|
||||
cols = append(cols, "title")
|
||||
}
|
||||
if wfRawConcurrency != nil {
|
||||
cols = append(cols, "raw_concurrency", "concurrency_group", "concurrency_cancel")
|
||||
}
|
||||
if err := actions_model.UpdateRun(ctx, run, cols...); err != nil {
|
||||
runAttempt.Status = actions_model.AggregateJobStatus(runJobs)
|
||||
if err := actions_model.UpdateRunAttempt(ctx, runAttempt, "status"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -183,5 +209,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, cancelledConcurrencyJobs)
|
||||
EmitJobsIfReadyByJobs(cancelledConcurrencyJobs)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/models/db"
|
||||
secret_model "code.gitea.io/gitea/models/secret"
|
||||
notify_service "code.gitea.io/gitea/services/notify"
|
||||
|
||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
@@ -78,7 +77,7 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
|
||||
return fmt.Errorf("findTaskNeeds: %w", err)
|
||||
}
|
||||
|
||||
taskContext, err := generateTaskContext(t)
|
||||
taskContext, err := generateTaskContext(ctx, t)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generateTaskContext: %w", err)
|
||||
}
|
||||
@@ -102,23 +101,23 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
|
||||
}
|
||||
|
||||
CreateCommitStatusForRunJobs(ctx, job.Run, job)
|
||||
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, actionTask)
|
||||
NotifyWorkflowJobStatusUpdateWithTask(ctx, job, actionTask)
|
||||
// job.Run is loaded inside the transaction before UpdateRunJob sets run.Started,
|
||||
// so Started is zero only on the very first pick-up of that run.
|
||||
if job.Run.Started.IsZero() {
|
||||
NotifyWorkflowRunStatusUpdateWithReload(ctx, job)
|
||||
NotifyWorkflowRunStatusUpdateWithReload(ctx, job.RepoID, job.RunID)
|
||||
}
|
||||
|
||||
return task, true, nil
|
||||
}
|
||||
|
||||
func generateTaskContext(t *actions_model.ActionTask) (*structpb.Struct, error) {
|
||||
func generateTaskContext(ctx context.Context, t *actions_model.ActionTask) (*structpb.Struct, error) {
|
||||
giteaRuntimeToken, err := CreateAuthorizationToken(t.ID, t.Job.RunID, t.JobID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
gitCtx := GenerateGiteaContext(t.Job.Run, t.Job)
|
||||
gitCtx := GenerateGiteaContext(ctx, t.Job.Run, nil, t.Job)
|
||||
gitCtx["token"] = t.Token
|
||||
gitCtx["gitea_runtime_token"] = giteaRuntimeToken
|
||||
|
||||
|
||||
@@ -120,7 +120,7 @@ func TestToActionWorkflowRun_UsesTriggerEvent(t *testing.T) {
|
||||
run.Event = "push"
|
||||
run.TriggerEvent = "schedule"
|
||||
|
||||
apiRun, err := ToActionWorkflowRun(t.Context(), repo, run)
|
||||
apiRun, err := ToActionWorkflowRun(t.Context(), repo, run, nil)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "schedule", apiRun.Event)
|
||||
}
|
||||
|
||||
@@ -247,30 +247,64 @@ func ToActionTask(ctx context.Context, t *actions_model.ActionTask) (*api.Action
|
||||
}, nil
|
||||
}
|
||||
|
||||
func ToActionWorkflowRun(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun) (*api.ActionWorkflowRun, error) {
|
||||
err := run.LoadAttributes(ctx)
|
||||
if err != nil {
|
||||
func ToActionWorkflowRun(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, attempt *actions_model.ActionRunAttempt) (*api.ActionWorkflowRun, error) {
|
||||
if err := run.LoadAttributes(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if attempt == nil {
|
||||
if latestAttempt, has, err := run.GetLatestAttempt(ctx); err != nil {
|
||||
return nil, err
|
||||
} else if has {
|
||||
attempt = latestAttempt
|
||||
}
|
||||
}
|
||||
|
||||
runAttempt := int64(0)
|
||||
status, conclusion := ToActionsStatus(run.Status)
|
||||
startedAt := run.Started.AsLocalTime()
|
||||
completedAt := run.Stopped.AsLocalTime()
|
||||
actor := run.TriggerUser // The username of the user that triggered the initial workflow run.
|
||||
triggerUser := run.TriggerUser // The username of the user that initiated the workflow run. If the workflow run is a re-run, this value may differ from actor.
|
||||
|
||||
// previousAttemptURL is the value of ActionWorkflowRun.PreviousAttemptURL, which is declared as *string without `omitempty` on purpose:
|
||||
// a nil value must still appear in the JSON body as `"previous_attempt_url": null`, matching GitHub's Actions API.
|
||||
var previousAttemptURL *string
|
||||
|
||||
if attempt != nil {
|
||||
if err := attempt.LoadAttributes(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
runAttempt = attempt.Attempt
|
||||
status, conclusion = ToActionsStatus(attempt.Status)
|
||||
startedAt = attempt.Started.AsLocalTime()
|
||||
completedAt = attempt.Stopped.AsLocalTime()
|
||||
triggerUser = attempt.TriggerUser
|
||||
if attempt.Attempt > 1 {
|
||||
url := fmt.Sprintf("%s/actions/runs/%d/attempts/%d", repo.APIURL(), run.ID, attempt.Attempt-1)
|
||||
previousAttemptURL = &url
|
||||
}
|
||||
}
|
||||
|
||||
return &api.ActionWorkflowRun{
|
||||
ID: run.ID,
|
||||
URL: fmt.Sprintf("%s/actions/runs/%d", repo.APIURL(), run.ID),
|
||||
HTMLURL: run.HTMLURL(),
|
||||
RunNumber: run.Index,
|
||||
StartedAt: run.Started.AsLocalTime(),
|
||||
CompletedAt: run.Stopped.AsLocalTime(),
|
||||
Event: run.TriggerEvent,
|
||||
DisplayTitle: run.Title,
|
||||
HeadBranch: git.RefName(run.Ref).BranchName(),
|
||||
HeadSha: run.CommitSHA,
|
||||
Status: status,
|
||||
Conclusion: conclusion,
|
||||
Path: fmt.Sprintf("%s@%s", run.WorkflowID, run.Ref),
|
||||
Repository: ToRepo(ctx, repo, access_model.Permission{AccessMode: perm.AccessModeNone}),
|
||||
TriggerActor: ToUser(ctx, run.TriggerUser, nil),
|
||||
// We do not have a way to get a different User for the actor than the trigger user
|
||||
Actor: ToUser(ctx, run.TriggerUser, nil),
|
||||
ID: run.ID,
|
||||
URL: fmt.Sprintf("%s/actions/runs/%d", repo.APIURL(), run.ID),
|
||||
PreviousAttemptURL: previousAttemptURL,
|
||||
HTMLURL: run.HTMLURL(),
|
||||
RunNumber: run.Index,
|
||||
RunAttempt: runAttempt,
|
||||
StartedAt: startedAt,
|
||||
CompletedAt: completedAt,
|
||||
Event: run.TriggerEvent,
|
||||
DisplayTitle: run.Title,
|
||||
HeadBranch: git.RefName(run.Ref).BranchName(),
|
||||
HeadSha: run.CommitSHA,
|
||||
Status: status,
|
||||
Conclusion: conclusion,
|
||||
Path: fmt.Sprintf("%s@%s", run.WorkflowID, run.Ref),
|
||||
Repository: ToRepo(ctx, repo, access_model.Permission{AccessMode: perm.AccessModeNone}),
|
||||
TriggerActor: ToUser(ctx, triggerUser, nil),
|
||||
Actor: ToUser(ctx, actor, nil),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -329,9 +363,9 @@ func ToActionWorkflowJob(ctx context.Context, repo *repo_model.Repository, task
|
||||
var runnerName string
|
||||
var steps []*api.ActionWorkflowStep
|
||||
|
||||
if job.TaskID != 0 {
|
||||
if effectiveTaskID := job.EffectiveTaskID(); effectiveTaskID != 0 {
|
||||
if task == nil {
|
||||
task, _, err = db.GetByID[actions_model.ActionTask](ctx, job.TaskID)
|
||||
task, _, err = db.GetByID[actions_model.ActionTask](ctx, effectiveTaskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ func generateMessageIDForActionsWorkflowRunStatusEmail(repo *repo_model.Reposito
|
||||
}
|
||||
|
||||
func composeAndSendActionsWorkflowRunStatusEmail(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, sender *user_model.User, recipients []*user_model.User) error {
|
||||
jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID)
|
||||
jobs, err := actions_model.GetLatestAttemptJobsByRepoAndRunID(ctx, repo.ID, run.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -399,12 +399,18 @@ func CreateCommitStatus(ctx context.Context, repo *repo_model.Repository, commit
|
||||
}
|
||||
}
|
||||
|
||||
// WorkflowRunStatusUpdate dispatches a workflow run status change to every registered notifier.
|
||||
// Prefer the helpers in services/actions/notify.go over calling this directly;
|
||||
// unless you are sure the caller has already resolved the correct sender and paired notifications.
|
||||
func WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) {
|
||||
for _, notifier := range notifiers {
|
||||
notifier.WorkflowRunStatusUpdate(ctx, repo, sender, run)
|
||||
}
|
||||
}
|
||||
|
||||
// WorkflowJobStatusUpdate dispatches a workflow job status change to every registered notifier.
|
||||
// Prefer the helpers in services/actions/notify.go over calling this directly;
|
||||
// unless you are sure the caller has already resolved the correct sender and paired notifications.
|
||||
func WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask) {
|
||||
for _, notifier := range notifiers {
|
||||
notifier.WorkflowJobStatusUpdate(ctx, repo, sender, job, task)
|
||||
|
||||
@@ -1043,7 +1043,7 @@ func (*webhookNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *repo_
|
||||
return
|
||||
}
|
||||
|
||||
convertedRun, err := convert.ToActionWorkflowRun(ctx, repo, run)
|
||||
convertedRun, err := convert.ToActionWorkflowRun(ctx, repo, run, nil)
|
||||
if err != nil {
|
||||
log.Error("ToActionWorkflowRun: %v", err)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user