From a7661bcfaa76a4eaf58acdb737c365d61d97e533 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Thu, 28 Sep 2023 10:29:09 +0200 Subject: [PATCH 01/21] Adding support for `func subscribe` for creating mutiple triggers, based on event filters Signed-off-by: Matthias Wessendorf --- cmd/root.go | 1 + cmd/subscribe.go | 81 +++++++++++++++++++++++++++++++++++++++ pkg/functions/function.go | 8 ++++ pkg/knative/deployer.go | 41 ++++++++++++++++++++ 4 files changed, 131 insertions(+) create mode 100644 cmd/subscribe.go diff --git a/cmd/root.go b/cmd/root.go index 234a198681..dcfc3ded45 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -76,6 +76,7 @@ Learn more about Knative at: https://knative.dev`, cfg.Name), NewDeployCmd(newClient), NewDeleteCmd(newClient), NewListCmd(newClient), + NewSubscribeCmd(), }, }, { diff --git a/cmd/subscribe.go b/cmd/subscribe.go new file mode 100644 index 0000000000..aabb2f1b6f --- /dev/null +++ b/cmd/subscribe.go @@ -0,0 +1,81 @@ +package cmd + +import ( + "github.com/ory/viper" + "github.com/spf13/cobra" + "knative.dev/func/pkg/config" + fn "knative.dev/func/pkg/functions" +) + +func NewSubscribeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "subscribe", + Short: "Subscribe to a function", + Long: `Subscribe to a function`, + SuggestFor: []string{"subscribe", "subscribe"}, + PreRunE: bindEnv("filter", "source"), + Run: func(cmd *cobra.Command, args []string) { + runSubscribe(cmd, args) + }, + } + + cmd.Flags().StringP("filter", "f", "", "The event metadata to filter for") + cmd.Flags().StringP("source", "s", "default", "The source, like a Knative Broker") + + return cmd +} + +// / +func runSubscribe(cmd *cobra.Command, args []string) (err error) { + var ( + cfg subscibeConfig + f fn.Function + ) + cfg = newSubscribeConfig() + + if err = config.CreatePaths(); err != nil { // for possible auth.json usage + return + } + if f, err = fn.NewFunction(""); err != nil { + return + } + if !f.Initialized() { + return fn.NewErrNotInitialized(f.Root) + } + if f, err = fn.NewFunction(""); err != nil { + return + } + if !f.Initialized() { + return fn.NewErrNotInitialized(f.Root) + } + + // add it + if f.Subscription == nil { + f.Subscription = []fn.SubscriptionSpec{} + } + f.Subscription = append(f.Subscription, fn.SubscriptionSpec{ + Source: cfg.Source, + Filters: map[string]string{ + "type": cfg.Filter, + }, + }) + + // pump it + f.Write() + + return f.Stamp() +} + +type subscibeConfig struct { + Filter string + Source string +} + +func newSubscribeConfig() subscibeConfig { + c := subscibeConfig{ + Filter: viper.GetString("filter"), + Source: viper.GetString("source"), + } + + return c +} diff --git a/pkg/functions/function.go b/pkg/functions/function.go index 6804f62ca8..8e4d109c46 100644 --- a/pkg/functions/function.go +++ b/pkg/functions/function.go @@ -82,6 +82,8 @@ type Function struct { // Build defines the build properties for a function Build BuildSpec `yaml:"build,omitempty"` + Subscription []SubscriptionSpec `yaml:"subscriptions,omitempty"` + // Run defines the runtime properties for a function Run RunSpec `yaml:"run,omitempty"` @@ -89,6 +91,12 @@ type Function struct { Deploy DeploySpec `yaml:"deploy,omitempty"` } +// SubscriptionSpec +type SubscriptionSpec struct { + Source string `yaml:"source"` + Filters map[string]string `yaml:"filters,omitempty"` +} + // BuildSpec type BuildSpec struct { // Git stores information about an optionally associated git repository. diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index da1e024129..3f6c0be0a8 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "os" "regexp" "strings" @@ -22,6 +23,8 @@ import ( "knative.dev/serving/pkg/apis/autoscaling" v1 "knative.dev/serving/pkg/apis/serving/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" + fn "knative.dev/func/pkg/functions" "knative.dev/func/pkg/k8s" ) @@ -129,6 +132,10 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu if err != nil { return fn.DeploymentResult{}, err } + eventingClient, err := NewEventingClient(d.Namespace) + if err != nil { + return fn.DeploymentResult{}, err + } var outBuff SynchronizedBuffer var out io.Writer = &outBuff @@ -189,6 +196,40 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu close(cherr) }() + for i, sub := range f.Subscription { + + // create the filter: + attributes := make(map[string]string) + for key, value := range sub.Filters { + attributes[key] = value + } + + err = eventingClient.CreateTrigger(ctx, &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-function-trigger-%d", f.Name, i), + }, + Spec: eventingv1.TriggerSpec{ + Broker: sub.Source, + + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "serving.knative.dev/v1", + Kind: "Service", + Name: f.Name, + }}, + + Filter: &eventingv1.TriggerFilter{ + Attributes: attributes, + }, + }, + }) + if err != nil { + err = fmt.Errorf("knative deployer failed to deploy the Knative Service: %v", err) + return fn.DeploymentResult{}, err + } + + } + presumePrivate := false main: // Wait for either a timeout or a container condition signaling the image is unreachable From b059ec3759ac9ee4be910414f72df6eaf1005235 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 15:54:50 +0200 Subject: [PATCH 02/21] Update cmd/subscribe.go Co-authored-by: Luke Kingland --- cmd/subscribe.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index aabb2f1b6f..210633514a 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -14,8 +14,8 @@ func NewSubscribeCmd() *cobra.Command { Long: `Subscribe to a function`, SuggestFor: []string{"subscribe", "subscribe"}, PreRunE: bindEnv("filter", "source"), - Run: func(cmd *cobra.Command, args []string) { - runSubscribe(cmd, args) + RunE: func(cmd *cobra.Command, args []string) error { + return runSubscribe(cmd, args) }, } From 723de4a911efb01cf07e73943c048b4acdfe7018 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 15:55:00 +0200 Subject: [PATCH 03/21] Update cmd/subscribe.go Co-authored-by: Luke Kingland --- cmd/subscribe.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index 210633514a..c9fcaf707d 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -33,9 +33,6 @@ func runSubscribe(cmd *cobra.Command, args []string) (err error) { ) cfg = newSubscribeConfig() - if err = config.CreatePaths(); err != nil { // for possible auth.json usage - return - } if f, err = fn.NewFunction(""); err != nil { return } From 7bab7216e5e1f9c0e72b3f64add2fad831aa3616 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 15:55:09 +0200 Subject: [PATCH 04/21] Update cmd/subscribe.go Co-authored-by: Luke Kingland --- cmd/subscribe.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index c9fcaf707d..a6a6b9c9f2 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -39,9 +39,6 @@ func runSubscribe(cmd *cobra.Command, args []string) (err error) { if !f.Initialized() { return fn.NewErrNotInitialized(f.Root) } - if f, err = fn.NewFunction(""); err != nil { - return - } if !f.Initialized() { return fn.NewErrNotInitialized(f.Root) } From 4321b8dc0f5296f6cee3ec3e2944e6b0947149f5 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 15:55:32 +0200 Subject: [PATCH 05/21] Update cmd/subscribe.go Co-authored-by: Luke Kingland --- cmd/subscribe.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index a6a6b9c9f2..f45662f771 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -44,9 +44,6 @@ func runSubscribe(cmd *cobra.Command, args []string) (err error) { } // add it - if f.Subscription == nil { - f.Subscription = []fn.SubscriptionSpec{} - } f.Subscription = append(f.Subscription, fn.SubscriptionSpec{ Source: cfg.Source, Filters: map[string]string{ From fd44fc4eaf9f2536e5a96f2fa915fd286d84a360 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 15:55:53 +0200 Subject: [PATCH 06/21] Update cmd/subscribe.go Co-authored-by: Luke Kingland --- cmd/subscribe.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index f45662f771..5be5bed84d 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -54,7 +54,6 @@ func runSubscribe(cmd *cobra.Command, args []string) (err error) { // pump it f.Write() - return f.Stamp() } type subscibeConfig struct { From 69eee2afea2fb5384ba4e09afffa9c8adeaa66d6 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 15:56:16 +0200 Subject: [PATCH 07/21] Update cmd/subscribe.go Co-authored-by: Luke Kingland --- cmd/subscribe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index 5be5bed84d..99674990fd 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -52,7 +52,7 @@ func runSubscribe(cmd *cobra.Command, args []string) (err error) { }) // pump it - f.Write() + return f.Write() } From aa65ae5dcd5df41b85b9e66c49d7a5d36c197dd4 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 16:00:43 +0200 Subject: [PATCH 08/21] removing unused import Signed-off-by: Matthias Wessendorf --- cmd/subscribe.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index 99674990fd..2a8d57bff4 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -3,7 +3,6 @@ package cmd import ( "github.com/ory/viper" "github.com/spf13/cobra" - "knative.dev/func/pkg/config" fn "knative.dev/func/pkg/functions" ) From e790c86ceec1c7225578c2640c29102f91f2b5cd Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 16:00:50 +0200 Subject: [PATCH 09/21] running make Signed-off-by: Matthias Wessendorf --- docs/reference/func.md | 1 + docs/reference/func_subscribe.md | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 docs/reference/func_subscribe.md diff --git a/docs/reference/func.md b/docs/reference/func.md index b0267a209e..b5ae5f6056 100644 --- a/docs/reference/func.md +++ b/docs/reference/func.md @@ -36,6 +36,7 @@ Learn more about Knative at: https://knative.dev * [func list](func_list.md) - List deployed functions * [func repository](func_repository.md) - Manage installed template repositories * [func run](func_run.md) - Run the function locally +* [func subscribe](func_subscribe.md) - Subscribe to a function * [func templates](func_templates.md) - List available function source templates * [func version](func_version.md) - Function client version information diff --git a/docs/reference/func_subscribe.md b/docs/reference/func_subscribe.md new file mode 100644 index 0000000000..5538ec5d8b --- /dev/null +++ b/docs/reference/func_subscribe.md @@ -0,0 +1,24 @@ +## func subscribe + +Subscribe to a function + +### Synopsis + +Subscribe to a function + +``` +func subscribe +``` + +### Options + +``` + -f, --filter string The event metadata to filter for + -h, --help help for subscribe + -s, --source string The source, like a Knative Broker (default "default") +``` + +### SEE ALSO + +* [func](func.md) - func manages Knative Functions + From 84b8308d838417882399c89a8caccdabf91535d2 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 10 Oct 2023 16:03:01 +0200 Subject: [PATCH 10/21] Some import ogranization Signed-off-by: Matthias Wessendorf --- pkg/knative/deployer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 3f6c0be0a8..8b73dfbb31 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "io" - eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "os" "regexp" "strings" "time" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" From 9c0ce9c83956cf906c5fadf4fbeec73287ccc8da Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Wed, 11 Oct 2023 14:29:46 +0200 Subject: [PATCH 11/21] Change argument syntax Signed-off-by: Matthias Wessendorf --- cmd/subscribe.go | 49 +++++++++++++++++++++++--------- docs/reference/func_subscribe.md | 6 ++-- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index 2a8d57bff4..a8f16ffb2b 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -1,6 +1,9 @@ package cmd import ( + "fmt" + "strings" + "github.com/ory/viper" "github.com/spf13/cobra" fn "knative.dev/func/pkg/functions" @@ -18,19 +21,19 @@ func NewSubscribeCmd() *cobra.Command { }, } - cmd.Flags().StringP("filter", "f", "", "The event metadata to filter for") + cmd.Flags().StringArrayP("filter", "f", []string{}, "Filter for the Cloud Event metadata") + cmd.Flags().StringP("source", "s", "default", "The source, like a Knative Broker") return cmd } -// / func runSubscribe(cmd *cobra.Command, args []string) (err error) { var ( cfg subscibeConfig f fn.Function ) - cfg = newSubscribeConfig() + cfg = newSubscribeConfig(cmd) if f, err = fn.NewFunction(""); err != nil { return @@ -42,12 +45,10 @@ func runSubscribe(cmd *cobra.Command, args []string) (err error) { return fn.NewErrNotInitialized(f.Root) } - // add it + // add subscription to function f.Subscription = append(f.Subscription, fn.SubscriptionSpec{ - Source: cfg.Source, - Filters: map[string]string{ - "type": cfg.Filter, - }, + Source: cfg.Source, + Filters: extractFilterMap(cfg), }) // pump it @@ -55,16 +56,38 @@ func runSubscribe(cmd *cobra.Command, args []string) (err error) { } +func extractFilterMap(cfg subscibeConfig) map[string]string { + subscriptionFilters := make(map[string]string) + for _, filter := range cfg.Filter { + kv := strings.Split(filter, "=") + if len(kv) != 2 { + fmt.Println("Invalid pair:", filter) + continue + } + key := kv[0] + value := kv[1] + subscriptionFilters[key] = value + } + return subscriptionFilters +} + type subscibeConfig struct { - Filter string + Filter []string Source string } -func newSubscribeConfig() subscibeConfig { - c := subscibeConfig{ - Filter: viper.GetString("filter"), +func newSubscribeConfig(cmd *cobra.Command) (c subscibeConfig) { + c = subscibeConfig{ + Filter: viper.GetStringSlice("filter"), Source: viper.GetString("source"), } + // NOTE: .Filter should be viper.GetStringSlice, but this returns unparsed + // results and appears to be an open issue since 2017: + // https://github.com/spf13/viper/issues/380 + var err error + if c.Filter, err = cmd.Flags().GetStringArray("filter"); err != nil { + fmt.Fprintf(cmd.OutOrStdout(), "error reading filter arguments: %v", err) + } - return c + return } diff --git a/docs/reference/func_subscribe.md b/docs/reference/func_subscribe.md index 5538ec5d8b..d6aea6e946 100644 --- a/docs/reference/func_subscribe.md +++ b/docs/reference/func_subscribe.md @@ -13,9 +13,9 @@ func subscribe ### Options ``` - -f, --filter string The event metadata to filter for - -h, --help help for subscribe - -s, --source string The source, like a Knative Broker (default "default") + -f, --filter stringArray Filter for the Cloud Event metadata + -h, --help help for subscribe + -s, --source string The source, like a Knative Broker (default "default") ``` ### SEE ALSO From bfcc5179031fa5e403aa4b33f514350ea3e215a6 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 17 Oct 2023 16:45:50 +0200 Subject: [PATCH 12/21] changes Signed-off-by: Matthias Wessendorf --- pkg/knative/deployer.go | 96 +++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 36 deletions(-) diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 8b73dfbb31..c48c18f36c 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -9,7 +9,9 @@ import ( "strings" "time" + clienteventingv1 "knative.dev/client-pkg/pkg/eventing/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -24,8 +26,6 @@ import ( "knative.dev/serving/pkg/apis/autoscaling" v1 "knative.dev/serving/pkg/apis/serving/v1" - duckv1 "knative.dev/pkg/apis/duck/v1" - fn "knative.dev/func/pkg/functions" "knative.dev/func/pkg/k8s" ) @@ -197,40 +197,6 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu close(cherr) }() - for i, sub := range f.Subscription { - - // create the filter: - attributes := make(map[string]string) - for key, value := range sub.Filters { - attributes[key] = value - } - - err = eventingClient.CreateTrigger(ctx, &eventingv1.Trigger{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-function-trigger-%d", f.Name, i), - }, - Spec: eventingv1.TriggerSpec{ - Broker: sub.Source, - - Subscriber: duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: "serving.knative.dev/v1", - Kind: "Service", - Name: f.Name, - }}, - - Filter: &eventingv1.TriggerFilter{ - Attributes: attributes, - }, - }, - }) - if err != nil { - err = fmt.Errorf("knative deployer failed to deploy the Knative Service: %v", err) - return fn.DeploymentResult{}, err - } - - } - presumePrivate := false main: // Wait for either a timeout or a container condition signaling the image is unreachable @@ -265,6 +231,11 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, err } + err = createTriggers(ctx, f, err, client, eventingClient) + if err != nil { + return fn.DeploymentResult{}, err + } + if d.verbose { fmt.Printf("Function deployed in namespace %q and exposed at URL:\n%s\n", d.Namespace, route.Status.URL.String()) } @@ -324,6 +295,11 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, err } + err = createTriggers(ctx, f, err, client, eventingClient) + if err != nil { + return fn.DeploymentResult{}, err + } + return fn.DeploymentResult{ Status: fn.Updated, URL: route.Status.URL.String(), @@ -332,6 +308,54 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu } } +func createTriggers(ctx context.Context, f fn.Function, err error, client clientservingv1.KnServingClient, eventingClient clienteventingv1.KnEventingClient) error { + ksvc, err := client.GetService(ctx, f.Name) + if err != nil { + err = fmt.Errorf("knative deployer failed to get the Service for Trigger: %v", err) + return err + } + for i, sub := range f.Subscription { + // create the filter: + attributes := make(map[string]string) + for key, value := range sub.Filters { + attributes[key] = value + } + + err = eventingClient.CreateTrigger(ctx, &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-function-trigger-%d", ksvc.Name, i), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ksvc.APIVersion, + Kind: ksvc.Kind, + Name: ksvc.GetName(), + UID: ksvc.GetUID(), + }, + }, + }, + Spec: eventingv1.TriggerSpec{ + Broker: sub.Source, + + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: ksvc.APIVersion, + Kind: ksvc.Kind, + Name: ksvc.Name, + }}, + + Filter: &eventingv1.TriggerFilter{ + Attributes: attributes, + }, + }, + }) + if err != nil && !errors.IsAlreadyExists(err) { + err = fmt.Errorf("knative deployer failed to create the Trigger: %v", err) + return err + } + } + return nil +} + func probeFor(url string) *corev1.Probe { return &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ From 055595874c38fff85e43c3660b7b3df289115cc9 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Thu, 19 Oct 2023 09:35:25 +0200 Subject: [PATCH 13/21] Adding some emoji text Signed-off-by: Matthias Wessendorf --- pkg/knative/deployer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index c48c18f36c..41ddd64700 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -314,6 +314,9 @@ func createTriggers(ctx context.Context, f fn.Function, err error, client client err = fmt.Errorf("knative deployer failed to get the Service for Trigger: %v", err) return err } + + fmt.Fprintf(os.Stderr, "🎯 Creating Triggers on the cluster\n") + for i, sub := range f.Subscription { // create the filter: attributes := make(map[string]string) From 972f87fce1ac14278d6b355961c5495c0b652559 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Wed, 25 Oct 2023 16:00:20 +0200 Subject: [PATCH 14/21] :lipstick: move subscriptions underneath the deploy element Signed-off-by: Matthias Wessendorf --- cmd/subscribe.go | 2 +- pkg/functions/function.go | 8 ++++---- pkg/knative/deployer.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index a8f16ffb2b..c0acede833 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -46,7 +46,7 @@ func runSubscribe(cmd *cobra.Command, args []string) (err error) { } // add subscription to function - f.Subscription = append(f.Subscription, fn.SubscriptionSpec{ + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ Source: cfg.Source, Filters: extractFilterMap(cfg), }) diff --git a/pkg/functions/function.go b/pkg/functions/function.go index 8e4d109c46..17d66ae147 100644 --- a/pkg/functions/function.go +++ b/pkg/functions/function.go @@ -82,8 +82,6 @@ type Function struct { // Build defines the build properties for a function Build BuildSpec `yaml:"build,omitempty"` - Subscription []SubscriptionSpec `yaml:"subscriptions,omitempty"` - // Run defines the runtime properties for a function Run RunSpec `yaml:"run,omitempty"` @@ -91,8 +89,8 @@ type Function struct { Deploy DeploySpec `yaml:"deploy,omitempty"` } -// SubscriptionSpec -type SubscriptionSpec struct { +// KnativeSubscription +type KnativeSubscription struct { Source string `yaml:"source"` Filters map[string]string `yaml:"filters,omitempty"` } @@ -167,6 +165,8 @@ type DeploySpec struct { // succeed. // More info: https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/ ServiceAccountName string `yaml:"serviceAccountName,omitempty"` + + Subscriptions []KnativeSubscription `yaml:"subscriptions,omitempty"` } // HealthEndpoints specify the liveness and readiness endpoints for a Runtime diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 41ddd64700..3959ccc470 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -317,7 +317,7 @@ func createTriggers(ctx context.Context, f fn.Function, err error, client client fmt.Fprintf(os.Stderr, "🎯 Creating Triggers on the cluster\n") - for i, sub := range f.Subscription { + for i, sub := range f.Deploy.Subscriptions { // create the filter: attributes := make(map[string]string) for key, value := range sub.Filters { From 057755fb4304933126705c6f34d79777e3db502d Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Wed, 25 Oct 2023 16:03:39 +0200 Subject: [PATCH 15/21] adding silly emoji to build Signed-off-by: Matthias Wessendorf --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 4a8a1ec063..43d9583e66 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,7 @@ MAKEFILE_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) # Default Targets all: build docs + @echo '🎉 Build process completed!' # Help Text # Headings: lines with `##$` comment prefix From 5a23f47288b2a504d65d9637d638d20823e11c01 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Thu, 26 Oct 2023 16:38:51 +0200 Subject: [PATCH 16/21] Adding some simple/copied/modified test Signed-off-by: Matthias Wessendorf --- pkg/functions/client_int_test.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pkg/functions/client_int_test.go b/pkg/functions/client_int_test.go index d8c9bb2e67..6c7eecb7eb 100644 --- a/pkg/functions/client_int_test.go +++ b/pkg/functions/client_int_test.go @@ -163,6 +163,31 @@ func TestDeployWithOptions(t *testing.T) { defer del(t, client, "test-deploy-with-options") } +func TestDeployWithTriggers(t *testing.T) { + root, cleanup := Mktemp(t) + defer cleanup() + verbose := true + + f := fn.Function{Runtime: "go", Name: "test-deploy-with-triggers", Root: root} + f.Deploy = fn.DeploySpec{ + Subscriptions: []fn.KnativeSubscription{ + { + Source: "default", + Filters: map[string]string{ + "key": "value", + "foo": "bar", + }, + }, + }, + } + + client := newClient(verbose) + if _, _, err := client.New(context.Background(), f); err != nil { + t.Fatal(err) + } + defer del(t, client, "test-deploy-with-triggers") +} + func TestUpdateWithAnnotationsAndLabels(t *testing.T) { functionName := "updateannlab" defer Within(t, "testdata/example.com/"+functionName)() From e5f1c5d4171097c11516f3038a2a23130fa691d5 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Thu, 2 Nov 2023 09:32:35 +0100 Subject: [PATCH 17/21] Running 'make schema-generate' Signed-off-by: Matthias Wessendorf --- schema/func_yaml-schema.json | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/schema/func_yaml-schema.json b/schema/func_yaml-schema.json index ab5c425cdb..656677bfc5 100644 --- a/schema/func_yaml-schema.json +++ b/schema/func_yaml-schema.json @@ -90,6 +90,13 @@ "serviceAccountName": { "type": "string", "description": "ServiceAccountName is the name of the service account used for the\nfunction pod. The service account must exist in the namespace to\nsucceed.\nMore info: https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/" + }, + "subscriptions": { + "items": { + "$schema": "http://json-schema.org/draft-04/schema#", + "$ref": "#/definitions/KnativeSubscription" + }, + "type": "array" } }, "additionalProperties": false, @@ -219,6 +226,27 @@ "type": "object", "description": "HealthEndpoints specify the liveness and readiness endpoints for a Runtime" }, + "KnativeSubscription": { + "required": [ + "source" + ], + "properties": { + "source": { + "type": "string" + }, + "filters": { + "patternProperties": { + ".*": { + "type": "string" + } + }, + "type": "object" + } + }, + "additionalProperties": false, + "type": "object", + "description": "KnativeSubscription" + }, "Label": { "required": [ "key" From 1437cab3845ab520bc1f15d5ad2b79496bd33183 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Thu, 2 Nov 2023 09:42:44 +0100 Subject: [PATCH 18/21] Update function Signed-off-by: Matthias Wessendorf --- pkg/knative/deployer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 3959ccc470..d6a02ad8c6 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -231,7 +231,7 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, err } - err = createTriggers(ctx, f, err, client, eventingClient) + err = createTriggers(ctx, f, client, eventingClient) if err != nil { return fn.DeploymentResult{}, err } @@ -295,7 +295,7 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, err } - err = createTriggers(ctx, f, err, client, eventingClient) + err = createTriggers(ctx, f, client, eventingClient) if err != nil { return fn.DeploymentResult{}, err } @@ -308,7 +308,7 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu } } -func createTriggers(ctx context.Context, f fn.Function, err error, client clientservingv1.KnServingClient, eventingClient clienteventingv1.KnEventingClient) error { +func createTriggers(ctx context.Context, f fn.Function, client clientservingv1.KnServingClient, eventingClient clienteventingv1.KnEventingClient) error { ksvc, err := client.GetService(ctx, f.Name) if err != nil { err = fmt.Errorf("knative deployer failed to get the Service for Trigger: %v", err) From 810a0032ee40681b8da4417b3dafabe135ecc0b1 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Thu, 2 Nov 2023 14:26:03 +0100 Subject: [PATCH 19/21] Little unit test Signed-off-by: Matthias Wessendorf --- cmd/subscribe_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 cmd/subscribe_test.go diff --git a/cmd/subscribe_test.go b/cmd/subscribe_test.go new file mode 100644 index 0000000000..835f647aac --- /dev/null +++ b/cmd/subscribe_test.go @@ -0,0 +1,73 @@ +package cmd + +import ( + "testing" + + fn "knative.dev/func/pkg/functions" +) + +func TestSubscribeWithAll(t *testing.T) { + root := fromTempDirectory(t) + + _, err := fn.New().Init(fn.Function{Runtime: "go", Root: root}) + if err != nil { + t.Fatal(err) + } + + cmd := NewSubscribeCmd() + cmd.SetArgs([]string{"--source", "my-broker", "--filter", "foo=go"}) + + if err := cmd.Execute(); err != nil { + t.Fatal(err) + } + + // Now load the function and ensure that the subscription is set correctly. + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + + if f.Deploy.Subscriptions == nil { + t.Fatal("Expected subscription to be present ") + } + if f.Deploy.Subscriptions[0].Source != "my-broker" { + t.Fatalf("Expected subscription for broker to be 'my-broker', but got '%v'", f.Deploy.Subscriptions[0].Source) + } + + if f.Deploy.Subscriptions[0].Filters["foo"] != "go" { + t.Fatalf("Expected subscription filter for 'foo' to be 'go', but got '%v'", f.Deploy.Subscriptions[0].Filters["foo"]) + } +} + +func TestSubscribeWithNoExplicitSourceAll(t *testing.T) { + root := fromTempDirectory(t) + + _, err := fn.New().Init(fn.Function{Runtime: "go", Root: root}) + if err != nil { + t.Fatal(err) + } + + cmd := NewSubscribeCmd() + cmd.SetArgs([]string{"--filter", "foo=go"}) + + if err := cmd.Execute(); err != nil { + t.Fatal(err) + } + + // Now load the function and ensure that the subscription is set correctly. + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + + if f.Deploy.Subscriptions == nil { + t.Fatal("Expected subscription to be present ") + } + if f.Deploy.Subscriptions[0].Source != "default" { + t.Fatalf("Expected subscription for broker to be 'default', but got '%v'", f.Deploy.Subscriptions[0].Source) + } + + if f.Deploy.Subscriptions[0].Filters["foo"] != "go" { + t.Fatalf("Expected subscription filter for 'foo' to be 'go', but got '%v'", f.Deploy.Subscriptions[0].Filters["foo"]) + } +} From 6dcacfc7feefc4b346f0600a1f28ffca10ce1e4b Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Mon, 6 Nov 2023 12:21:48 +0100 Subject: [PATCH 20/21] Adding a bit more help text Signed-off-by: Matthias Wessendorf --- cmd/subscribe.go | 21 +++++++++++++++++---- docs/reference/func.md | 2 +- docs/reference/func_subscribe.md | 22 ++++++++++++++++++++-- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index c0acede833..d9580e8529 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -11,10 +11,23 @@ import ( func NewSubscribeCmd() *cobra.Command { cmd := &cobra.Command{ - Use: "subscribe", - Short: "Subscribe to a function", - Long: `Subscribe to a function`, - SuggestFor: []string{"subscribe", "subscribe"}, + Use: "subscribe", + Short: "Subscribe a function to events", + Long: `Subscribe a function to events + +Subscribe the function to a set of events, matching a set of filters for Cloud Event metadata +and a Knative Broker from where the events are consumed. +`, + Example: ` +# Subscribe the function to the 'default' broker where events have 'type' of 'com.example' +and an 'extension' attribute for the value 'my-extension-value'. +{{rootCmdUse}} subscribe --filter type=com.example --filter extension=my-extension-value + +# Subscribe the function to the 'my-broker' broker where events have 'type' of 'com.example' +and an 'extension' attribute for the value 'my-extension-value'. +{{rootCmdUse}} subscribe --filter type=com.example --filter extension=my-extension-value --source my-broker +`, + SuggestFor: []string{"subcsribe", "subsrcibe"}, PreRunE: bindEnv("filter", "source"), RunE: func(cmd *cobra.Command, args []string) error { return runSubscribe(cmd, args) diff --git a/docs/reference/func.md b/docs/reference/func.md index b5ae5f6056..c28c23b05e 100644 --- a/docs/reference/func.md +++ b/docs/reference/func.md @@ -36,7 +36,7 @@ Learn more about Knative at: https://knative.dev * [func list](func_list.md) - List deployed functions * [func repository](func_repository.md) - Manage installed template repositories * [func run](func_run.md) - Run the function locally -* [func subscribe](func_subscribe.md) - Subscribe to a function +* [func subscribe](func_subscribe.md) - Subscribe a function to events * [func templates](func_templates.md) - List available function source templates * [func version](func_version.md) - Function client version information diff --git a/docs/reference/func_subscribe.md b/docs/reference/func_subscribe.md index d6aea6e946..77e7aa9bb0 100644 --- a/docs/reference/func_subscribe.md +++ b/docs/reference/func_subscribe.md @@ -1,15 +1,33 @@ ## func subscribe -Subscribe to a function +Subscribe a function to events ### Synopsis -Subscribe to a function +Subscribe a function to events + +Subscribe the function to a set of events, matching a set of filters for Cloud Event metadata +and a Knative Broker from where the events are consumed. + ``` func subscribe ``` +### Examples + +``` + +# Subscribe the function to the 'default' broker where events have 'type' of 'com.example' +and an 'extension' attribute for the value 'my-extension-value'. +func subscribe --filter type=com.example --filter extension=my-extension-value + +# Subscribe the function to the 'my-broker' broker where events have 'type' of 'com.example' +and an 'extension' attribute for the value 'my-extension-value'. +func subscribe --filter type=com.example --filter extension=my-extension-value --source my-broker + +``` + ### Options ``` From c30a5c4130d045e8bbb9bfe3503cc350632e1cfb Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Mon, 6 Nov 2023 13:45:23 +0100 Subject: [PATCH 21/21] misspell instruction Signed-off-by: Matthias Wessendorf --- cmd/subscribe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index d9580e8529..e3f29bc91f 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -27,7 +27,7 @@ and an 'extension' attribute for the value 'my-extension-value'. and an 'extension' attribute for the value 'my-extension-value'. {{rootCmdUse}} subscribe --filter type=com.example --filter extension=my-extension-value --source my-broker `, - SuggestFor: []string{"subcsribe", "subsrcibe"}, + SuggestFor: []string{"subcsribe"}, //nolint:misspell PreRunE: bindEnv("filter", "source"), RunE: func(cmd *cobra.Command, args []string) error { return runSubscribe(cmd, args)