Skip to content

Commit

Permalink
Adding support for func subscribe for creating mutiple triggers, ba…
Browse files Browse the repository at this point in the history
…sed on event filters (#2001)

* Adding support for `func subscribe` for creating mutiple triggers, based on event filters

Signed-off-by: Matthias Wessendorf <[email protected]>

* Update cmd/subscribe.go

Co-authored-by: Luke Kingland <[email protected]>

* Update cmd/subscribe.go

Co-authored-by: Luke Kingland <[email protected]>

* Update cmd/subscribe.go

Co-authored-by: Luke Kingland <[email protected]>

* Update cmd/subscribe.go

Co-authored-by: Luke Kingland <[email protected]>

* Update cmd/subscribe.go

Co-authored-by: Luke Kingland <[email protected]>

* Update cmd/subscribe.go

Co-authored-by: Luke Kingland <[email protected]>

* removing unused import

Signed-off-by: Matthias Wessendorf <[email protected]>

* running make

Signed-off-by: Matthias Wessendorf <[email protected]>

* Some import ogranization

Signed-off-by: Matthias Wessendorf <[email protected]>

* Change argument syntax

Signed-off-by: Matthias Wessendorf <[email protected]>

* changes

Signed-off-by: Matthias Wessendorf <[email protected]>

* Adding some emoji text

Signed-off-by: Matthias Wessendorf <[email protected]>

* 💄 move subscriptions underneath the deploy element

Signed-off-by: Matthias Wessendorf <[email protected]>

* adding silly emoji to build

Signed-off-by: Matthias Wessendorf <[email protected]>

* Adding some simple/copied/modified test

Signed-off-by: Matthias Wessendorf <[email protected]>

* Running 'make schema-generate'

Signed-off-by: Matthias Wessendorf <[email protected]>

* Update function

Signed-off-by: Matthias Wessendorf <[email protected]>

* Little unit test

Signed-off-by: Matthias Wessendorf <[email protected]>

* Adding a bit more help text

Signed-off-by: Matthias Wessendorf <[email protected]>

* misspell instruction

Signed-off-by: Matthias Wessendorf <[email protected]>

---------

Signed-off-by: Matthias Wessendorf <[email protected]>
Co-authored-by: Luke Kingland <[email protected]>
  • Loading branch information
matzew and lkingland authored Nov 6, 2023
1 parent d258a19 commit 5bb373a
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ MAKEFILE_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))

# Default Targets
all: build docs
@echo '🎉 Build process completed!'

# Help Text
# Headings: lines with `##$` comment prefix
Expand Down
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Learn more about Knative at: https://knative.dev`, cfg.Name),
NewDeployCmd(newClient),
NewDeleteCmd(newClient),
NewListCmd(newClient),
NewSubscribeCmd(),
},
},
{
Expand Down
106 changes: 106 additions & 0 deletions cmd/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package cmd

import (
"fmt"
"strings"

"github.com/ory/viper"
"github.com/spf13/cobra"
fn "knative.dev/func/pkg/functions"
)

func NewSubscribeCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "subscribe",
Short: "Subscribe a function to events",
Long: `Subscribe a function to events
Subscribe the function to a set of events, matching a set of filters for Cloud Event metadata
and a Knative Broker from where the events are consumed.
`,
Example: `
# Subscribe the function to the 'default' broker where events have 'type' of 'com.example'
and an 'extension' attribute for the value 'my-extension-value'.
{{rootCmdUse}} subscribe --filter type=com.example --filter extension=my-extension-value
# Subscribe the function to the 'my-broker' broker where events have 'type' of 'com.example'
and an 'extension' attribute for the value 'my-extension-value'.
{{rootCmdUse}} subscribe --filter type=com.example --filter extension=my-extension-value --source my-broker
`,
SuggestFor: []string{"subcsribe"}, //nolint:misspell
PreRunE: bindEnv("filter", "source"),
RunE: func(cmd *cobra.Command, args []string) error {
return runSubscribe(cmd, args)
},
}

cmd.Flags().StringArrayP("filter", "f", []string{}, "Filter for the Cloud Event metadata")

cmd.Flags().StringP("source", "s", "default", "The source, like a Knative Broker")

return cmd
}

func runSubscribe(cmd *cobra.Command, args []string) (err error) {
var (
cfg subscibeConfig
f fn.Function
)
cfg = newSubscribeConfig(cmd)

if f, err = fn.NewFunction(""); err != nil {
return
}
if !f.Initialized() {
return fn.NewErrNotInitialized(f.Root)
}
if !f.Initialized() {
return fn.NewErrNotInitialized(f.Root)
}

// add subscription to function
f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{
Source: cfg.Source,
Filters: extractFilterMap(cfg),
})

// pump it
return f.Write()

}

func extractFilterMap(cfg subscibeConfig) map[string]string {
subscriptionFilters := make(map[string]string)
for _, filter := range cfg.Filter {
kv := strings.Split(filter, "=")
if len(kv) != 2 {
fmt.Println("Invalid pair:", filter)
continue
}
key := kv[0]
value := kv[1]
subscriptionFilters[key] = value
}
return subscriptionFilters
}

type subscibeConfig struct {
Filter []string
Source string
}

func newSubscribeConfig(cmd *cobra.Command) (c subscibeConfig) {
c = subscibeConfig{
Filter: viper.GetStringSlice("filter"),
Source: viper.GetString("source"),
}
// NOTE: .Filter should be viper.GetStringSlice, but this returns unparsed
// results and appears to be an open issue since 2017:
// https://github.com/spf13/viper/issues/380
var err error
if c.Filter, err = cmd.Flags().GetStringArray("filter"); err != nil {
fmt.Fprintf(cmd.OutOrStdout(), "error reading filter arguments: %v", err)
}

return
}
73 changes: 73 additions & 0 deletions cmd/subscribe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package cmd

