Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local pathfinding PR 66 #280

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 100 additions & 48 deletions src/local_pathfinding/local_pathfinding/objectives.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Our custom OMPL optimization objectives."""

import itertools
import math
from enum import Enum, auto

Expand Down Expand Up @@ -63,19 +64,73 @@ class Objective(ob.StateCostIntegralObjective):
- This class inherits from the OMPL class StateCostIntegralObjective:
https://ompl.kavrakilab.org/classompl_1_1base_1_1StateCostIntegralObjective.html
- Camelcase is used for functions that override OMPL functions, as that is their convention.
- Also computes the maximum motion cost for normalization purposes.

Attributes:
space_information (StateSpacePtr): Contains all the information about
the space planning is done in.
max_motion_cost (float): The maximum motion cost between any two states in the state space.
"""

def __init__(self, space_information):
def __init__(self, space_information, num_samples: int):
super().__init__(si=space_information, enableMotionCostInterpolation=True)
self.space_information = space_information

states = self.sample_states(num_samples)
self.found_max_cost = False
# initialize to 1 so that motionCost is not normalized when finding the maximum motion cost
self.max_motion_cost = 1.0
self.max_motion_cost = self.find_maximum_motion_cost(states)
self.found_max_cost = True

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
raise NotImplementedError

def find_maximum_motion_cost(self, states: list[ob.SE2StateSpace]) -> float:
"""Finds the maximum motion cost between any two states in `states`.

Args:
states (list[ob.SE2StateSpace]): OMPL states.

Returns:
float: Maximum motion cost.
"""
return max(
self.motionCost(s1, s2).value()
for s1, s2 in itertools.combinations(iterable=states, r=2)
)

def sample_states(self, num_samples: int) -> list[ob.SE2StateSpace]:
"""Samples `num_samples` states from the state space.

Args:
num_samples (int): Number of states to sample.

Returns:
list[ob.SE2StateSpace]: OMPL states.
"""
sampler = self.space_information.getStateSpace().allocDefaultStateSampler()

sampled_states = []

for _ in range(num_samples):
state = self.space_information.getStateSpace().allocState()
sampler.sampleUniform(state)
sampled_states.append(state)

return sampled_states

def normalization(self, cost: float) -> float:
"""Normalizes cost using max_motion_cost and caps it at 1.

Args:
cost (float): motionCost value from an objective function.
Returns:
float: normalized cost between 0 to 1.
"""
normalized_cost = cost / self.max_motion_cost
return min(normalized_cost, 1.0) if self.found_max_cost else normalized_cost


class DistanceObjective(Objective):
"""Generates a distance objective function
Expand All @@ -94,13 +149,14 @@ def __init__(
method: DistanceMethod,
reference=HelperLatLon(latitude=0.0, longitude=0.0),
):
super().__init__(space_information)
self.method = method
if self.method == DistanceMethod.OMPL_PATH_LENGTH:
self.ompl_path_objective = ob.PathLengthOptimizationObjective(self.space_information)
self.ompl_path_objective = ob.PathLengthOptimizationObjective(space_information)
elif self.method == DistanceMethod.LATLON:
self.reference = reference

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the distance between two points

Expand All @@ -109,7 +165,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The distance between two points object
ob.Cost: The normalized distance between two points

Raises:
ValueError: If the distance method is not supported
Expand All @@ -118,17 +174,16 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2_xy = cs.XY(s2.getX(), s2.getY())
if self.method == DistanceMethod.EUCLIDEAN:
distance = DistanceObjective.get_euclidean_path_length_objective(s1_xy, s2_xy)
cost = ob.Cost(distance)
elif self.method == DistanceMethod.LATLON:
distance = DistanceObjective.get_latlon_path_length_objective(
s1_xy, s2_xy, self.reference
)
cost = ob.Cost(distance)
elif self.method == DistanceMethod.OMPL_PATH_LENGTH:
cost = self.ompl_path_objective.motionCost(s1_xy, s2_xy)
distance = self.ompl_path_objective.motionCost(s1, s2).value()
else:
ValueError(f"Method {self.method} not supported")
return cost
raise ValueError(f"Method {self.method} not supported")

return ob.Cost(self.normalization(distance))

@staticmethod
def get_euclidean_path_length_objective(s1: cs.XY, s2: cs.XY) -> float:
Expand Down Expand Up @@ -177,20 +232,17 @@ class MinimumTurningObjective(Objective):
"""

def __init__(
self,
space_information,
simple_setup,
heading_degrees: float,
method: MinimumTurningMethod,
self, space_information, simple_setup, heading_degrees: float, method: MinimumTurningMethod
):
super().__init__(space_information)
self.goal = cs.XY(
simple_setup.getGoal().getState().getX(), simple_setup.getGoal().getState().getY()
)
assert -180 < heading_degrees <= 180
self.heading = math.radians(heading_degrees)
self.method = method

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the turning cost between s1, s2, heading or the goal position

Expand All @@ -199,7 +251,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The minimum turning angle in degrees
ob.Cost: The normalized minimum turning angle in degrees

