Skip to content

Commit

Permalink
feat(cancel): add cancel-in-progress feature (#1816)
Browse files Browse the repository at this point in the history
  • Loading branch information
chmouel authored Nov 18, 2024
1 parent aaf045e commit aff3b66
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 36 deletions.
26 changes: 26 additions & 0 deletions docs/content/docs/guide/running.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,32 @@ or on OpenShift using the OpenShift Console.
Pipelines-as-Code will post a URL in the Checks tab for GitHub apps to let you
click on it and follow the pipeline execution directly there.

## Cancelling

### Cancelling in-progress PipelineRuns

{{< tech_preview "Cancelling in progress PipelineRuns" >}}

You can choose to cancel a PipelineRun that is currently in progress. This can
be done by adding the annotation `pipelinesascode.tekton.dev/cancel-in-progress:
"true"` in the PipelineRun definition.
This feature only works if the PipelineRun is in progress. If the PipelineRun
has already completed or has been cancelled, it will be skipped. (For persistent
settings, refer to the [max-keep-run annotation]({{< relref
"/docs/guide/cleanups.md" >}}) instead.)
The cancellation occurs after the latest PipelineRun has been successfully
created and started. This annotation cannot be used to ensure that only one
PipelineRun is active at any time.
Currently, `cancel-in-progress` cannot be used in conjunction with [concurrency
limit]({{< relref "/docs/guide/repositorycrd.md#concurrency" >}}).

### Cancelling a PipelineRun with a GitOps command

See [here]({{< relref "/docs/guide/gitops_commands.md#cancelling-a-pipelinerun" >}})

## Restarting the PipelineRun

You can restart a PipelineRun without having to send a new commit to
Expand Down
17 changes: 17 additions & 0 deletions pkg/action/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ import (
"k8s.io/client-go/util/retry"
)

// PatchPipelineRun patches a Tekton PipelineRun resource with the provided merge patch.
// It retries the patch operation on conflict, doubling the default retry parameters.
//
// Parameters:
// - ctx: The context for the patch operation.
// - logger: A SugaredLogger instance for logging information.
// - whatPatching: A string describing what is being patched, used for logging purposes.
// - tekton: A Tekton client interface for interacting with Tekton resources.
// - pr: The PipelineRun resource to be patched. If nil, the function returns nil.
// - mergePatch: A map representing the JSON merge patch to apply to the PipelineRun.
//
// Returns:
// - *tektonv1.PipelineRun: The patched PipelineRun resource, or the original PipelineRun if an error occurs.
// - error: An error if the patch operation fails after retries, or nil if successful.
//
// The function doubles the default retry parameters (steps, duration, factor, jitter) to handle conflicts more robustly.
// If the patch operation fails after retries, the original PipelineRun is returned along with the error.
func PatchPipelineRun(ctx context.Context, logger *zap.SugaredLogger, whatPatching string, tekton versioned.Interface, pr *tektonv1.PipelineRun, mergePatch map[string]interface{}) (*tektonv1.PipelineRun, error) {
if pr == nil {
return nil, nil
Expand Down
67 changes: 34 additions & 33 deletions pkg/apis/pipelinesascode/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,40 @@ import (
)

const (
ControllerInfo = pipelinesascode.GroupName + "/controller-info"
Task = pipelinesascode.GroupName + "/task"
Pipeline = pipelinesascode.GroupName + "/pipeline"
URLOrg = pipelinesascode.GroupName + "/url-org"
URLRepository = pipelinesascode.GroupName + "/url-repository"
SHA = pipelinesascode.GroupName + "/sha"
Sender = pipelinesascode.GroupName + "/sender"
EventType = pipelinesascode.GroupName + "/event-type"
Branch = pipelinesascode.GroupName + "/branch"
SourceBranch = pipelinesascode.GroupName + "/source-branch"
Repository = pipelinesascode.GroupName + "/repository"
GitProvider = pipelinesascode.GroupName + "/git-provider"
State = pipelinesascode.GroupName + "/state"
ShaTitle = pipelinesascode.GroupName + "/sha-title"
ShaURL = pipelinesascode.GroupName + "/sha-url"
RepoURL = pipelinesascode.GroupName + "/repo-url"
SourceRepoURL = pipelinesascode.GroupName + "/source-repo-url"
PullRequest = pipelinesascode.GroupName + "/pull-request"
InstallationID = pipelinesascode.GroupName + "/installation-id"
GHEURL = pipelinesascode.GroupName + "/ghe-url"
SourceProjectID = pipelinesascode.GroupName + "/source-project-id"
TargetProjectID = pipelinesascode.GroupName + "/target-project-id"
OriginalPRName = pipelinesascode.GroupName + "/original-prname"
GitAuthSecret = pipelinesascode.GroupName + "/git-auth-secret"
CheckRunID = pipelinesascode.GroupName + "/check-run-id"
OnEvent = pipelinesascode.GroupName + "/on-event"
OnComment = pipelinesascode.GroupName + "/on-comment"
OnTargetBranch = pipelinesascode.GroupName + "/on-target-branch"
OnCelExpression = pipelinesascode.GroupName + "/on-cel-expression"
TargetNamespace = pipelinesascode.GroupName + "/target-namespace"
MaxKeepRuns = pipelinesascode.GroupName + "/max-keep-runs"
LogURL = pipelinesascode.GroupName + "/log-url"
ExecutionOrder = pipelinesascode.GroupName + "/execution-order"
ControllerInfo = pipelinesascode.GroupName + "/controller-info"
Task = pipelinesascode.GroupName + "/task"
Pipeline = pipelinesascode.GroupName + "/pipeline"
URLOrg = pipelinesascode.GroupName + "/url-org"
URLRepository = pipelinesascode.GroupName + "/url-repository"
SHA = pipelinesascode.GroupName + "/sha"
Sender = pipelinesascode.GroupName + "/sender"
EventType = pipelinesascode.GroupName + "/event-type"
Branch = pipelinesascode.GroupName + "/branch"
SourceBranch = pipelinesascode.GroupName + "/source-branch"
Repository = pipelinesascode.GroupName + "/repository"
GitProvider = pipelinesascode.GroupName + "/git-provider"
State = pipelinesascode.GroupName + "/state"
ShaTitle = pipelinesascode.GroupName + "/sha-title"
ShaURL = pipelinesascode.GroupName + "/sha-url"
RepoURL = pipelinesascode.GroupName + "/repo-url"
SourceRepoURL = pipelinesascode.GroupName + "/source-repo-url"
PullRequest = pipelinesascode.GroupName + "/pull-request"
InstallationID = pipelinesascode.GroupName + "/installation-id"
GHEURL = pipelinesascode.GroupName + "/ghe-url"
SourceProjectID = pipelinesascode.GroupName + "/source-project-id"
TargetProjectID = pipelinesascode.GroupName + "/target-project-id"
OriginalPRName = pipelinesascode.GroupName + "/original-prname"
GitAuthSecret = pipelinesascode.GroupName + "/git-auth-secret"
CheckRunID = pipelinesascode.GroupName + "/check-run-id"
OnEvent = pipelinesascode.GroupName + "/on-event"
OnComment = pipelinesascode.GroupName + "/on-comment"
OnTargetBranch = pipelinesascode.GroupName + "/on-target-branch"
OnCelExpression = pipelinesascode.GroupName + "/on-cel-expression"
TargetNamespace = pipelinesascode.GroupName + "/target-namespace"
MaxKeepRuns = pipelinesascode.GroupName + "/max-keep-runs"
CancelInProgress = pipelinesascode.GroupName + "/cancel-in-progress"
LogURL = pipelinesascode.GroupName + "/log-url"
ExecutionOrder = pipelinesascode.GroupName + "/execution-order"
// PublicGithubAPIURL default is "https://api.github.com" but it can be overridden by X-GitHub-Enterprise-Host header.
PublicGithubAPIURL = "https://api.github.com"
GithubApplicationID = "github-application-id"
Expand Down
60 changes: 60 additions & 0 deletions pkg/pipelineascode/cancel_pipelineruns.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,66 @@ var cancelMergePatch = map[string]interface{}{
},
}

// cancelInProgress cancels all PipelineRuns associated with a given repository and pull request,
// except for the one that triggered the cancellation. It first checks if the cancellation is in progress
// and if the repository has a concurrency limit. If a concurrency limit is set, it returns an error as
// cancellation is not supported with concurrency limits. It then retrieves the original pull request name
// from the annotations and lists all PipelineRuns with matching labels. For each PipelineRun that is not
// already done, cancelled, or gracefully stopped, it patches the PipelineRun to cancel it.
func (p *PacRun) cancelInProgress(ctx context.Context, matchPR *tektonv1.PipelineRun, repo *v1alpha1.Repository) error {
if matchPR == nil {
return nil
}
if key, ok := matchPR.GetAnnotations()[keys.CancelInProgress]; !ok || key != "true" {
return nil
}

if repo.Spec.ConcurrencyLimit != nil && *repo.Spec.ConcurrencyLimit > 0 {
return fmt.Errorf("cancel in progress is not supported with concurrency limit")
}

prName, ok := matchPR.GetAnnotations()[keys.OriginalPRName]
if !ok {
return nil
}
labelSelector := getLabelSelector(map[string]string{
keys.URLRepository: formatting.CleanValueKubernetes(p.event.Repository),
keys.OriginalPRName: prName,
})

prs, err := p.run.Clients.Tekton.TektonV1().PipelineRuns(matchPR.GetNamespace()).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return fmt.Errorf("failed to list pipelineRuns : %w", err)
}
var wg sync.WaitGroup
for _, pr := range prs.Items {
if pr.GetName() == matchPR.GetName() {
continue
}
if pr.IsDone() {
continue
}
if pr.IsCancelled() || pr.IsGracefullyCancelled() || pr.IsGracefullyStopped() {
continue
}

p.logger.Infof("cancel-in-progress: cancelling pipelinerun %v/%v", pr.GetNamespace(), pr.GetName())
wg.Add(1)
go func(ctx context.Context, pr tektonv1.PipelineRun) {
defer wg.Done()
if _, err := action.PatchPipelineRun(ctx, p.logger, "cancel patch", p.run.Clients.Tekton, &pr, cancelMergePatch); err != nil {
errMsg := fmt.Sprintf("failed to cancel pipelineRun %s/%s: %s", pr.GetNamespace(), pr.GetName(), err.Error())
p.eventEmitter.EmitMessage(repo, zap.ErrorLevel, "RepositoryPipelineRun", errMsg)
}
}(ctx, pr)
}
wg.Wait()

return nil
}

func (p *PacRun) cancelPipelineRuns(ctx context.Context, repo *v1alpha1.Repository) error {
labelSelector := getLabelSelector(map[string]string{
keys.URLRepository: formatting.CleanValueKubernetes(p.event.Repository),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package pipelineascode

import (
"fmt"
"strconv"
"testing"

"github.com/google/go-github/v64/github"
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys"
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
"github.com/openshift-pipelines/pipelines-as-code/pkg/formatting"
Expand Down Expand Up @@ -37,26 +39,30 @@ var (
keys.SHA: formatting.CleanValueKubernetes("foosha"),
}
fooRepoLabels = map[string]string{
keys.URLRepository: formatting.CleanValueKubernetes("foo"),
keys.SHA: formatting.CleanValueKubernetes("foosha"),
keys.PullRequest: strconv.Itoa(11),
keys.OriginalPRName: "pr-foo",
keys.URLRepository: formatting.CleanValueKubernetes("foo"),
keys.SHA: formatting.CleanValueKubernetes("foosha"),
keys.PullRequest: strconv.Itoa(11),
}
fooRepoAnnotations = map[string]string{
keys.URLRepository: "foo",
keys.SHA: "foosha",
keys.PullRequest: strconv.Itoa(11),
keys.Repository: "foo",
}
fooRepoLabelsPrFooAbc = map[string]string{
keys.URLRepository: formatting.CleanValueKubernetes("foo"),
keys.SHA: formatting.CleanValueKubernetes("foosha"),
keys.PullRequest: strconv.Itoa(11),
keys.OriginalPRName: "pr-foo-abc",
keys.Repository: "foo",
}
fooRepoAnnotationsPrFooAbc = map[string]string{
keys.URLRepository: "foo",
keys.SHA: "foosha",
keys.PullRequest: strconv.Itoa(11),
keys.OriginalPRName: "pr-foo-abc",
keys.Repository: "foo",
}
)

Expand Down Expand Up @@ -306,3 +312,151 @@ func TestCancelPipelinerun(t *testing.T) {
})
}
}

func TestCancelInProgress(t *testing.T) {
observer, catcher := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()
tests := []struct {
name string
event *info.Event
repo *v1alpha1.Repository
pipelineRuns []*pipelinev1.PipelineRun
cancelledPipelineRuns map[string]bool
wantErrString string
wantLog string
}{
{
name: "cancel in progress",
event: &info.Event{
Repository: "foo",
SHA: "foosha",
},
pipelineRuns: []*pipelinev1.PipelineRun{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr-foo",
Namespace: "foo",
Labels: fooRepoLabels,
Annotations: map[string]string{keys.CancelInProgress: "true", keys.OriginalPRName: "pr-foo", keys.Repository: "foo"},
},
Spec: pipelinev1.PipelineRunSpec{},
},
{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "pr-foo-",
Namespace: "foo",
Labels: fooRepoLabels,
Annotations: map[string]string{keys.CancelInProgress: "true", keys.OriginalPRName: "pr-foo", keys.Repository: "foo"},
},
Spec: pipelinev1.PipelineRunSpec{},
},
},
repo: fooRepo,
cancelledPipelineRuns: map[string]bool{
"pr-foo-1": true,
},
wantLog: "cancel-in-progress: cancelling pipelinerun foo/",
},
{
name: "cancel in progress with concurrency limit",
event: &info.Event{
Repository: "foo",
SHA: "foosha",
},
pipelineRuns: []*pipelinev1.PipelineRun{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr-foo",
Namespace: "foo",
Labels: fooRepoLabels,
Annotations: map[string]string{keys.CancelInProgress: "true", keys.OriginalPRName: "pr-foo", keys.Repository: "foo"},
},
Spec: pipelinev1.PipelineRunSpec{},
},
},
repo: &v1alpha1.Repository{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "foo",
},
Spec: v1alpha1.RepositorySpec{
URL: "https://github.com/fooorg/foo",
ConcurrencyLimit: github.Int(1),
},
},
cancelledPipelineRuns: map[string]bool{},
wantErrString: "cancel in progress is not supported with concurrency limit",
wantLog: "cancel-in-progress: cancelling pipelinerun foo/",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, _ := rtesting.SetupFakeContext(t)

