Skip to content

Commit

Permalink
linters
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Aug 6, 2024
1 parent e4133c1 commit abf4c4a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 62 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/psm-interop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ jobs:
protos/grpc/testing/empty.proto
protos/grpc/testing/messages.proto
protos/grpc/testing/test.proto
protos/grpc/testing/xdsconfig/control.proto
protos/grpc/testing/xdsconfig/service.proto
protos/grpc/testing/xdsconfig/xdsconfig.proto
- name: "Run unit tests"
run: python -m tests.unit
Expand Down
36 changes: 10 additions & 26 deletions framework/helpers/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,16 @@
from docker.client import DockerClient
from docker.errors import NotFound
from docker.types import ContainerConfig

from google.protobuf import message
import grpc
import mako.template

import framework.helpers.xds_resources

from protos.grpc.testing import messages_pb2
from protos.grpc.testing import test_pb2_grpc
from protos.grpc.testing.xdsconfig import control_pb2
from protos.grpc.testing.xdsconfig import service_pb2_grpc
from protos.grpc.testing.xdsconfig import service_pb2

from google.protobuf import any_pb2
from google.protobuf import message
from protos.grpc.testing.xdsconfig import service_pb2_grpc

# bootstrap.json template
BOOTSTRAP_JSON_TEMPLATE = "templates/bootstrap.json"
Expand Down Expand Up @@ -109,9 +105,7 @@ def __init__(

def next_event(self, timeout: int) -> ChildProcessEvent:
event: ChildProcessEvent = self.queue.get(timeout=timeout)
source = event.source
message = event.data
self.outputs[source].append(message)
self.outputs[event.source].append(event.data)
return event

def expect_output(
Expand Down Expand Up @@ -151,8 +145,8 @@ def expect_output(
return True
return False

def on_message(self, source: str, message: str):
self.queue.put(ChildProcessEvent(source, message))
def on_message(self, source: str, msg: str):
self.queue.put(ChildProcessEvent(source, msg))


def _Sanitize(l: str) -> str:
Expand Down Expand Up @@ -228,9 +222,9 @@ def log_reader_loop(self):
s = str(prefix + log.decode("utf-8"))
prefix = "" if s[-1] == "\n" else s[s.rfind("\n") :]
for l in s[: s.rfind("\n")].splitlines():
message = _Sanitize(l)
logger.info("[%s] %s", self.name, message)
self.manager.on_message(self.name, message)
sanitized = _Sanitize(l)
logger.info("[%s] %s", self.name, sanitized)
self.manager.on_message(self.name, sanitized)


class GrpcProcess:
Expand Down Expand Up @@ -280,7 +274,6 @@ def channel(self) -> grpc.Channel:


class ControlPlane(GrpcProcess):

def __init__(
self,
manager: ProcessManager,
Expand Down Expand Up @@ -308,7 +301,7 @@ def __enter__(self):
return None
self.update_resources(
listener_name=self.listener_name,
cluster="main_cluster",
cluster_name="main_cluster",
upstream_host=self.upstream_host,
upstream_port=self.upstream_port,
)
Expand All @@ -331,7 +324,7 @@ def update_resources(
upstream_host="localhost",
):
listener = framework.helpers.xds_resources.build_listener(
listener_name=listener_name, cluster_name=cluster
listener_name=listener_name, cluster_name=cluster_name
)
cluster = framework.helpers.xds_resources.build_cluster(
cluster_name=cluster_name,
Expand All @@ -348,15 +341,6 @@ def update_resources(
)
)

def expect_running(self, timeout_s: int = 5):
return self.expect_message_in_output(
"Management server listening on", timeout_s
)

def set_resources(self, resources):
print([r.DebugString() for r in resources].join("\n\n"))
raise Exception("Stop!")


class Client(GrpcProcess):
def __init__(
Expand Down
2 changes: 1 addition & 1 deletion framework/helpers/xds_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from envoy.config.core.v3 import address_pb2
from envoy.config.cluster.v3 import cluster_pb2
from envoy.config.core.v3 import address_pb2
from envoy.config.endpoint.v3 import endpoint_components_pb2
from envoy.config.endpoint.v3 import endpoint_pb2
from envoy.config.listener.v3 import api_listener_pb2
Expand Down
42 changes: 9 additions & 33 deletions tests/fallback_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import absl
from absl import flags
from absl.testing import absltest
from google.protobuf.json_format import MessageToJson

import framework
import framework.helpers.docker
Expand All @@ -41,6 +40,8 @@
)
_NODE_ID = flags.DEFINE_string("node", "test-id", "Node ID")

_LISTENER = "listener_0"

absl.flags.adopt_module_key_flags(framework.xds_k8s_testcase)


Expand Down Expand Up @@ -74,17 +75,19 @@ def start_client(self, port: int = None, name: str = None):
manager=self.process_manager,
name=name or framework.xds_flags.CLIENT_NAME.value,
port=port or get_free_port(),
url="xds:///listener_0",
url=f"xds:///{_LISTENER}",
image=framework.xds_k8s_flags.CLIENT_IMAGE.value,
)

def start_control_plane(self, name: str, index: int, upstream_port: int):
logger.debug('Starting control plane "%s"', name)
return framework.helpers.docker.ControlPlane(
self.process_manager,
listener_name=_LISTENER,
name=name,
port=self.bootstrap.xds_config_server_port(index),
upstream=f"{_HOST_NAME.value}:{upstream_port}",
upstream_host=_HOST_NAME.value,
upstream_port=upstream_port,
image=_CONTROL_PLANE_IMAGE.value,
)

Expand All @@ -100,29 +103,6 @@ def start_server(self, name: str, port: int = None):
command=[],
)

def test_dummy_to_see(self):
print(
MessageToJson(
framework.helpers.docker.make_resource_to_set_message(
framework.helpers.xds_resources.build_cluster(
cluster_name="cluster_cluster",
upstream_host="hostname",
upstream_port=1111,
)
)
)
)
print(
MessageToJson(
framework.helpers.docker.make_resource_to_set_message(
framework.helpers.xds_resources.build_listener(
listener_name="listener0",
cluster_name="cluster_cluster",
)
)
)
)

@unittest.skip("Ignore for now")
def test_fallback_on_startup(self):
with (
Expand Down Expand Up @@ -178,9 +158,6 @@ def test_fallback_mid_startup(self):
) as primary,
self.start_control_plane("fallback_xds_config", 1, server2.port),
):
# Wait for control plane to start up, stop when the client asks for
# a cluster from the primary server
self.assertTrue(primary.expect_running())
primary.stop_on_resource_request(
"type.googleapis.com/envoy.config.cluster.v3.Cluster",
"example_proxy_cluster",
Expand All @@ -199,7 +176,6 @@ def test_fallback_mid_startup(self):
with self.start_control_plane(
"primary_xds_config_run_2", 0, server1.port
):
self.assertTrue(primary.expect_running())
stats = client.get_stats(10)
self.assertEqual(stats.num_failures, 0)
self.assertIn("server1", stats.rpcs_by_peer)
Expand Down Expand Up @@ -228,7 +204,8 @@ def test_fallback_mid_update(self):
"test_cluster_2",
)
primary.update_resources(
cluster="test_cluster_2",
cluster_name="test_cluster_2",
listener_name=_LISTENER,
upstream_port=server3.port,
upstream_host=_HOST_NAME.value,
)
Expand All @@ -240,8 +217,7 @@ def test_fallback_mid_update(self):
name="primary_xds_config_run_2",
index=0,
upstream_port=server3.port,
) as primary2:
self.assertTrue(primary2.expect_running())
):
stats = client.get_stats(20)
self.assertEqual(stats.num_failures, 0)
self.assertIn("server3", stats.rpcs_by_peer)
Expand Down

0 comments on commit abf4c4a

Please sign in to comment.