Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Aug 7, 2024
1 parent 166fd13 commit 3c77744
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 55 deletions.
40 changes: 10 additions & 30 deletions framework/helpers/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@
from docker import client
from docker import errors
from docker import types
import framework.helpers.xds_resources
from protos.grpc.testing import messages_pb2
from protos.grpc.testing import test_pb2_grpc
from protos.grpc.testing.xdsconfig import control_pb2
from protos.grpc.testing.xdsconfig import service_pb2_grpc
from protos.grpc.testing.xdsconfig import xdsconfig_pb2
from protos.grpc.testing.xdsconfig import xdsconfig_pb2_grpc

# bootstrap.json template
BOOTSTRAP_JSON_TEMPLATE = "templates/bootstrap.json"
Expand Down Expand Up @@ -272,9 +271,7 @@ def __init__(
manager: ProcessManager,
name: str,
port: int,
listener_name: str,
upstream_host: str,
upstream_port: int,
initial_resources: xdsconfig_pb2.SetResourcesRequest,
image: str,
):
super().__init__(
Expand All @@ -285,43 +282,26 @@ def __init__(
ports={DEFAULT_CONTROL_PLANE_PORT: port},
command=["--nodeid", manager.node_id],
)
self.listener_name = listener_name
self.upstream_host = upstream_host
self.upstream_port = upstream_port
self.initial_resources = initial_resources

def __enter__(self):
if not super().__enter__():
return None
self.update_resources(
listener_name=self.listener_name,
cluster_name="main_cluster",
upstream_host=self.upstream_host,
upstream_port=self.upstream_port,
)
self.update_resources(self.initial_resources)
return self

def stop_on_resource_request(self, resource_type: str, resource_name: str):
stub = service_pb2_grpc.XdsConfigControlServiceStub(self.channel())
stub = xdsconfig_pb2_grpc.XdsConfigControlServiceStub(self.channel())
res = stub.StopOnRequest(
control_pb2.StopOnRequestRequest(
xdsconfig_pb2.StopOnRequestRequest(
resource_type=resource_type, resource_name=resource_name
)
)
return res

def update_resources(
self,
listener_name: str,
cluster_name: str,
upstream_port: int,
upstream_host="localhost",
):
stub = service_pb2_grpc.XdsConfigControlServiceStub(self.channel())
return stub.SetResources(
framework.helpers.xds_resources.build_set_resource_request(
listener_name, cluster_name, upstream_host, upstream_port
)
)
def update_resources(self, resources: xdsconfig_pb2.SetResourcesRequest):
stub = xdsconfig_pb2_grpc.XdsConfigControlServiceStub(self.channel())
return stub.SetResources(resources)


class Client(GrpcProcess):
Expand Down
29 changes: 13 additions & 16 deletions framework/helpers/xds_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from google.protobuf import any_pb2
from google.protobuf import message

from protos.grpc.testing.xdsconfig import service_pb2
from protos.grpc.testing.xdsconfig import xdsconfig_pb2


def _wrap_in_any(msg: message.Message) -> any_pb2.Any:
Expand All @@ -36,7 +36,7 @@ def _wrap_in_any(msg: message.Message) -> any_pb2.Any:
return any_msg


def build_listener(listener_name: str, cluster_name: str):
def _build_listener(listener_name: str, cluster_name: str):
hcm = http_connection_manager_pb2.HttpConnectionManager(
route_config=route_pb2.RouteConfiguration(
virtual_hosts=[
Expand Down Expand Up @@ -67,7 +67,7 @@ def build_listener(listener_name: str, cluster_name: str):
)


def build_endpoint(
def _build_endpoint(
cluster_name: str, upstream_host: str, upstream_port: int
) -> endpoint_pb2.ClusterLoadAssignment:
return endpoint_pb2.ClusterLoadAssignment(
Expand All @@ -92,12 +92,12 @@ def build_endpoint(
)


def build_cluster(
def _build_cluster(
cluster_name: str, upstream_host: str, upstream_port: int
) -> cluster_pb2.Cluster:
return cluster_pb2.Cluster(
name=cluster_name,
load_assignment=build_endpoint(
load_assignment=_build_endpoint(
cluster_name, upstream_host, upstream_port
),
type=cluster_pb2.Cluster.DiscoveryType.LOGICAL_DNS,
Expand All @@ -106,25 +106,22 @@ def build_cluster(
)


def _build_resource_to_set_message(resource: message.Message):
return service_pb2.SetResourceRequest.ResourceToSet(
def _build_resource_to_set(resource: message.Message):
return xdsconfig_pb2.SetResourcesRequest.ResourceToSet(
type=f"type.googleapis.com/{resource.DESCRIPTOR.full_name}",
name=resource.name,
body=_wrap_in_any(resource),
)


def build_set_resource_request(
def build_listener_and_cluster(
listener_name: str,
cluster_name: str,
upstream_host: str,
upstream_port: int,
) -> service_pb2.SetResourceRequest:
listener = build_listener(listener_name, cluster_name)
cluster = build_cluster(cluster_name, upstream_host, upstream_port)
return service_pb2.SetResourceRequest(
resources=[
_build_resource_to_set_message(listener),
_build_resource_to_set_message(cluster),
]
) -> xdsconfig_pb2.SetResourcesRequest:
listener = _build_listener(listener_name, cluster_name)
cluster = _build_cluster(cluster_name, upstream_host, upstream_port)
return xdsconfig_pb2.SetResourcesRequest(
resources=[_build_resource_to_set(r) for r in [listener, cluster]]
)
4 changes: 2 additions & 2 deletions protos/grpc/testing/xdsconfig/xdsconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ message StopOnRequestResponse {
};

// Request to set one or more resources.
message SetResourceRequest {
message SetResourcesRequest {
message ResourceToSet {
// Resource type
string type = 1;
Expand All @@ -62,5 +62,5 @@ service XdsConfigControlService {
// Instructs the server to exit when given resource is requested
rpc StopOnRequest(StopOnRequestRequest) returns (StopOnRequestResponse);
// A generic way to set xDS resources
rpc SetResources(SetResourceRequest) returns (SetResourcesResponse);
rpc SetResources(SetResourcesRequest) returns (SetResourcesResponse);
};
20 changes: 13 additions & 7 deletions tests/fallback_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import framework
import framework.helpers.docker
import framework.helpers.logs
import framework.helpers.retryers
import framework.helpers.xds_resources
import framework.xds_flags
import framework.xds_k8s_testcase
Expand Down Expand Up @@ -83,11 +84,14 @@ def start_control_plane(self, name: str, index: int, upstream_port: int):
logger.debug('Starting control plane "%s"', name)
return framework.helpers.docker.ControlPlane(
self.process_manager,
listener_name=_LISTENER,
name=name,
port=self.bootstrap.xds_config_server_port(index),
upstream_host=_HOST_NAME.value,
upstream_port=upstream_port,
initial_resources=framework.helpers.xds_resources.build_listener_and_cluster(
listener_name=_LISTENER,
cluster_name="initial_cluster",
upstream_host=_HOST_NAME.value,
upstream_port=upstream_port,
),
image=_CONTROL_PLANE_IMAGE.value,
)

Expand Down Expand Up @@ -206,10 +210,12 @@ def test_fallback_mid_update(self):
"test_cluster_2",
)
primary.update_resources(
cluster_name="test_cluster_2",
listener_name=_LISTENER,
upstream_port=server3.port,
upstream_host=_HOST_NAME.value,
framework.helpers.xds_resources.build_listener_and_cluster(
cluster_name="test_cluster_2",
listener_name=_LISTENER,
upstream_port=server3.port,
upstream_host=_HOST_NAME.value,
)
)
stats = client.get_stats(10)
self.assertEqual(stats.num_failures, 0)
Expand Down

0 comments on commit 3c77744

Please sign in to comment.