feat: retry and retry_async support streaming rpcs #495
Changes from 78 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,6 +66,7 @@ def check_if_exists(): | |
|
||
from google.api_core import datetime_helpers | ||
from google.api_core import exceptions | ||
from google.api_core.retry_streaming import RetryableGenerator | ||
from google.auth import exceptions as auth_exceptions | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
@@ -154,7 +155,7 @@ def retry_target( | |
higher-level retry helper :class:`Retry`. | ||
|
||
Args: | ||
target(Callable): The function to call and retry. This must be a | ||
target(Callable[None, Any]): The function to call and retry. This must be a | ||
nullary function - apply arguments with `functools.partial`. | ||
predicate (Callable[Exception]): A callable used to determine if an | ||
exception raised by the target should be considered retryable. | ||
|
@@ -301,6 +302,15 @@ class Retry(object): | |
maximum (float): The maximum amount of time to delay in seconds. | ||
multiplier (float): The multiplier applied to the delay. | ||
timeout (float): How long to keep retrying, in seconds. | ||
on_error (Callable[Exception]): A function to call while processing | ||
In
Should they be the same in both places? And it doesn't seem that the type hints in the
Yeah, they probably should; I think I was just avoiding changing too much of the existing comments. If we're going to make a change, maybe we should remove the types from the comments entirely now that we have type annotations, so we don't have to duplicate them in multiple places? |
||
a retryable exception. Any error raised by this function will | ||
*not* be caught. | ||
is_stream (bool): Indicates whether the input function | ||
should be treated as a stream function (i.e. a Generator, | ||
or function that returns an Iterable). If True, the iterable | ||
will be wrapped with retry logic, and any failed outputs will | ||
restart the stream. If False, only the input function call itself | ||
Restart the stream from the last non-failed output, right?
Unfortunately that's not really something we can do in a general way. We can create a new generator and start yielding from it again, but we have no guarantee that it will yield the same sequence as last time. (For example, in Bigtable we will modify the request after a failure to avoid requesting repeat data from the server.) I considered adding a "filter" on top of the generator, and providing a default filter that raises an exception if a retry deviates from the initial stream, which would be one way to do what you requested. But in the end, I decided that it's probably better to keep this code simple and just pass on the generator values directly, and the caller can do their own filtering on top of the retry stream.
Since this functionality is not intended for external users directly, but rather for the library generator and handwritten library code, this seems fine. But the generator and library developers need to be aware of this and design their
Please expand the docstring of
Sounds good, I added much more documentation. Let me know if that looks better. So far this will mostly just be useful for hand-written layers.
If we want to add this to gapic libraries automatically, we could try to classify the different kinds of streams our libraries return and provide a couple of different presets for different kinds of streams. But so far that's been out of scope. |
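The "caller does their own filtering" approach from the reply above can be sketched as follows. This is a minimal illustration, not code from the PR: `retry_stream`, `drop_repeats`, and `flaky_source` are hypothetical names, the stand-in retry wrapper has no backoff or timeout, and the filter assumes every item carries a strictly increasing key (here the item itself).

```python
from typing import Callable, Iterable, Iterator, Optional


def retry_stream(target: Callable[[], Iterable[int]], max_attempts: int = 3) -> Iterator[int]:
    """Hypothetical stand-in for the retry wrapper: restart target() on ConnectionError."""
    attempts = 0
    while True:
        try:
            for item in target():
                yield item
            return
        except ConnectionError:
            attempts += 1
            if attempts >= max_attempts:
                raise


def drop_repeats(stream: Iterable[int]) -> Iterator[int]:
    """Caller-side filter: skip any item at or below the highest item already yielded."""
    last_seen: Optional[int] = None
    for item in stream:
        if last_seen is None or item > last_seen:
            last_seen = item
            yield item


calls = {"count": 0}


def flaky_source() -> Iterator[int]:
    """Yield 1..5, failing after the third item on the first call only."""
    calls["count"] += 1
    for i in range(1, 6):
        if calls["count"] == 1 and i == 4:
            raise ConnectionError("stream dropped")
        yield i


# Without a filter, the restarted stream replays items the caller already saw.
raw = list(retry_stream(flaky_source))

# With the filter layered on top, the replayed prefix is dropped.
calls["count"] = 0
filtered = list(drop_repeats(retry_stream(flaky_source)))
```

A filter like this only works when the stream has a usable ordering key; streams without one would need a different strategy, such as modifying the request before the retry as described for Bigtable.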
||
will be retried. Defaults to False. | ||
deadline (float): DEPRECATED: use `timeout` instead. For backward | ||
compatibility, if specified it will override the ``timeout`` parameter. | ||
""" | ||
|
@@ -313,7 +323,8 @@ def __init__( | |
multiplier=_DEFAULT_DELAY_MULTIPLIER, | ||
timeout=_DEFAULT_DEADLINE, | ||
on_error=None, | ||
**kwargs | ||
is_stream=False, | ||
**kwargs, | ||
): | ||
self._predicate = predicate | ||
self._initial = initial | ||
|
@@ -322,6 +333,7 @@ def __init__( | |
self._timeout = kwargs.get("deadline", timeout) | ||
self._deadline = self._timeout | ||
self._on_error = on_error | ||
self._is_stream = is_stream | ||
|
||
def __call__(self, func, on_error=None): | ||
"""Wrap a callable with retry behavior. | ||
|
@@ -346,7 +358,8 @@ def retry_wrapped_func(*args, **kwargs): | |
sleep_generator = exponential_sleep_generator( | ||
self._initial, self._maximum, multiplier=self._multiplier | ||
) | ||
return retry_target( | ||
retry_func = RetryableGenerator if self._is_stream else retry_target | ||
return retry_func( | ||
target, | ||
self._predicate, | ||
sleep_generator, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,7 @@ async def check_if_exists(): | |
from google.api_core.retry import exponential_sleep_generator | ||
from google.api_core.retry import if_exception_type # noqa: F401 | ||
from google.api_core.retry import if_transient_error | ||
from google.api_core.retry_streaming_async import AsyncRetryableGenerator | ||
|
||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
@@ -74,7 +75,7 @@ async def check_if_exists(): | |
async def retry_target( | ||
target, predicate, sleep_generator, timeout=None, on_error=None, **kwargs | ||
The streaming versions have type declarations. Do you want to have type declarations in the non-stream versions?
Sure, I was trying to avoid scope growth, but it probably makes sense to add at this point. Done |
||
): | ||
"""Call a function and retry if it fails. | ||
"""Await a coroutine and retry if it fails. | ||
IIUC, the sleep-delay logic should be the same between all four versions of this function: sync vs async, unary vs streaming. I would suggest:
I was trying to avoid making changes to the existing retries, but I suppose with the new base classes, it makes sense to generalize them a bit. I added a shared
Note that I had to make a (potentially breaking) change to the async unary retry logic to make it consistent. Previously async_unary would attempt to cancel early when the deadline is reached, while the other retries only check the deadline after the request terminates. The old behavior was very slow (using asyncio.wait_for) and caused race conditions. I'd consider it a bug, but fixing the behavior may count as a breaking change (even though gapic functions shouldn't be affected). |
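To make the "check the deadline only after the request terminates" behavior concrete, here is a rough sketch of a shared delay step feeding a unary retry loop. It is an assumption-laden illustration rather than the PR's code: `exponential_sleeps`, `next_sleep_or_timeout`, `retry_unary`, `FakeClock`, and `make_flaky` are all hypothetical names, the backoff has no jitter, and the clock is injected so the example runs instantly.

```python
import time
from typing import Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def exponential_sleeps(initial: float, maximum: float, multiplier: float = 2.0) -> Iterator[float]:
    """Hypothetical jitter-free backoff: initial, initial * multiplier, ... capped at maximum."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay *= multiplier


def next_sleep_or_timeout(
    sleeps: Iterator[float],
    deadline: Optional[float],
    now: Callable[[], float],
    cause: Exception,
) -> float:
    """Shared step: pick the next delay, or give up if sleeping would pass the deadline."""
    delay = next(sleeps)
    if deadline is not None and now() + delay > deadline:
        raise TimeoutError("retry deadline exceeded") from cause
    return delay


def retry_unary(
    target: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    sleeps: Iterator[float],
    timeout: Optional[float] = None,
    now: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call target until it succeeds; the deadline is consulted only after a failure."""
    deadline = None if timeout is None else now() + timeout
    while True:
        try:
            return target()
        except Exception as exc:
            if not is_retryable(exc):
                raise
            sleep(next_sleep_or_timeout(sleeps, deadline, now, exc))


class FakeClock:
    """Injected clock so the example does not actually sleep."""

    def __init__(self) -> None:
        self.t = 0.0

    def now(self) -> float:
        return self.t

    def sleep(self, seconds: float) -> None:
        self.t += seconds


def make_flaky(failures: int) -> Callable[[], str]:
    """Build a target that raises ConnectionError `failures` times, then returns "ok"."""
    state = {"left": failures}

    def target() -> str:
        if state["left"] > 0:
            state["left"] -= 1
            raise ConnectionError("transient")
        return "ok"

    return target


clock = FakeClock()
result = retry_unary(
    make_flaky(2),
    lambda exc: isinstance(exc, ConnectionError),
    exponential_sleeps(1.0, 10.0),
    timeout=5.0,
    now=clock.now,
    sleep=clock.sleep,
)
```

Because the in-flight call is never cancelled, a slow target can run past the deadline; that is the trade-off against the `asyncio.wait_for` approach mentioned in the reply.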
||
|
||
This is the lowest-level retry helper. Generally, you'll use the | ||
higher-level retry helper :class:`Retry`. | ||
|
@@ -156,7 +157,7 @@ async def retry_target( | |
|
||
|
||
class AsyncRetry: | ||
"""Exponential retry decorator for async functions. | ||
"""Exponential retry decorator for async coroutines. | ||
|
||
This class is a decorator used to add exponential back-off retry behavior | ||
to an RPC call. | ||
|
@@ -172,9 +173,17 @@ class AsyncRetry: | |
maximum (float): The maximum amount of time to delay in seconds. | ||
multiplier (float): The multiplier applied to the delay. | ||
timeout (float): How long to keep retrying in seconds. | ||
When ``is_stream``, only time spent waiting on the | ||
target or sleeping between retries is counted towards the timeout. | ||
on_error (Callable[Exception]): A function to call while processing | ||
a retryable exception. Any error raised by this function will | ||
*not* be caught. | ||
is_stream (bool): Indicates whether the input function | ||
should be treated as a stream function (i.e. an AsyncGenerator, | ||
or function or coroutine that returns an AsyncIterable). | ||
If True, the iterable will be wrapped with retry logic, and any | ||
failed outputs will restart the stream. If False, only the input | ||
function call itself will be retried. Defaults to False. | ||
deadline (float): DEPRECATED use ``timeout`` instead. If set it will | ||
override ``timeout`` parameter. | ||
""" | ||
|
@@ -187,6 +196,7 @@ def __init__( | |
multiplier=_DEFAULT_DELAY_MULTIPLIER, | ||
timeout=_DEFAULT_TIMEOUT, | ||
on_error=None, | ||
is_stream=False, | ||
**kwargs | ||
): | ||
self._predicate = predicate | ||
|
@@ -196,12 +206,13 @@ def __init__( | |
self._timeout = kwargs.get("deadline", timeout) | ||
self._deadline = self._timeout | ||
self._on_error = on_error | ||
self._is_stream = is_stream | ||
|
||
def __call__(self, func, on_error=None): | ||
"""Wrap a callable with retry behavior. | ||
|
||
Args: | ||
func (Callable): The callable to add retry behavior to. | ||
func (Callable): The callable or stream to add retry behavior to. | ||
on_error (Callable[Exception]): A function to call while processing | ||
a retryable exception. Any error raised by this function will | ||
*not* be caught. | ||
|
@@ -224,11 +235,29 @@ async def retry_wrapped_func(*args, **kwargs): | |
target, | ||
self._predicate, | ||
sleep_generator, | ||
self._timeout, | ||
timeout=self._timeout, | ||
on_error=on_error, | ||
) | ||
|
||
@functools.wraps(func) | ||
def retry_wrapped_stream(*args, **kwargs): | ||
"""A wrapper that iterates over target stream with retry.""" | ||
target = functools.partial(func, *args, **kwargs) | ||
sleep_generator = exponential_sleep_generator( | ||
self._initial, self._maximum, multiplier=self._multiplier | ||
) | ||
return AsyncRetryableGenerator( | ||
target, | ||
self._predicate, | ||
sleep_generator, | ||
timeout=self._timeout, | ||
on_error=on_error, | ||
) | ||
|
||
return retry_wrapped_func | ||
if self._is_stream: | ||
We can't use a function pointer here, like you did in the sync case (
NB: I notice that you're making changes to this PR, so I can't find the code that I quoted in the current version, though it is in the PR that I cloned locally.
I think the only reason is because the existing
But I just made a change to make
I like the new structure |
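For readers following the two-wrapper shape in this hunk, the sketch below shows the general pattern in isolation: a coroutine wrapper for unary calls and a plain function that hands back an async iterator for streams. It is a simplified illustration with no retry logic; `wrap`, `fetch_value`, and `count_to` are hypothetical names, not part of the PR.

```python
import asyncio
import functools
from typing import Any, AsyncIterator, Callable


def wrap(func: Callable[..., Any], is_stream: bool) -> Callable[..., Any]:
    """Return a coroutine wrapper for unary calls, or a plain wrapper for streams."""

    @functools.wraps(func)
    async def wrapped_func(*args: Any, **kwargs: Any) -> Any:
        # Unary case: the caller awaits the wrapper to get the result.
        return await func(*args, **kwargs)

    @functools.wraps(func)
    def wrapped_stream(*args: Any, **kwargs: Any) -> AsyncIterator[Any]:
        # Stream case: return the async iterator directly so `async for` works
        # on the wrapper's return value without an extra await.
        return func(*args, **kwargs)

    return wrapped_stream if is_stream else wrapped_func


async def fetch_value() -> int:
    return 42


async def count_to(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def main() -> tuple:
    unary = wrap(fetch_value, is_stream=False)
    stream = wrap(count_to, is_stream=True)
    value = await unary()
    items = [i async for i in stream(3)]
    return value, items


outcome = asyncio.run(main())
```

The stream wrapper is deliberately not `async def`: calling it yields something that can be iterated immediately, whereas an `async def` wrapper would have to be awaited first.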
||
return retry_wrapped_stream | ||
else: | ||
return retry_wrapped_func | ||
|
||
def _replace( | ||
self, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
# Copyright 2023 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Helpers for retries for streaming APIs.""" | ||
|
||
from typing import Callable, Optional, Iterable, Iterator, Generator, TypeVar, Any, cast | ||
|
||
import datetime | ||
import logging | ||
import time | ||
|
||
from google.api_core import datetime_helpers | ||
from google.api_core import exceptions | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
T = TypeVar("T") | ||
|
||
|
||
class RetryableGenerator(Generator[T, Any, None]): | ||
""" | ||
Helper class for retrying Iterator and Generator-based | ||
streaming APIs. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
target: Callable[[], Iterable[T]], | ||
predicate: Callable[[Exception], bool], | ||
sleep_generator: Iterable[float], | ||
timeout: Optional[float] = None, | ||
on_error: Optional[Callable[[Exception], None]] = None, | ||
): | ||
""" | ||
Args: | ||
target: The function to call to produce iterables for each retry. | ||
This must be a nullary function - apply arguments with | ||
`functools.partial`. | ||
predicate: A callable used to determine if an | ||
exception raised by the target should be considered retryable. | ||
It should return True to retry or False otherwise. | ||
sleep_generator: An infinite iterator that determines | ||
how long to sleep between retries. | ||
timeout: How long to keep retrying the target, in seconds. | ||
on_error: A function to call while processing a | ||
retryable exception. Any error raised by this function will *not* | ||
be caught. | ||
""" | ||
self.target_fn = target | ||
self.active_target: Iterator[T] = self.target_fn().__iter__() | ||
self.predicate = predicate | ||
self.sleep_generator = iter(sleep_generator) | ||
self.on_error = on_error | ||
self.timeout = timeout | ||
if self.timeout is not None: | ||
self.deadline = datetime_helpers.utcnow() + datetime.timedelta( | ||
seconds=self.timeout | ||
) | ||
else: | ||
self.deadline = None | ||
|
||
def __iter__(self) -> Generator[T, Any, None]: | ||
""" | ||
Implement the iterator protocol. | ||
""" | ||
return self | ||
|
||
def _handle_exception(self, exc) -> None: | ||
""" | ||
When an exception is raised while iterating over the active_target, | ||
check if it is retryable. If so, create a new active_target and | ||
continue iterating. If not, raise the exception. | ||
""" | ||
if not self.predicate(exc): | ||
raise exc | ||
else: | ||
# run on_error callback if provided | ||
if self.on_error: | ||
self.on_error(exc) | ||
try: | ||
next_sleep = next(self.sleep_generator) | ||
except StopIteration: | ||
raise ValueError("Sleep generator stopped yielding sleep values") | ||
# if deadline is exceeded, raise RetryError | ||
if self.deadline is not None: | ||
next_attempt = datetime_helpers.utcnow() + datetime.timedelta( | ||
seconds=next_sleep | ||
) | ||
self._check_timeout(next_attempt, exc) | ||
# sleep before retrying | ||
_LOGGER.debug( | ||
"Retrying due to {}, sleeping {:.1f}s ...".format(exc, next_sleep) | ||
) | ||
time.sleep(next_sleep) | ||
self.active_target = self.target_fn().__iter__() | ||
|
||
def _check_timeout( | ||
self, current_time: datetime.datetime, source_exception: Optional[Exception] = None | ||
) -> None: | ||
""" | ||
Helper function to check if the timeout has been exceeded, and raise a RetryError if so. | ||
|
||
Args: | ||
- current_time: the timestamp to check against the deadline | ||
- source_exception: the exception that triggered the timeout check, if any | ||
Raises: | ||
- RetryError if the deadline has been exceeded | ||
""" | ||
if ( | ||
self.deadline is not None | ||
and self.timeout is not None | ||
and self.deadline < current_time | ||
): | ||
raise exceptions.RetryError( | ||
"Timeout of {:.1f}s exceeded".format(self.timeout), | ||
source_exception, | ||
) from source_exception | ||
|
||
def __next__(self) -> T: | ||
""" | ||
Implement the iterator protocol. | ||
|
||
Returns: | ||
- the next value of the active_target iterator | ||
""" | ||
# check for expired timeouts before attempting to iterate | ||
self._check_timeout(datetime_helpers.utcnow()) | ||
try: | ||
return next(self.active_target) | ||
except Exception as exc: | ||
self._handle_exception(exc) | ||
# if retryable exception was handled, try again with new active_target | ||
return self.__next__() | ||
|
||
def close(self) -> None: | ||
""" | ||
Close the active_target if supported. (e.g. target is a generator) | ||
|
||
Raises: | ||
- AttributeError if the active_target does not have a close() method | ||
""" | ||
if getattr(self.active_target, "close", None): | ||
casted_target = cast(Generator, self.active_target) | ||
return casted_target.close() | ||
else: | ||
raise AttributeError( | ||
"close() not implemented for {}".format(self.active_target) | ||
) | ||
|
||
def send(self, *args, **kwargs) -> T: | ||
""" | ||
Call send on the active_target if supported. (e.g. target is a generator) | ||
|
||
If an exception is raised, a retry may be attempted before returning | ||
a result. | ||
|
||
Args: | ||
- *args: arguments to pass to the wrapped generator's send method | ||
- **kwargs: keyword arguments to pass to the wrapped generator's send method | ||
Returns: | ||
- the next value of the active_target iterator after calling send | ||
Raises: | ||
- AttributeError if the active_target does not have a send() method | ||
""" | ||
# check for expired timeouts before attempting to iterate | ||
self._check_timeout(datetime_helpers.utcnow()) | ||
if getattr(self.active_target, "send", None): | ||
casted_target = cast(Generator, self.active_target) | ||
try: | ||
return casted_target.send(*args, **kwargs) | ||
except Exception as exc: | ||
self._handle_exception(exc) | ||
# if exception was retryable, use new target for return value | ||
return self.__next__() | ||
else: | ||
raise AttributeError( | ||
"send() not implemented for {}".format(self.active_target) | ||
) | ||
|
||
def throw(self, *args, **kwargs) -> T: | ||
""" | ||
Call throw on the active_target if supported. (e.g. target is a generator) | ||
|
||
If an exception is raised, a retry may be attempted before returning | ||
a result. | ||
|
||
Args: | ||
- *args: arguments to pass to the wrapped generator's throw method | ||
- **kwargs: keyword arguments to pass to the wrapped generator's throw method | ||
Returns: | ||
- the next value of the active_target iterator after calling throw | ||
Raises: | ||
- AttributeError if the active_target does not have a throw() method | ||
""" | ||
if getattr(self.active_target, "throw", None): | ||
casted_target = cast(Generator, self.active_target) | ||
try: | ||
return casted_target.throw(*args, **kwargs) | ||
except Exception as exc: | ||
self._handle_exception(exc) | ||
# if retryable exception was handled, return next from new active_target | ||
return self.__next__() | ||
else: | ||
raise AttributeError( | ||
"throw() not implemented for {}".format(self.active_target) | ||
) |
What do these correspond to? It's a little weird to have None as an arg
Good catch, this was supposed to be an empty list of arguments. I'll fix it
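As a small illustration of the "empty list of arguments" spelling, a nullary target is annotated `Callable[[], Any]`, and `functools.partial` is one way to turn a function with arguments into one. The helper `call_with_retries` and the `parse_after` function are hypothetical names used only for this sketch.

```python
import functools
from typing import Any, Callable, List


def call_with_retries(target: Callable[[], Any], max_attempts: int = 3) -> Any:
    """Hypothetical helper: call a nullary target, retrying on ValueError."""
    for attempt in range(1, max_attempts + 1):
        try:
            return target()
        except ValueError:
            # Re-raise once the final attempt has failed.
            if attempt == max_attempts:
                raise


attempts_seen: List[str] = []


def parse_after(fail_times: int, text: str) -> int:
    """Fail the first `fail_times` calls, then parse `text` as an integer."""
    attempts_seen.append(text)
    if len(attempts_seen) <= fail_times:
        raise ValueError("not ready")
    return int(text)


# Bind the arguments up front so the retry helper receives a nullary callable.
nullary = functools.partial(parse_after, 2, "7")
parsed = call_with_retries(nullary)
```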