Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fallback] Set xDS configuration from the client #116

Merged
merged 5 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/psm-interop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ jobs:
protos/grpc/testing/empty.proto
protos/grpc/testing/messages.proto
protos/grpc/testing/test.proto
protos/grpc/testing/xdsconfig/control.proto
protos/grpc/testing/xdsconfig/service.proto
protos/grpc/testing/xdsconfig/xdsconfig.proto

- name: "Run unit tests"
run: python -m tests.unit
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,5 @@ venv/
venv-*/
out/
protos/**/*_pb2*
# Generated Go files
docker/go-control-plane/grpc/interop/grpc_testing/xdsconfig/*.pb.go
10 changes: 10 additions & 0 deletions docker/go-control-plane/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ the root of `grpc/psm-interop` checkout.
```
docker build . -f docker/go-control-plane/Dockerfile
```

## Local development

Run the following command from this repository to generate code from the .proto
files:
```
protoc -I=. --go_out=docker/go-control-plane \
protos/grpc/testing/xdsconfig/*.proto \
--go-grpc_out=docker/go-control-plane/
```
124 changes: 0 additions & 124 deletions docker/go-control-plane/controlplane/resource.go

This file was deleted.

120 changes: 40 additions & 80 deletions docker/go-control-plane/fallback-control-plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,16 @@ import (
"log"
"net"
"os"
"strconv"
"sync"

"google.golang.org/grpc"
channelz "google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/grpc/psm-interop/docker/go-control-plane/controlplane"
xdsconfigpb "github.com/grpc/psm-interop/docker/go-control-plane/grpc/interop/grpc_testing/xdsconfig"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
Expand All @@ -52,14 +48,13 @@ import (
)

var (
port = flag.Uint("port", 3333, "Port to listen on")
nodeid = flag.String("nodeid", "test-id", "Node ID")
upstream = flag.String("upstream", "localhost:3000", "upstream server")
port = flag.Uint("port", 3333, "Port to listen on")
nodeid = flag.String("nodeid", "test-id", "Node ID")
)

type Filter struct {
ResourceType string
ResourceName string
type resourceKey struct {
resourceType string
resourceName string
}

// controlService provides a gRPC API to configure test-specific control plane
Expand All @@ -68,8 +63,7 @@ type controlService struct {
xdsconfigpb.UnsafeXdsConfigControlServiceServer
version uint32
mu sync.Mutex // Guards access to all fields listed below
clusters map[string]*v3clusterpb.Cluster
listeners map[string]*v3listenerpb.Listener
resources map[resourceKey]*proto.Message
filters map[string]map[string]bool
cache cache.SnapshotCache
}
Expand All @@ -79,52 +73,55 @@ type controlService struct {
func (srv *controlService) StopOnRequest(_ context.Context, req *xdsconfigpb.StopOnRequestRequest) (*xdsconfigpb.StopOnRequestResponse, error) {
srv.mu.Lock()
defer srv.mu.Unlock()
if val, ok := srv.filters[req.GetResourceType()]; ok {
val[req.GetResourceName()] = true
if val, ok := srv.filters[req.TypeUrl]; ok {
val[req.Name] = true
} else {
srv.filters[req.GetResourceType()] = map[string]bool{req.GetResourceName(): true}
srv.filters[req.TypeUrl] = map[string]bool{req.Name: true}
}
res := xdsconfigpb.StopOnRequestResponse{}
for t, names := range srv.filters {
for name, _ := range names {
res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name})
for name := range names {
res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{TypeUrl: t, Name: name})
}
}
return &res, nil
}

// UpsertResources allows the test to provide a new or replace existing xDS
// resource. Notification will be sent to any control plane clients watching
// SetResources allows the test to provide a new or replace existing xDS
// resources. Notification will be sent to any control plane clients watching
// the resource being updated.
func (srv *controlService) UpsertResources(_ context.Context, req *xdsconfigpb.UpsertResourcesRequest) (*xdsconfigpb.UpsertResourcesResponse, error) {
func (srv *controlService) SetResources(_ context.Context, req *xdsconfigpb.SetResourcesRequest) (*xdsconfigpb.SetResourcesResponse, error) {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.version++
listener := controlplane.ListenerName
if req.Listener != nil {
listener = *req.Listener
if len(req.Resources) > 0 {
srv.version++
}
for _, resource := range req.Resources {
key := resourceKey{resourceType: resource.TypeUrl, resourceName: resource.Name}
contents := resource.Body
eugeneo marked this conversation as resolved.
Show resolved Hide resolved
if contents == nil {
delete(srv.resources, key)
continue
}
body, err := contents.UnmarshalNew()
if err != nil {
log.Printf("Failed to parse %s/%s: %v", key.resourceType, key.resourceName, err)
continue
}
srv.resources[key] = &body
}
srv.clusters[req.Cluster] = controlplane.MakeCluster(req.Cluster, req.UpstreamHost, req.UpstreamPort)
srv.listeners[listener] = controlplane.MakeHTTPListener(listener, req.Cluster)
if err := srv.RefreshSnapshot(); err != nil {
return nil, err
}
res := &xdsconfigpb.UpsertResourcesResponse{}
for _, l := range srv.listeners {
a, err := anypb.New(l)
res := xdsconfigpb.SetResourcesResponse{}
for key, message := range srv.resources {
a, err := anypb.New(*message)
if err != nil {
log.Fatalf("Failed to convert listener %v to pb: %v\n", l, err)
log.Printf("Can not wrap resource %s/%s into any: %v", key.resourceType, key.resourceName, err)
}
res.Resource = append(res.Resource, a)
}
for _, c := range srv.clusters {
a, err := anypb.New(c)
if err != nil {
log.Fatalf("Failed to convert cluster %v to pb: %v\n", c, err)
}
res.Resource = append(res.Resource, a)
}
return res, nil
return &res, nil
}

// Abruptly stops the server when the client requests a resource that the test
Expand All @@ -146,31 +143,15 @@ func (srv *controlService) onStreamRequest(id int64, req *v3discoverypb.Discover
}

func (srv *controlService) RefreshSnapshot() error {
var listeners []types.Resource
for _, l := range srv.listeners {
listeners = append(listeners, l)
}
var clusters []types.Resource
for _, c := range srv.clusters {
clusters = append(clusters, c)
resources := map[resource.Type][]types.Resource{}
for k, resource := range srv.resources {
resources[k.resourceType] = append(resources[k.resourceType], *resource)
}
resources := map[resource.Type][]types.Resource{resource.ListenerType: listeners, resource.ClusterType: clusters}
// Create the snapshot that we'll serve to Envoy
snapshot, err := cache.NewSnapshot(fmt.Sprint(srv.version), resources)
if err != nil {
return err
}
log.Printf("Snapshot contents:\n")
for _, values := range snapshot.Resources {
for name, item := range values.Items {
text, err := protojson.MarshalOptions{Multiline: true}.Marshal(item.Resource)
if err != nil {
log.Printf("Resource %v, error: %v\n", name, err)
continue
}
log.Printf("%v => %v\n", name, string(text))
}
}
if err := snapshot.Consistent(); err != nil {
log.Printf("Snapshot inconsistency: %v\n", err)
return err
Expand All @@ -183,7 +164,6 @@ func (srv *controlService) RefreshSnapshot() error {
return nil
}


func (srv *controlService) RunServer(port uint) error {
if err := srv.RefreshSnapshot(); err != nil {
log.Fatalf("Failed to refresh snapshot: %v\n", err)
Expand All @@ -208,32 +188,12 @@ func (srv *controlService) RunServer(port uint) error {
return nil
}

func parseHostPort(host_port string) (string, uint32, error) {
host, upstreamPort, err := net.SplitHostPort(*upstream)
if err != nil {
return "", 0, fmt.Errorf("Incorrect upstream host name: %s: %v\n", host_port, err)
}
parsedUpstreamPort, err := strconv.Atoi(upstreamPort)
if err != nil || parsedUpstreamPort <= 0 {
return "", 0, fmt.Errorf("Not a valid port number: %d: %v\n", upstreamPort, err)
}
return host, uint32(parsedUpstreamPort), nil
}


// Main entry point. Configures and starts a gRPC server that serves xDS traffic
// and provides an interface for tests to manage control plane behavior.
func main() {
flag.Parse()
host, upstreamPort, err := parseHostPort(*upstream)
if err != nil {
log.Fatalf("Incorrect upstream host name: %s: %v\n", upstream, err)
}
initial_cds := controlplane.MakeCluster(controlplane.ClusterName, host, upstreamPort)
initial_lds := controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)
controlService := &controlService{version: 1,
clusters: map[string]*v3clusterpb.Cluster{controlplane.ListenerName: initial_cds},
listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: initial_lds},
resources: map[resourceKey]*proto.Message{},
filters: map[string]map[string]bool{},
cache: cache.NewSnapshotCache(false, cache.IDHash{}, nil),
}
Expand Down
Loading
Loading