Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tests): add streaming retry showcase tests #1764

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def showcase_library(
f"google/showcase/v1beta1/echo.proto",
f"google/showcase/v1beta1/identity.proto",
f"google/showcase/v1beta1/messaging.proto",
f"google/showcase/v1beta1/sequence.proto",
)
session.run(
*cmd_tup, external=True,
Expand Down
11 changes: 11 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import google.auth
from google.auth import credentials as ga_credentials
from google.showcase import EchoClient
from google.showcase import SequenceServiceClient
from google.showcase import IdentityClient
from google.showcase import MessagingClient

Expand All @@ -30,6 +31,7 @@
import asyncio
from google.showcase import EchoAsyncClient
from google.showcase import IdentityAsyncClient
from google.showcase import SequenceServiceAsyncClient

_test_event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(_test_event_loop)
Expand Down Expand Up @@ -61,6 +63,15 @@ def async_identity(use_mtls, event_loop):
channel_creator=aio.insecure_channel
)

@pytest.fixture
def async_sequence(use_mtls, event_loop):
return construct_client(
SequenceServiceAsyncClient,
use_mtls,
transport_name="grpc_asyncio",
channel_creator=aio.insecure_channel
)


dir = os.path.dirname(__file__)
with open(os.path.join(dir, "../cert/mtls.crt"), "rb") as fh:
Expand Down
314 changes: 314 additions & 0 deletions tests/system/test_retry_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from unittest import mock
from google.rpc.status_pb2 import Status
from datetime import timedelta
from google.api_core import retry as retries
from google.api_core import exceptions as core_exceptions


def _code_from_exc(exc):
"""
return the grpc code from an exception
"""
return exc.grpc_status_code.value[0]


def test_streaming_retry_success(sequence):
"""
Test a stream with a sigle success response
"""
retry = retries.Retry(
predicate=retries.if_exception_type(), is_stream=True)
content = ["hello", "world"]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
# single response with entire stream content
"responses": [{"status": Status(code=0), "response_index": len(content)}],
}
)
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
results = [pb.content for pb in it]
assert results == content
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 1
assert report.attempts[0].status == Status(code=0)


def test_streaming_non_retryable_error(sequence):
"""
Test a retryable stream failing with non-retryable error
"""
retry = retries.Retry(
predicate=retries.if_exception_type(), is_stream=True)
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="expected error",
)
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": [{"status": error, "response_index": 0}],
}
)
with pytest.raises(core_exceptions.ServiceUnavailable):
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
next(it)
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 1
assert report.attempts[0].status == error


def test_streaming_transient_retryable(sequence):
"""
Server returns a retryable error a number of times before success.
Retryable errors should not be presented to the end user.
"""
retry = retries.Retry(
predicate=retries.if_exception_type(
core_exceptions.ServiceUnavailable),
initial=0,
maximum=0,
timeout=1,
is_stream=True,
)
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="transient error",
)
responses = [{"status": error, "response_index": 0} for _ in range(3)] + [
{"status": Status(code=0), "response_index": len(content)}
]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
results = [pb.content for pb in it]
assert results == content
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 4
assert report.attempts[0].status == error
assert report.attempts[1].status == error
assert report.attempts[2].status == error
assert report.attempts[3].status == Status(code=0)


def test_streaming_transient_retryable_partial_data(sequence):
"""
Server stream yields some data before failing with a retryable error a number of times before success.
Wrapped stream should contain data from all attempts
"""
from google.protobuf.duration_pb2 import Duration
retry = retries.Retry(
predicate=retries.if_exception_type(
core_exceptions.ServiceUnavailable),
initial=0,
maximum=0,
is_stream=True,
)
content = ["hello", ",", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="transient error",
)
transient_error_list = [{"status": error, "response_index": 3, "delay":Duration(seconds=30)}] * 3

responses = transient_error_list + [
{"status": Status(code=0), "response_index": len(content)}
]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
results = [pb.content for pb in it]
assert results == ["hello", "hello", "hello", "hello", "world"]
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 4
assert report.attempts[0].status == error
assert report.attempts[1].status == error
assert report.attempts[2].status == error
assert report.attempts[3].status == Status(code=0)


