diff --git a/.github/workflows/psm-interop.yaml b/.github/workflows/psm-interop.yaml index 3df1fda8..35639ca4 100644 --- a/.github/workflows/psm-interop.yaml +++ b/.github/workflows/psm-interop.yaml @@ -53,8 +53,7 @@ jobs: protos/grpc/testing/empty.proto protos/grpc/testing/messages.proto protos/grpc/testing/test.proto - protos/grpc/testing/xdsconfig/control.proto - protos/grpc/testing/xdsconfig/service.proto + protos/grpc/testing/xdsconfig/xdsconfig.proto - name: "Run unit tests" run: python -m tests.unit diff --git a/framework/helpers/docker.py b/framework/helpers/docker.py index a5be59a6..f12a0c41 100644 --- a/framework/helpers/docker.py +++ b/framework/helpers/docker.py @@ -22,20 +22,16 @@ from docker.client import DockerClient from docker.errors import NotFound from docker.types import ContainerConfig - +from google.protobuf import message import grpc import mako.template import framework.helpers.xds_resources - from protos.grpc.testing import messages_pb2 from protos.grpc.testing import test_pb2_grpc from protos.grpc.testing.xdsconfig import control_pb2 -from protos.grpc.testing.xdsconfig import service_pb2_grpc from protos.grpc.testing.xdsconfig import service_pb2 - -from google.protobuf import any_pb2 -from google.protobuf import message +from protos.grpc.testing.xdsconfig import service_pb2_grpc # bootstrap.json template BOOTSTRAP_JSON_TEMPLATE = "templates/bootstrap.json" @@ -109,9 +105,7 @@ def __init__( def next_event(self, timeout: int) -> ChildProcessEvent: event: ChildProcessEvent = self.queue.get(timeout=timeout) - source = event.source - message = event.data - self.outputs[source].append(message) + self.outputs[event.source].append(event.data) return event def expect_output( @@ -151,8 +145,8 @@ def expect_output( return True return False - def on_message(self, source: str, message: str): - self.queue.put(ChildProcessEvent(source, message)) + def on_message(self, source: str, msg: str): + self.queue.put(ChildProcessEvent(source, msg)) def _Sanitize(l: str) -> str: @@ -228,9 +222,9 @@ def log_reader_loop(self): s = str(prefix + log.decode("utf-8")) prefix = "" if s[-1] == "\n" else s[s.rfind("\n") :] for l in s[: s.rfind("\n")].splitlines(): - message = _Sanitize(l) - logger.info("[%s] %s", self.name, message) - self.manager.on_message(self.name, message) + sanitized = _Sanitize(l) + logger.info("[%s] %s", self.name, sanitized) + self.manager.on_message(self.name, sanitized) class GrpcProcess: @@ -280,7 +274,6 @@ def channel(self) -> grpc.Channel: class ControlPlane(GrpcProcess): - def __init__( self, manager: ProcessManager, @@ -308,7 +301,7 @@ def __enter__(self): return None self.update_resources( listener_name=self.listener_name, - cluster="main_cluster", + cluster_name="main_cluster", upstream_host=self.upstream_host, upstream_port=self.upstream_port, ) @@ -331,7 +324,7 @@ def update_resources( upstream_host="localhost", ): listener = framework.helpers.xds_resources.build_listener( - listener_name=listener_name, cluster_name=cluster + listener_name=listener_name, cluster_name=cluster_name ) cluster = framework.helpers.xds_resources.build_cluster( cluster_name=cluster_name, @@ -348,15 +341,6 @@ def update_resources( ) ) - def expect_running(self, timeout_s: int = 5): - return self.expect_message_in_output( - "Management server listening on", timeout_s - ) - - def set_resources(self, resources): - print([r.DebugString() for r in resources].join("\n\n")) - raise Exception("Stop!") - class Client(GrpcProcess): def __init__( diff --git a/framework/helpers/xds_resources.py b/framework/helpers/xds_resources.py index 0ee12734..d56c6b25 100644 --- a/framework/helpers/xds_resources.py +++ b/framework/helpers/xds_resources.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from envoy.config.core.v3 import address_pb2 from envoy.config.cluster.v3 import cluster_pb2 +from envoy.config.core.v3 import address_pb2 from envoy.config.endpoint.v3 import endpoint_components_pb2 from envoy.config.endpoint.v3 import endpoint_pb2 from envoy.config.listener.v3 import api_listener_pb2 diff --git a/tests/fallback_test.py b/tests/fallback_test.py index 47c5f3c0..b1c753e4 100644 --- a/tests/fallback_test.py +++ b/tests/fallback_test.py @@ -18,7 +18,6 @@ import absl from absl import flags from absl.testing import absltest -from google.protobuf.json_format import MessageToJson import framework import framework.helpers.docker @@ -41,6 +40,8 @@ ) _NODE_ID = flags.DEFINE_string("node", "test-id", "Node ID") +_LISTENER = "listener_0" + absl.flags.adopt_module_key_flags(framework.xds_k8s_testcase) @@ -74,7 +75,7 @@ def start_client(self, port: int = None, name: str = None): manager=self.process_manager, name=name or framework.xds_flags.CLIENT_NAME.value, port=port or get_free_port(), - url="xds:///listener_0", + url=f"xds:///{_LISTENER}", image=framework.xds_k8s_flags.CLIENT_IMAGE.value, ) @@ -82,9 +83,11 @@ def start_control_plane(self, name: str, index: int, upstream_port: int): logger.debug('Starting control plane "%s"', name) return framework.helpers.docker.ControlPlane( self.process_manager, + listener_name=_LISTENER, name=name, port=self.bootstrap.xds_config_server_port(index), - upstream=f"{_HOST_NAME.value}:{upstream_port}", + upstream_host=_HOST_NAME.value, + upstream_port=upstream_port, image=_CONTROL_PLANE_IMAGE.value, ) @@ -100,29 +103,6 @@ def start_server(self, name: str, port: int = None): command=[], ) - def test_dummy_to_see(self): - print( - MessageToJson( - framework.helpers.docker.make_resource_to_set_message( - framework.helpers.xds_resources.build_cluster( - cluster_name="cluster_cluster", - upstream_host="hostname", - upstream_port=1111, - ) - ) - ) - ) - print( - MessageToJson( - framework.helpers.docker.make_resource_to_set_message( - framework.helpers.xds_resources.build_listener( - listener_name="listener0", - cluster_name="cluster_cluster", - ) - ) - ) - ) - @unittest.skip("Ignore for now") def test_fallback_on_startup(self): with ( @@ -178,9 +158,6 @@ def test_fallback_mid_startup(self): ) as primary, self.start_control_plane("fallback_xds_config", 1, server2.port), ): - # Wait for control plane to start up, stop when the client asks for - # a cluster from the primary server - self.assertTrue(primary.expect_running()) primary.stop_on_resource_request( "type.googleapis.com/envoy.config.cluster.v3.Cluster", "example_proxy_cluster", @@ -199,7 +176,6 @@ def test_fallback_mid_startup(self): with self.start_control_plane( "primary_xds_config_run_2", 0, server1.port ): - self.assertTrue(primary.expect_running()) stats = client.get_stats(10) self.assertEqual(stats.num_failures, 0) self.assertIn("server1", stats.rpcs_by_peer) @@ -228,7 +204,8 @@ def test_fallback_mid_update(self): "test_cluster_2", ) primary.update_resources( - cluster="test_cluster_2", + cluster_name="test_cluster_2", + listener_name=_LISTENER, upstream_port=server3.port, upstream_host=_HOST_NAME.value, ) @@ -240,8 +217,7 @@ def test_fallback_mid_update(self): name="primary_xds_config_run_2", index=0, upstream_port=server3.port, - ) as primary2: - self.assertTrue(primary2.expect_running()) + ): stats = client.get_stats(20) self.assertEqual(stats.num_failures, 0) self.assertIn("server3", stats.rpcs_by_peer)