From 5280d741abd311aab3e6ac1af51821bf48494363 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 17 Oct 2023 16:45:50 +0200 Subject: [PATCH] changes Signed-off-by: Matthias Wessendorf --- pkg/knative/deployer.go | 96 +++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 36 deletions(-) diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 0fd59fa61a..c8806d5ad4 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -9,7 +9,9 @@ import ( "strings" "time" + clienteventingv1 "knative.dev/client-pkg/pkg/eventing/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -24,8 +26,6 @@ import ( "knative.dev/serving/pkg/apis/autoscaling" v1 "knative.dev/serving/pkg/apis/serving/v1" - duckv1 "knative.dev/pkg/apis/duck/v1" - fn "knative.dev/func/pkg/functions" "knative.dev/func/pkg/k8s" ) @@ -197,40 +197,6 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu close(cherr) }() - for i, sub := range f.Subscription { - - // create the filter: - attributes := make(map[string]string) - for key, value := range sub.Filters { - attributes[key] = value - } - - err = eventingClient.CreateTrigger(ctx, &eventingv1.Trigger{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-function-trigger-%d", f.Name, i), - }, - Spec: eventingv1.TriggerSpec{ - Broker: sub.Source, - - Subscriber: duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: "serving.knative.dev/v1", - Kind: "Service", - Name: f.Name, - }}, - - Filter: &eventingv1.TriggerFilter{ - Attributes: attributes, - }, - }, - }) - if err != nil { - err = fmt.Errorf("knative deployer failed to deploy the Knative Service: %v", err) - return fn.DeploymentResult{}, err - } - - } - presumePrivate := false main: // Wait for either a timeout or a container condition signaling the image is unreachable @@ -265,6 +231,11 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, err } + err = createTriggers(ctx, f, err, client, eventingClient) + if err != nil { + return fn.DeploymentResult{}, err + } + if d.verbose { fmt.Printf("Function deployed in namespace %q and exposed at URL:\n%s\n", d.Namespace, route.Status.URL.String()) } @@ -324,6 +295,11 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, err } + err = createTriggers(ctx, f, err, client, eventingClient) + if err != nil { + return fn.DeploymentResult{}, err + } + return fn.DeploymentResult{ Status: fn.Updated, URL: route.Status.URL.String(), @@ -332,6 +308,54 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu } } +func createTriggers(ctx context.Context, f fn.Function, err error, client clientservingv1.KnServingClient, eventingClient clienteventingv1.KnEventingClient) error { + ksvc, err := client.GetService(ctx, f.Name) + if err != nil { + err = fmt.Errorf("knative deployer failed to get the Service for Trigger: %v", err) + return err + } + for i, sub := range f.Subscription { + // create the filter: + attributes := make(map[string]string) + for key, value := range sub.Filters { + attributes[key] = value + } + + err = eventingClient.CreateTrigger(ctx, &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-function-trigger-%d", ksvc.Name, i), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ksvc.APIVersion, + Kind: ksvc.Kind, + Name: ksvc.GetName(), + UID: ksvc.GetUID(), + }, + }, + }, + Spec: eventingv1.TriggerSpec{ + Broker: sub.Source, + + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: ksvc.APIVersion, + Kind: ksvc.Kind, + Name: ksvc.Name, + }}, + + Filter: &eventingv1.TriggerFilter{ + Attributes: attributes, + }, + }, + }) + if err != nil && !errors.IsAlreadyExists(err) { + err = fmt.Errorf("knative deployer failed to create the Trigger: %v", err) + return err + } + } + return nil +} + func probeFor(url string) *corev1.Probe { return &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{