def test_streaming_retryable_eventual_timeout(sequence):
"""
Server returns a retryable error a number of times before reaching timeout.
Should raise a retry error.
"""
retry = retries.Retry(
predicate=retries.if_exception_type(
core_exceptions.ServiceUnavailable),
initial=0,
maximum=0,
timeout=0.35,
is_stream=True,
)
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="transient error",
)
transient_error_list = [
{"status": error, "response_index": 1,
"delay": timedelta(seconds=0.15)}
] * 10
responses = transient_error_list + [
{"status": Status(code=0), "response_index": len(content)}
]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
with pytest.raises(core_exceptions.RetryError) as exc_info:
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
[pb.content for pb in it]
cause = exc_info.value.__cause__
assert isinstance(cause, core_exceptions.ServiceUnavailable)
# verify streaming report
report = sequence.get_streaming_sequence_report(
name=f"{seq.name}/streamingSequenceReport"
)
assert len(report.attempts) == 3
assert report.attempts[0].status == error
assert report.attempts[1].status == error
assert report.attempts[2].status == error


def test_streaming_retry_on_error(sequence):
"""
on_error should be called for all retryable errors as they are encountered
"""
encountered_excs = []

def on_error(exc):
encountered_excs.append(exc)

retry = retries.Retry(
predicate=retries.if_exception_type(
core_exceptions.ServiceUnavailable, core_exceptions.GatewayTimeout
),
initial=0,
maximum=0,
on_error=on_error,
is_stream=True,
)
content = ["hello", "world"]
errors = [
core_exceptions.ServiceUnavailable,
core_exceptions.DeadlineExceeded,
core_exceptions.NotFound,
]
responses = [{"status": Status(code=_code_from_exc(exc))}
for exc in errors]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
with pytest.raises(core_exceptions.NotFound):
it = sequence.attempt_streaming_sequence(name=seq.name, retry=retry)
[pb.content for pb in it]
# on_error should have been called on the first two errors, but not the terminal one
assert len(encountered_excs) == 2
assert isinstance(encountered_excs[0], core_exceptions.ServiceUnavailable)
# rest raises superclass GatewayTimeout in place of DeadlineExceeded
assert isinstance(
encountered_excs[1],
(core_exceptions.DeadlineExceeded, core_exceptions.GatewayTimeout),
)


@pytest.mark.parametrize(
"initial,multiplier,maximum,expected",
[
(0.1, 1.0, 0.5, [0.1, 0.1, 0.1]),
(0, 2.0, 0.5, [0, 0]),
(0.1, 2.0, 0.5, [0.1, 0.2, 0.4, 0.5, 0.5]),
(1, 1.5, 5, [1, 1.5, 2.25, 3.375, 5, 5]),
],
)
def test_streaming_retry_sleep_generator(
sequence, initial, multiplier, maximum, expected
):
"""
should be able to pass in sleep generator to control backoff
"""
retry = retries.Retry(
predicate=retries.if_exception_type(
core_exceptions.ServiceUnavailable),
initial=initial,
maximum=maximum,
multiplier=multiplier,
is_stream=True,
)
content = ["hello", "world"]
error = Status(
code=_code_from_exc(core_exceptions.ServiceUnavailable),
message="transient error",
)
transient_error_list = [{"status": error}] * len(expected)
responses = transient_error_list + [
{"status": Status(code=0), "response_index": len(content)}
]
seq = sequence.create_streaming_sequence(
streaming_sequence={
"name": __name__,
"content": " ".join(content),
"responses": responses,
}
)
with mock.patch("random.uniform") as mock_uniform:
# make sleep generator deterministic
mock_uniform.side_effect = lambda a, b: b
with mock.patch("time.sleep") as mock_sleep:
it = sequence.attempt_streaming_sequence(
name=seq.name, retry=retry)
[pb.content for pb in it]
assert mock_sleep.call_count == len(expected)
# ensure that sleep times match expected
assert mock_sleep.call_args_list == [
mock.call(sleep_time) for sleep_time in expected
]
Loading
Loading