import (
"testing"

fn "knative.dev/func/pkg/functions"
)

func TestSubscribeWithAll(t *testing.T) {
root := fromTempDirectory(t)

_, err := fn.New().Init(fn.Function{Runtime: "go", Root: root})
if err != nil {
t.Fatal(err)
}

cmd := NewSubscribeCmd()
cmd.SetArgs([]string{"--source", "my-broker", "--filter", "foo=go"})

if err := cmd.Execute(); err != nil {
t.Fatal(err)
}

// Now load the function and ensure that the subscription is set correctly.
f, err := fn.NewFunction(root)
if err != nil {
t.Fatal(err)
}

if f.Deploy.Subscriptions == nil {
t.Fatal("Expected subscription to be present ")
}
if f.Deploy.Subscriptions[0].Source != "my-broker" {
t.Fatalf("Expected subscription for broker to be 'my-broker', but got '%v'", f.Deploy.Subscriptions[0].Source)
}

if f.Deploy.Subscriptions[0].Filters["foo"] != "go" {
t.Fatalf("Expected subscription filter for 'foo' to be 'go', but got '%v'", f.Deploy.Subscriptions[0].Filters["foo"])
}
}

func TestSubscribeWithNoExplicitSourceAll(t *testing.T) {
root := fromTempDirectory(t)

_, err := fn.New().Init(fn.Function{Runtime: "go", Root: root})
if err != nil {
t.Fatal(err)
}

cmd := NewSubscribeCmd()
cmd.SetArgs([]string{"--filter", "foo=go"})

if err := cmd.Execute(); err != nil {
t.Fatal(err)
}

// Now load the function and ensure that the subscription is set correctly.
f, err := fn.NewFunction(root)
if err != nil {
t.Fatal(err)
}

if f.Deploy.Subscriptions == nil {
t.Fatal("Expected subscription to be present ")
}
if f.Deploy.Subscriptions[0].Source != "default" {
t.Fatalf("Expected subscription for broker to be 'default', but got '%v'", f.Deploy.Subscriptions[0].Source)
}

if f.Deploy.Subscriptions[0].Filters["foo"] != "go" {
t.Fatalf("Expected subscription filter for 'foo' to be 'go', but got '%v'", f.Deploy.Subscriptions[0].Filters["foo"])
}
}
1 change: 1 addition & 0 deletions docs/reference/func.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Learn more about Knative at: https://knative.dev
* [func list](func_list.md) - List deployed functions
* [func repository](func_repository.md) - Manage installed template repositories
* [func run](func_run.md) - Run the function locally
* [func subscribe](func_subscribe.md) - Subscribe a function to events
* [func templates](func_templates.md) - List available function source templates
* [func version](func_version.md) - Function client version information

42 changes: 42 additions & 0 deletions docs/reference/func_subscribe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
## func subscribe

Subscribe a function to events

### Synopsis

Subscribe a function to events

Subscribe the function to a set of events, matching a set of filters for Cloud Event metadata
and a Knative Broker from where the events are consumed.


```
func subscribe
```

### Examples

```
# Subscribe the function to the 'default' broker where events have 'type' of 'com.example'
and an 'extension' attribute for the value 'my-extension-value'.
func subscribe --filter type=com.example --filter extension=my-extension-value
# Subscribe the function to the 'my-broker' broker where events have 'type' of 'com.example'
and an 'extension' attribute for the value 'my-extension-value'.
func subscribe --filter type=com.example --filter extension=my-extension-value --source my-broker
```

### Options

```
-f, --filter stringArray Filter for the Cloud Event metadata
-h, --help help for subscribe
-s, --source string The source, like a Knative Broker (default "default")
```

### SEE ALSO

* [func](func.md) - func manages Knative Functions

25 changes: 25 additions & 0 deletions pkg/functions/client_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,31 @@ func TestDeployWithOptions(t *testing.T) {
defer del(t, client, "test-deploy-with-options")
}

func TestDeployWithTriggers(t *testing.T) {
root, cleanup := Mktemp(t)
defer cleanup()
verbose := true

f := fn.Function{Runtime: "go", Name: "test-deploy-with-triggers", Root: root}
f.Deploy = fn.DeploySpec{
Subscriptions: []fn.KnativeSubscription{
{
Source: "default",
Filters: map[string]string{
"key": "value",
"foo": "bar",
},
},
},
}

client := newClient(verbose)
if _, _, err := client.New(context.Background(), f); err != nil {
t.Fatal(err)
}
defer del(t, client, "test-deploy-with-triggers")
}

func TestUpdateWithAnnotationsAndLabels(t *testing.T) {
functionName := "updateannlab"
defer Within(t, "testdata/example.com/"+functionName)()
Expand Down
8 changes: 8 additions & 0 deletions pkg/functions/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ type Function struct {
Deploy DeploySpec `yaml:"deploy,omitempty"`
}

// KnativeSubscription
type KnativeSubscription struct {
Source string `yaml:"source"`
Filters map[string]string `yaml:"filters,omitempty"`
}

// BuildSpec
type BuildSpec struct {
// Git stores information about an optionally associated git repository.
Expand Down Expand Up @@ -159,6 +165,8 @@ type DeploySpec struct {
// succeed.
// More info: https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/
ServiceAccountName string `yaml:"serviceAccountName,omitempty"`

Subscriptions []KnativeSubscription `yaml:"subscriptions,omitempty"`
}

// HealthEndpoints specify the liveness and readiness endpoints for a Runtime
Expand Down
Loading

0 comments on commit 5bb373a

Please sign in to comment.