tdata := testclient.Data{
PipelineRuns: tt.pipelineRuns,
}
stdata, _ := testclient.SeedTestData(t, ctx, tdata)
cs := &params.Run{
Clients: clients.Clients{
Log: logger,
Tekton: stdata.Pipeline,
Kube: stdata.Kube,
},
}
pac := NewPacs(tt.event, nil, cs, &info.PacOpts{}, nil, logger, nil)
err := pac.cancelInProgress(ctx, tt.pipelineRuns[0], tt.repo)
if tt.wantErrString != "" {
assert.ErrorContains(t, err, tt.wantErrString)
return
}
assert.NilError(t, err)

// the fake k8 test library don't set cancellation status, so we can't check the status :\
_, err = cs.Clients.Tekton.TektonV1().PipelineRuns("foo").List(ctx, metav1.ListOptions{})
assert.NilError(t, err)

if tt.wantLog != "" {
assert.Assert(t, len(catcher.FilterMessageSnippet(tt.wantLog).TakeAll()) > 0, fmt.Sprintf("could not find log message: got %+v", catcher.TakeAll()))
}
})
}
}

func TestGetLabelSelector(t *testing.T) {
tests := []struct {
name string
labelsMap map[string]string
want string
}{
{
name: "single label",
labelsMap: map[string]string{
"app": "nginx",
},
want: "app=nginx",
},
{
name: "multiple labels",
labelsMap: map[string]string{
"app": "nginx",
"version": "1.14.2",
},
want: "app=nginx,version=1.14.2",
},
{
name: "empty labels",
labelsMap: map[string]string{},
want: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getLabelSelector(tt.labelsMap); got != tt.want {
t.Errorf("getLabelSelector() = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit aff3b66

Please sign in to comment.