Raises:
ValueError: If the minimum turning method is not supported
Expand All @@ -213,8 +265,9 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
elif self.method == MinimumTurningMethod.HEADING_PATH:
angle = self.heading_path_turn_cost(s1_xy, s2_xy, self.heading)
else:
ValueError(f"Method {self.method} not supported")
return ob.Cost(angle)
raise ValueError(f"Method {self.method} not supported")

return ob.Cost(self.normalization(angle))

@staticmethod
def goal_heading_turn_cost(s1: cs.XY, goal: cs.XY, heading: float) -> float:
Expand Down Expand Up @@ -302,10 +355,11 @@ class WindObjective(Objective):
"""

def __init__(self, space_information, wind_direction_degrees: float):
super().__init__(space_information)
assert -180 < wind_direction_degrees <= 180
self.wind_direction = math.radians(wind_direction_degrees)

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the cost associated with the upwind and downwind directions of the boat in
relation to the wind.
Expand All @@ -315,11 +369,13 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The cost of going upwind or downwind
ob.Cost: The normalized cost of going upwind or downwind
"""
s1_xy = cs.XY(s1.getX(), s1.getY())
s2_xy = cs.XY(s2.getX(), s2.getY())
return ob.Cost(WindObjective.wind_direction_cost(s1_xy, s2_xy, self.wind_direction))
wind_cost = WindObjective.wind_direction_cost(s1_xy, s2_xy, self.wind_direction)

return ob.Cost(self.normalization(wind_cost))

@staticmethod
def wind_direction_cost(s1: cs.XY, s2: cs.XY, wind_direction: float) -> float:
Expand Down Expand Up @@ -425,7 +481,6 @@ def __init__(
wind_speed: float,
method: SpeedObjectiveMethod,
):
super().__init__(space_information)
assert -180 < wind_direction <= 180
self.wind_direction = math.radians(wind_direction)

Expand All @@ -434,6 +489,9 @@ def __init__(

self.wind_speed = wind_speed
self.method = method
# sailbot time needs more samples to find max_motion_cost, even 2000 is not enough
num_samples = 2000 if self.method == SpeedObjectiveMethod.SAILBOT_TIME else 200
super().__init__(space_information, num_samples=num_samples)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the cost associated with the speed of the boat.
Expand All @@ -443,18 +501,17 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The cost of going upwind or downwind
ob.Cost: The normalized cost of going upwind or downwind
"""

s1_xy = cs.XY(s1.getX(), s1.getY())
s2_xy = cs.XY(s2.getX(), s2.getY())

heading_s1_to_s2 = 180 * math.atan2(s2_xy.x - s1_xy.x, s2_xy.y - s1_xy.y) / math.pi
sailbot_speed = self.get_sailbot_speed(
self.heading_direction, self.wind_direction, self.wind_speed
heading_s1_to_s2, self.wind_direction, self.wind_speed
)

if sailbot_speed == 0:
return ob.Cost(10000)
return ob.Cost(1.0)

if self.method == SpeedObjectiveMethod.SAILBOT_TIME:
distance = DistanceObjective.get_euclidean_path_length_objective(s1_xy, s2_xy)
Expand All @@ -468,7 +525,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
cost = ob.Cost(self.get_continuous_cost(sailbot_speed))
else:
ValueError(f"Method {self.method} not supported")
return cost
return ob.Cost(self.normalization(cost.value()))

@staticmethod
def get_sailbot_speed(heading: float, wind_direction: float, wind_speed: float) -> float:
Expand Down Expand Up @@ -571,28 +628,23 @@ def get_sailing_objective(
wind_speed: float,
) -> ob.OptimizationObjective:
objective = ob.MultiOptimizationObjective(si=space_information)
objective.addObjective(
objective=DistanceObjective(space_information, DistanceMethod.LATLON),
weight=1.0,
)
objective.addObjective(
objective=MinimumTurningObjective(
space_information, simple_setup, heading_degrees, MinimumTurningMethod.GOAL_HEADING
),
weight=100.0,
objective_1 = DistanceObjective(
space_information=space_information, method=DistanceMethod.LATLON
)
objective.addObjective(
objective=WindObjective(space_information, wind_direction_degrees), weight=1.0
objective_2 = MinimumTurningObjective(
space_information, simple_setup, heading_degrees, MinimumTurningMethod.GOAL_HEADING
)
objective.addObjective(
objective=SpeedObjective(
space_information,
heading_degrees,
wind_direction_degrees,
wind_speed,
SpeedObjectiveMethod.SAILBOT_TIME,
),
weight=1.0,
objective_3 = WindObjective(space_information, wind_direction_degrees)
objective_4 = SpeedObjective(
space_information=space_information,
heading_direction=heading_degrees,
wind_direction=wind_direction_degrees,
wind_speed=wind_speed,
method=SpeedObjectiveMethod.SAILBOT_TIME,
)

objective.addObjective(objective=objective_1, weight=0.25)
objective.addObjective(objective=objective_2, weight=0.25)
objective.addObjective(objective=objective_3, weight=0.25)
objective.addObjective(objective=objective_4, weight=0.25)
return objective
Loading