Skip to content

Commit

Permalink
Clock.py types. (#1244)
Browse files Browse the repository at this point in the history
* Start typing time.py

* Testing out Enum wrapper for ClockType

* convert to rcl_clock_type_t

* Update create_time_point

* add types to logging_service

* Add types to duration.py

* Add newlines for class definintions

* update type alias name

* Update to use Protocols

* Add types to time.py

* Add types

* Fix import order

* Started typing clock.py

* Move typealias import

Signed-off-by: Michael Carlstrom <[email protected]>
Signed-off-by: Michael Carlstrom <[email protected]>
  • Loading branch information
InvincibleRMC authored Apr 10, 2024
1 parent c8d3481 commit 7639215
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 26 deletions.
105 changes: 81 additions & 24 deletions rclpy/rclpy/clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@
# limitations under the License.

from enum import IntEnum
from typing import Optional
from types import TracebackType
from typing import Callable, Optional, Protocol, Type, TYPE_CHECKING, TypedDict

from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy

from .clock_type import ClockType
from .context import Context
from .destroyable import DestroyableType
from .duration import Duration
from .exceptions import NotInitializedException
from .time import Time
from .time import Time, TimeHandle
from .utilities import get_default_context


if TYPE_CHECKING:
from typing import TypeAlias


class ClockChange(IntEnum):
ROS_TIME_NO_CHANGE = _rclpy.ClockChange.ROS_TIME_NO_CHANGE
ROS_TIME_ACTIVATED = _rclpy.ClockChange.ROS_TIME_ACTIVATED
Expand All @@ -34,7 +40,8 @@ class ClockChange(IntEnum):

class JumpThreshold:

def __init__(self, *, min_forward: Duration, min_backward: Duration, on_clock_change=True):
def __init__(self, *, min_forward: Optional[Duration], min_backward: Optional[Duration],
on_clock_change: bool = True):
"""
Initialize an instance of JumpThreshold.
Expand Down Expand Up @@ -64,7 +71,7 @@ def __init__(self, *, min_forward: Duration, min_backward: Duration, on_clock_ch

class TimeJump:

def __init__(self, clock_change: ClockChange, delta):
def __init__(self, clock_change: ClockChange, delta: Duration):
if not isinstance(clock_change, (ClockChange, _rclpy.ClockChange)):
raise TypeError('clock_change must be an instance of rclpy.clock.ClockChange')
self._clock_change = clock_change
Expand All @@ -79,9 +86,19 @@ def delta(self) -> Duration:
return self._delta


class TimeJumpDictionary(TypedDict):
clock_change: ClockChange
delta: int


JumpHandlePreCallbackType: 'TypeAlias' = Callable[[], None]


class JumpHandle:

def __init__(self, *, clock, threshold: JumpThreshold, pre_callback, post_callback):
def __init__(self, *, clock: 'Clock', threshold: JumpThreshold,
pre_callback: Optional[JumpHandlePreCallbackType],
post_callback: Optional[Callable[[TimeJumpDictionary], None]]) -> None:
"""
Register a clock jump callback.
Expand All @@ -98,7 +115,7 @@ def __init__(self, *, clock, threshold: JumpThreshold, pre_callback, post_callba
raise ValueError('pre_callback must be callable if given')
if post_callback is not None and not callable(post_callback):
raise ValueError('post_callback must be callable if given')
self._clock = clock
self._clock: Optional[Clock] = clock
self._pre_callback = pre_callback
self._post_callback = post_callback

Expand All @@ -113,27 +130,63 @@ def __init__(self, *, clock, threshold: JumpThreshold, pre_callback, post_callba
self._clock.handle.add_clock_callback(
self, threshold.on_clock_change, min_forward, min_backward)

def unregister(self):
def unregister(self) -> None:
"""Remove a jump callback from the clock."""
if self._clock is not None:
with self._clock.handle:
self._clock.handle.remove_clock_callback(self)
self._clock = None

def __enter__(self):
def __enter__(self) -> 'JumpHandle':
return self

def __exit__(self, t, v, tb):
def __exit__(self, t: Optional[Type[BaseException]],
v: Optional[BaseException],
tb: Optional[TracebackType]) -> None:
self.unregister()


class ClockHandle(DestroyableType, Protocol):
"""Generic alias of _rclpy.Clock."""

def get_now(self) -> TimeHandle:
"""Value of the clock."""
...

def get_ros_time_override_is_enabled(self) -> bool:
"""Return if a clock using ROS time has the ROS time override enabled."""
...

def set_ros_time_override_is_enabled(self, enabled: bool) -> None:
"""Set if a clock using ROS time has the ROS time override enabled."""
...

def set_ros_time_override(self, time_point: TimeHandle) -> None:
"""Set the ROS time override for a clock using ROS time."""
...

def add_clock_callback(self, pyjump_handle: JumpHandle,
on_clock_change: bool, min_forward: int, min_backward: int) -> None:
"""Add a time jump callback to a clock."""
...

def remove_clock_callback(self, pyjump_handle: JumpHandle) -> None:
"""Remove a time jump callback from a clock."""
...


class Clock:

def __new__(cls, *, clock_type: ClockType = ClockType.SYSTEM_TIME):
if TYPE_CHECKING:
__clock: ClockHandle
_clock_type: ClockType

def __new__(cls, *,
clock_type: ClockType = ClockType.SYSTEM_TIME) -> 'Clock':
if not isinstance(clock_type, (ClockType, _rclpy.ClockType)):
raise TypeError('Clock type must be a ClockType enum')
if clock_type is ClockType.ROS_TIME:
self = super().__new__(ROSClock)
self: 'Clock' = super().__new__(ROSClock)
else:
self = super().__new__(cls)
self.__clock = _rclpy.Clock(clock_type)
Expand All @@ -145,15 +198,15 @@ def clock_type(self) -> ClockType:
return self._clock_type

@property
def handle(self):
def handle(self) -> ClockHandle:
"""
Return the internal instance of ``rclpy::Clock``.
This should not be used outside of ``rclpy``.
"""
return self.__clock

def __repr__(self):
def __repr__(self) -> str:
return 'Clock(clock_type={0})'.format(self.clock_type.name)

def now(self) -> Time:
Expand All @@ -164,7 +217,8 @@ def now(self) -> Time:

def create_jump_callback(
self, threshold: JumpThreshold, *,
pre_callback=None, post_callback=None) -> JumpHandle:
pre_callback: Optional[JumpHandlePreCallbackType] = None,
post_callback: Optional[Callable[[TimeJump], None]] = None) -> JumpHandle:
"""
Create callback handler for clock time jumps.
Expand All @@ -177,22 +231,23 @@ def create_jump_callback(
must take no arguments.
:param post_callback: Callback to be called after new time is set. The callback
must accept a single argument that is a ``TimeJump``.
:rtype: :class:`rclpy.clock.JumpHandler`
"""
if post_callback is not None and callable(post_callback):
original_callback = post_callback

def callback_shim(jump_dict):
def callback_shim(jump_dict: TimeJumpDictionary) -> None:
nonlocal original_callback
clock_change = jump_dict['clock_change']
duration = Duration(nanoseconds=jump_dict['delta'])
original_callback(TimeJump(clock_change, duration))

post_callback = callback_shim
post_callback_time_jump_dictionary = callback_shim
else:
post_callback_time_jump_dictionary = None

return JumpHandle(
clock=self, threshold=threshold, pre_callback=pre_callback,
post_callback=post_callback)
post_callback=post_callback_time_jump_dictionary)

def sleep_until(self, until: Time, context: Optional[Context] = None) -> bool:
"""
Expand Down Expand Up @@ -222,7 +277,7 @@ def sleep_until(self, until: Time, context: Optional[Context] = None) -> bool:
event = _rclpy.ClockEvent()
time_source_changed = False

def on_time_jump(time_jump: TimeJump):
def on_time_jump(time_jump: TimeJump) -> None:
"""Wake when time jumps and is past target time."""
nonlocal time_source_changed

Expand Down Expand Up @@ -256,7 +311,7 @@ def on_time_jump(time_jump: TimeJump):

return self.now() >= until

def sleep_for(self, rel_time: Duration, context=None) -> bool:
def sleep_for(self, rel_time: Duration, context: Optional[Context] = None) -> bool:
"""
Sleep for a specified duration.
Expand Down Expand Up @@ -284,21 +339,23 @@ def sleep_for(self, rel_time: Duration, context=None) -> bool:

class ROSClock(Clock):

def __new__(cls):
return super().__new__(Clock, clock_type=ClockType.ROS_TIME)
def __new__(cls) -> 'ROSClock':
self = super().__new__(Clock, clock_type=ClockType.ROS_TIME)
assert isinstance(self, ROSClock)
return self

@property
def ros_time_is_active(self) -> bool:
"""Return ``True`` if ROS time is currently active."""
with self.handle:
return self.handle.get_ros_time_override_is_enabled()

def _set_ros_time_is_active(self, enabled):
def _set_ros_time_is_active(self, enabled: bool) -> None:
# This is not public because it is only to be called by a TimeSource managing the Clock
with self.handle:
self.handle.set_ros_time_override_is_enabled(enabled)

def set_ros_time_override(self, time: Time):
def set_ros_time_override(self, time: Time) -> None:
"""Set the next time the ROS clock will report when ROS time is active."""
if not isinstance(time, Time):
raise TypeError(
Expand Down
4 changes: 2 additions & 2 deletions rclpy/rclpy/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from .clock_type import ClockType


class TimeType(Protocol):
class TimeHandle(Protocol):
"""Type alias of _rclpy.rcl_time_point_t."""

nanoseconds: int
Expand Down Expand Up @@ -56,7 +56,7 @@ def __init__(
# pybind11 would raise TypeError, but we want OverflowError
raise OverflowError(
'Total nanoseconds value is too large to store in C time point.')
self._time_handle: TimeType = _rclpy.rcl_time_point_t(total_nanoseconds, clock_type)
self._time_handle: TimeHandle = _rclpy.rcl_time_point_t(total_nanoseconds, clock_type)

@property
def nanoseconds(self) -> int:
Expand Down

0 comments on commit 7639215

Please sign in to comment.