diff --git a/Makefile b/Makefile index c786f704c6..858c25fe4c 100644 --- a/Makefile +++ b/Makefile @@ -49,6 +49,7 @@ CODE := $(shell find . -name '*.go') \ # Default Targets all: build docs + @echo '🎉 Build process completed!' # Help Text # Headings: lines with `##$` comment prefix diff --git a/cmd/root.go b/cmd/root.go index 2c143e67c2..800f3eafce 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -73,6 +73,7 @@ Learn more about Knative at: https://knative.dev`, cfg.Name), NewDeployCmd(newClient), NewDeleteCmd(newClient), NewListCmd(newClient), + NewSubscribeCmd(), }, }, { diff --git a/cmd/subscribe.go b/cmd/subscribe.go new file mode 100644 index 0000000000..e3f29bc91f --- /dev/null +++ b/cmd/subscribe.go @@ -0,0 +1,106 @@ +package cmd + +import ( + "fmt" + "strings" + + "github.com/ory/viper" + "github.com/spf13/cobra" + fn "knative.dev/func/pkg/functions" +) + +func NewSubscribeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "subscribe", + Short: "Subscribe a function to events", + Long: `Subscribe a function to events + +Subscribe the function to a set of events, matching a set of filters for Cloud Event metadata +and a Knative Broker from where the events are consumed. +`, + Example: ` +# Subscribe the function to the 'default' broker where events have 'type' of 'com.example' +and an 'extension' attribute for the value 'my-extension-value'. +{{rootCmdUse}} subscribe --filter type=com.example --filter extension=my-extension-value + +# Subscribe the function to the 'my-broker' broker where events have 'type' of 'com.example' +and an 'extension' attribute for the value 'my-extension-value'. +{{rootCmdUse}} subscribe --filter type=com.example --filter extension=my-extension-value --source my-broker +`, + SuggestFor: []string{"subcsribe"}, //nolint:misspell + PreRunE: bindEnv("filter", "source"), + RunE: func(cmd *cobra.Command, args []string) error { + return runSubscribe(cmd, args) + }, + } + + cmd.Flags().StringArrayP("filter", "f", []string{}, "Filter for the Cloud Event metadata") + + cmd.Flags().StringP("source", "s", "default", "The source, like a Knative Broker") + + return cmd +} + +func runSubscribe(cmd *cobra.Command, args []string) (err error) { + var ( + cfg subscibeConfig + f fn.Function + ) + cfg = newSubscribeConfig(cmd) + + if f, err = fn.NewFunction(""); err != nil { + return + } + if !f.Initialized() { + return fn.NewErrNotInitialized(f.Root) + } + if !f.Initialized() { + return fn.NewErrNotInitialized(f.Root) + } + + // add subscription to function + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: cfg.Source, + Filters: extractFilterMap(cfg), + }) + + // pump it + return f.Write() + +} + +func extractFilterMap(cfg subscibeConfig) map[string]string { + subscriptionFilters := make(map[string]string) + for _, filter := range cfg.Filter { + kv := strings.Split(filter, "=") + if len(kv) != 2 { + fmt.Println("Invalid pair:", filter) + continue + } + key := kv[0] + value := kv[1] + subscriptionFilters[key] = value + } + return subscriptionFilters +} + +type subscibeConfig struct { + Filter []string + Source string +} + +func newSubscribeConfig(cmd *cobra.Command) (c subscibeConfig) { + c = subscibeConfig{ + Filter: viper.GetStringSlice("filter"), + Source: viper.GetString("source"), + } + // NOTE: .Filter should be viper.GetStringSlice, but this returns unparsed + // results and appears to be an open issue since 2017: + // https://github.com/spf13/viper/issues/380 + var err error + if c.Filter, err = cmd.Flags().GetStringArray("filter"); err != nil { + fmt.Fprintf(cmd.OutOrStdout(), "error reading filter arguments: %v", err) + } + + return +} diff --git a/cmd/subscribe_test.go b/cmd/subscribe_test.go new file mode 100644 index 0000000000..835f647aac --- /dev/null +++ b/cmd/subscribe_test.go @@ -0,0 +1,73 @@ +package cmd + +import ( + "testing" + + fn "knative.dev/func/pkg/functions" +) + +func TestSubscribeWithAll(t *testing.T) { + root := fromTempDirectory(t) + + _, err := fn.New().Init(fn.Function{Runtime: "go", Root: root}) + if err != nil { + t.Fatal(err) + } + + cmd := NewSubscribeCmd() + cmd.SetArgs([]string{"--source", "my-broker", "--filter", "foo=go"}) + + if err := cmd.Execute(); err != nil { + t.Fatal(err) + } + + // Now load the function and ensure that the subscription is set correctly. + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + + if f.Deploy.Subscriptions == nil { + t.Fatal("Expected subscription to be present ") + } + if f.Deploy.Subscriptions[0].Source != "my-broker" { + t.Fatalf("Expected subscription for broker to be 'my-broker', but got '%v'", f.Deploy.Subscriptions[0].Source) + } + + if f.Deploy.Subscriptions[0].Filters["foo"] != "go" { + t.Fatalf("Expected subscription filter for 'foo' to be 'go', but got '%v'", f.Deploy.Subscriptions[0].Filters["foo"]) + } +} + +func TestSubscribeWithNoExplicitSourceAll(t *testing.T) { + root := fromTempDirectory(t) + + _, err := fn.New().Init(fn.Function{Runtime: "go", Root: root}) + if err != nil { + t.Fatal(err) + } + + cmd := NewSubscribeCmd() + cmd.SetArgs([]string{"--filter", "foo=go"}) + + if err := cmd.Execute(); err != nil { + t.Fatal(err) + } + + // Now load the function and ensure that the subscription is set correctly. + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + + if f.Deploy.Subscriptions == nil { + t.Fatal("Expected subscription to be present ") + } + if f.Deploy.Subscriptions[0].Source != "default" { + t.Fatalf("Expected subscription for broker to be 'default', but got '%v'", f.Deploy.Subscriptions[0].Source) + } + + if f.Deploy.Subscriptions[0].Filters["foo"] != "go" { + t.Fatalf("Expected subscription filter for 'foo' to be 'go', but got '%v'", f.Deploy.Subscriptions[0].Filters["foo"]) + } +} diff --git a/docs/reference/func.md b/docs/reference/func.md index b0267a209e..c28c23b05e 100644 --- a/docs/reference/func.md +++ b/docs/reference/func.md @@ -36,6 +36,7 @@ Learn more about Knative at: https://knative.dev * [func list](func_list.md) - List deployed functions * [func repository](func_repository.md) - Manage installed template repositories * [func run](func_run.md) - Run the function locally +* [func subscribe](func_subscribe.md) - Subscribe a function to events * [func templates](func_templates.md) - List available function source templates * [func version](func_version.md) - Function client version information diff --git a/docs/reference/func_subscribe.md b/docs/reference/func_subscribe.md new file mode 100644 index 0000000000..77e7aa9bb0 --- /dev/null +++ b/docs/reference/func_subscribe.md @@ -0,0 +1,42 @@ +## func subscribe + +Subscribe a function to events + +### Synopsis + +Subscribe a function to events + +Subscribe the function to a set of events, matching a set of filters for Cloud Event metadata +and a Knative Broker from where the events are consumed. + + +``` +func subscribe +``` + +### Examples + +``` + +# Subscribe the function to the 'default' broker where events have 'type' of 'com.example' +and an 'extension' attribute for the value 'my-extension-value'. +func subscribe --filter type=com.example --filter extension=my-extension-value + +# Subscribe the function to the 'my-broker' broker where events have 'type' of 'com.example' +and an 'extension' attribute for the value 'my-extension-value'. +func subscribe --filter type=com.example --filter extension=my-extension-value --source my-broker + +``` + +### Options + +``` + -f, --filter stringArray Filter for the Cloud Event metadata + -h, --help help for subscribe + -s, --source string The source, like a Knative Broker (default "default") +``` + +### SEE ALSO + +* [func](func.md) - func manages Knative Functions + diff --git a/pkg/functions/client_int_test.go b/pkg/functions/client_int_test.go index 7517ca4854..b1737e2dba 100644 --- a/pkg/functions/client_int_test.go +++ b/pkg/functions/client_int_test.go @@ -159,6 +159,31 @@ func TestDeployWithOptions(t *testing.T) { defer del(t, client, "test-deploy-with-options") } +func TestDeployWithTriggers(t *testing.T) { + root, cleanup := Mktemp(t) + defer cleanup() + verbose := true + + f := fn.Function{Runtime: "go", Name: "test-deploy-with-triggers", Root: root} + f.Deploy = fn.DeploySpec{ + Subscriptions: []fn.KnativeSubscription{ + { + Source: "default", + Filters: map[string]string{ + "key": "value", + "foo": "bar", + }, + }, + }, + } + + client := newClient(verbose) + if _, _, err := client.New(context.Background(), f); err != nil { + t.Fatal(err) + } + defer del(t, client, "test-deploy-with-triggers") +} + func TestUpdateWithAnnotationsAndLabels(t *testing.T) { functionName := "updateannlab" defer Within(t, "testdata/example.com/"+functionName)() diff --git a/pkg/functions/function.go b/pkg/functions/function.go index d7a09a7304..09161d916e 100644 --- a/pkg/functions/function.go +++ b/pkg/functions/function.go @@ -89,6 +89,12 @@ type Function struct { Deploy DeploySpec `yaml:"deploy,omitempty"` } +// KnativeSubscription +type KnativeSubscription struct { + Source string `yaml:"source"` + Filters map[string]string `yaml:"filters,omitempty"` +} + // BuildSpec type BuildSpec struct { // Git stores information about an optionally associated git repository. @@ -159,6 +165,8 @@ type DeploySpec struct { // succeed. // More info: https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/ ServiceAccountName string `yaml:"serviceAccountName,omitempty"` + + Subscriptions []KnativeSubscription `yaml:"subscriptions,omitempty"` } // HealthEndpoints specify the liveness and readiness endpoints for a Runtime diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 9e2dfd3f3b..5089f30084 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -9,6 +9,10 @@ import ( "strings" "time" + clienteventingv1 "knative.dev/client-pkg/pkg/eventing/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -129,6 +133,10 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu if err != nil { return fn.DeploymentResult{}, err } + eventingClient, err := NewEventingClient(d.Namespace) + if err != nil { + return fn.DeploymentResult{}, err + } var outBuff SynchronizedBuffer var out io.Writer = &outBuff @@ -223,6 +231,11 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, err } + err = createTriggers(ctx, f, client, eventingClient) + if err != nil { + return fn.DeploymentResult{}, err + } + if d.verbose { fmt.Printf("Function deployed in namespace %q and exposed at URL:\n%s\n", d.Namespace, route.Status.URL.String()) } @@ -282,6 +295,11 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, err } + err = createTriggers(ctx, f, client, eventingClient) + if err != nil { + return fn.DeploymentResult{}, err + } + return fn.DeploymentResult{ Status: fn.Updated, URL: route.Status.URL.String(), @@ -290,6 +308,57 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu } } +func createTriggers(ctx context.Context, f fn.Function, client clientservingv1.KnServingClient, eventingClient clienteventingv1.KnEventingClient) error { + ksvc, err := client.GetService(ctx, f.Name) + if err != nil { + err = fmt.Errorf("knative deployer failed to get the Service for Trigger: %v", err) + return err + } + + fmt.Fprintf(os.Stderr, "🎯 Creating Triggers on the cluster\n") + + for i, sub := range f.Deploy.Subscriptions { + // create the filter: + attributes := make(map[string]string) + for key, value := range sub.Filters { + attributes[key] = value + } + + err = eventingClient.CreateTrigger(ctx, &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-function-trigger-%d", ksvc.Name, i), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ksvc.APIVersion, + Kind: ksvc.Kind, + Name: ksvc.GetName(), + UID: ksvc.GetUID(), + }, + }, + }, + Spec: eventingv1.TriggerSpec{ + Broker: sub.Source, + + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: ksvc.APIVersion, + Kind: ksvc.Kind, + Name: ksvc.Name, + }}, + + Filter: &eventingv1.TriggerFilter{ + Attributes: attributes, + }, + }, + }) + if err != nil && !errors.IsAlreadyExists(err) { + err = fmt.Errorf("knative deployer failed to create the Trigger: %v", err) + return err + } + } + return nil +} + func probeFor(url string) *corev1.Probe { return &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ diff --git a/schema/func_yaml-schema.json b/schema/func_yaml-schema.json index 8d9924a158..395d773a79 100644 --- a/schema/func_yaml-schema.json +++ b/schema/func_yaml-schema.json @@ -90,6 +90,13 @@ "serviceAccountName": { "type": "string", "description": "ServiceAccountName is the name of the service account used for the\nfunction pod. The service account must exist in the namespace to\nsucceed.\nMore info: https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/" + }, + "subscriptions": { + "items": { + "$schema": "http://json-schema.org/draft-04/schema#", + "$ref": "#/definitions/KnativeSubscription" + }, + "type": "array" } }, "additionalProperties": false, @@ -215,6 +222,27 @@ "type": "object", "description": "HealthEndpoints specify the liveness and readiness endpoints for a Runtime" }, + "KnativeSubscription": { + "required": [ + "source" + ], + "properties": { + "source": { + "type": "string" + }, + "filters": { + "patternProperties": { + ".*": { + "type": "string" + } + }, + "type": "object" + } + }, + "additionalProperties": false, + "type": "object", + "description": "KnativeSubscription" + }, "Label": { "required": [ "key"