Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Normalizing Each Objective Class's motionCost() #66

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
413255e
Added normalising funcs to Distance Obj
akash-venkateshwaran Dec 7, 2023
b187310
Corrected the code for SE2StateSpace()
akash-venkateshwaran Dec 8, 2023
29fbaef
Shifted the def of unc to parent class and inherited in all the objec…
akash-venkateshwaran Dec 8, 2023
a61768e
Add missing type hint
patrick-5546 Dec 8, 2023
4d1ef28
Remove _ prefix
patrick-5546 Dec 8, 2023
243d29a
Refactor normalization
patrick-5546 Dec 8, 2023
357f942
sample and find max cost in base objective
patrick-5546 Dec 8, 2023
4061791
Code style and docstrings
patrick-5546 Dec 8, 2023
e2f0de7
Add tests
patrick-5546 Dec 8, 2023
6e1de98
Fix explicit test
patrick-5546 Dec 8, 2023
1db8278
Cleanup
patrick-5546 Dec 8, 2023
f8e873c
Formatting
patrick-5546 Dec 8, 2023
7ab1f64
Added unit tests for all three objs
akash-venkateshwaran Jan 28, 2024
6f06fa5
Merge remote-tracking branch 'origin' into user/akash-venkateshwaran/…
patrick-5546 Jan 28, 2024
8b5de1d
Hardcode num_samples in each objective function
patrick-5546 Jan 28, 2024
4c2b81c
Update docstrings
patrick-5546 Jan 28, 2024
a6a68ac
Reorder tests
patrick-5546 Jan 28, 2024
4bddba0
Remove redundant check
patrick-5546 Jan 29, 2024
a2f81cd
Fix lint error
patrick-5546 Jan 29, 2024
cfc839a
Added cap func for motionCost
akash-venkateshwaran Feb 3, 2024
2383c52
Replaced capping with a single function named normalization
akash-venkateshwaran Feb 10, 2024
70513fd
Simplify normalization logic
patrick-5546 Feb 10, 2024
e1e4068
Add explanatory comment
patrick-5546 Feb 10, 2024
d0106ed
wokring on SpeedObj
akash-venkateshwaran Feb 11, 2024
4879bbc
wokring on SpeedObj
akash-venkateshwaran Feb 11, 2024
86be242
Fix test and cleanup
patrick-5546 Feb 11, 2024
727fc78
dynamically compute heading s1 to s2
jamenkaye Mar 1, 2024
a5946db
Merge branch 'main' into user/akash-venkateshwaran/33-normalising_obj…
patrick-5546 Mar 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 96 additions & 46 deletions local_pathfinding/objectives.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Our custom OMPL optimization objectives."""

import itertools
import math
from enum import Enum, auto

Expand Down Expand Up @@ -63,19 +64,71 @@ class Objective(ob.StateCostIntegralObjective):
- This class inherits from the OMPL class StateCostIntegralObjective:
https://ompl.kavrakilab.org/classompl_1_1base_1_1StateCostIntegralObjective.html
- Camelcase is used for functions that override OMPL functions, as that is their convention.
- Also computes the maximum motion cost for normalization purposes.

Attributes:
space_information (StateSpacePtr): Contains all the information about
the space planning is done in.
max_motion_cost (float): The maximum motion cost between any two states in the state space.
"""

def __init__(self, space_information):
def __init__(self, space_information, num_samples: int):
super().__init__(si=space_information, enableMotionCostInterpolation=True)
self.space_information = space_information

states = self.sample_states(num_samples)
# initialize to 1 so that motionCost is not normalized when finding the maximum motion cost
self.max_motion_cost = 1.0
self.max_motion_cost = self.find_maximum_motion_cost(states)
patrick-5546 marked this conversation as resolved.
Show resolved Hide resolved

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
raise NotImplementedError

def find_maximum_motion_cost(self, states: list[ob.SE2StateSpace]) -> float:
"""Finds the maximum motion cost between any two states in `states`.

Args:
states (list[ob.SE2StateSpace]): OMPL states.

Returns:
float: Maximum motion cost.
"""
return max(
self.motionCost(s1, s2).value()
for s1, s2 in itertools.combinations(iterable=states, r=2)
)

def sample_states(self, num_samples: int) -> list[ob.SE2StateSpace]:
"""Samples `num_samples` states from the state space.

Args:
num_samples (int): Number of states to sample.

Returns:
list[ob.SE2StateSpace]: OMPL states.
"""
sampler = self.space_information.getStateSpace().allocDefaultStateSampler()

sampled_states = []

for _ in range(num_samples):
state = self.space_information.getStateSpace().allocState()
sampler.sampleUniform(state)
sampled_states.append(state)

return sampled_states

def normalization(self, cost: float) -> float:
"""Normalizes cost using max_motion_cost and caps it at 1.

Args:
cost (float): motionCost value from an objective function.
Returns:
float: normalized cost between 0 to 1.
"""
normalized_cost = cost / self.max_motion_cost
return min(normalized_cost, 1.0)


class DistanceObjective(Objective):
"""Generates a distance objective function
Expand All @@ -94,13 +147,14 @@ def __init__(
method: DistanceMethod,
reference=HelperLatLon(latitude=0.0, longitude=0.0),
):
super().__init__(space_information)
self.method = method
if self.method == DistanceMethod.OMPL_PATH_LENGTH:
self.ompl_path_objective = ob.PathLengthOptimizationObjective(self.space_information)
self.ompl_path_objective = ob.PathLengthOptimizationObjective(space_information)
elif self.method == DistanceMethod.LATLON:
self.reference = reference

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the distance between two points

Expand All @@ -109,7 +163,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The distance between two points object
ob.Cost: The normalized distance between two points

Raises:
ValueError: If the distance method is not supported
Expand All @@ -118,17 +172,16 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2_xy = cs.XY(s2.getX(), s2.getY())
if self.method == DistanceMethod.EUCLIDEAN:
distance = DistanceObjective.get_euclidean_path_length_objective(s1_xy, s2_xy)
cost = ob.Cost(distance)
elif self.method == DistanceMethod.LATLON:
distance = DistanceObjective.get_latlon_path_length_objective(
s1_xy, s2_xy, self.reference
)
cost = ob.Cost(distance)
elif self.method == DistanceMethod.OMPL_PATH_LENGTH:
cost = self.ompl_path_objective.motionCost(s1_xy, s2_xy)
distance = self.ompl_path_objective.motionCost(s1, s2).value()
else:
ValueError(f"Method {self.method} not supported")
return cost
raise ValueError(f"Method {self.method} not supported")

return ob.Cost(self.normalization(distance))

@staticmethod
def get_euclidean_path_length_objective(s1: cs.XY, s2: cs.XY) -> float:
Expand Down Expand Up @@ -177,20 +230,17 @@ class MinimumTurningObjective(Objective):
"""

def __init__(
self,
space_information,
simple_setup,
heading_degrees: float,
method: MinimumTurningMethod,
self, space_information, simple_setup, heading_degrees: float, method: MinimumTurningMethod
):
super().__init__(space_information)
self.goal = cs.XY(
simple_setup.getGoal().getState().getX(), simple_setup.getGoal().getState().getY()
)
assert -180 < heading_degrees <= 180
self.heading = math.radians(heading_degrees)
self.method = method

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the turning cost between s1, s2, heading or the goal position

Expand All @@ -199,7 +249,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The minimum turning angle in degrees
ob.Cost: The normalized minimum turning angle in degrees

Raises:
ValueError: If the minimum turning method is not supported
Expand All @@ -213,8 +263,9 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
elif self.method == MinimumTurningMethod.HEADING_PATH:
angle = self.heading_path_turn_cost(s1_xy, s2_xy, self.heading)
else:
ValueError(f"Method {self.method} not supported")
return ob.Cost(angle)
raise ValueError(f"Method {self.method} not supported")

return ob.Cost(self.normalization(angle))

@staticmethod
def goal_heading_turn_cost(s1: cs.XY, goal: cs.XY, heading: float) -> float:
Expand Down Expand Up @@ -302,10 +353,11 @@ class WindObjective(Objective):
"""

def __init__(self, space_information, wind_direction_degrees: float):
super().__init__(space_information)
assert -180 < wind_direction_degrees <= 180
self.wind_direction = math.radians(wind_direction_degrees)

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the cost associated with the upwind and downwind directions of the boat in
relation to the wind.
Expand All @@ -315,11 +367,13 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The cost of going upwind or downwind
ob.Cost: The normalized cost of going upwind or downwind
"""
s1_xy = cs.XY(s1.getX(), s1.getY())
s2_xy = cs.XY(s2.getX(), s2.getY())
return ob.Cost(WindObjective.wind_direction_cost(s1_xy, s2_xy, self.wind_direction))
wind_cost = WindObjective.wind_direction_cost(s1_xy, s2_xy, self.wind_direction)

return ob.Cost(self.normalization(wind_cost))

@staticmethod
def wind_direction_cost(s1: cs.XY, s2: cs.XY, wind_direction: float) -> float:
Expand Down Expand Up @@ -425,7 +479,6 @@ def __init__(
wind_speed: float,
method: SpeedObjectiveMethod,
):
super().__init__(space_information)
assert -180 < wind_direction <= 180
self.wind_direction = math.radians(wind_direction)

Expand All @@ -434,6 +487,7 @@ def __init__(

self.wind_speed = wind_speed
self.method = method
super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the cost associated with the speed of the boat.
Expand All @@ -443,18 +497,19 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The cost of going upwind or downwind
ob.Cost: The normalized cost of going upwind or downwind
"""

s1_xy = cs.XY(s1.getX(), s1.getY())
s2_xy = cs.XY(s2.getX(), s2.getY())
heading_s1_to_s2 = math.atan2(s2_xy.x - s1_xy.x, s2_xy.y - s1_xy.y)

sailbot_speed = self.get_sailbot_speed(
self.heading_direction, self.wind_direction, self.wind_speed
heading_s1_to_s2, self.wind_direction, self.wind_speed
)

if sailbot_speed == 0:
return ob.Cost(10000)
return ob.Cost(1.0)

if self.method == SpeedObjectiveMethod.SAILBOT_TIME:
distance = DistanceObjective.get_euclidean_path_length_objective(s1_xy, s2_xy)
Expand All @@ -468,7 +523,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
cost = ob.Cost(self.get_continuous_cost(sailbot_speed))
else:
ValueError(f"Method {self.method} not supported")
return cost
return ob.Cost(self.normalization(cost.value()))

@staticmethod
def get_sailbot_speed(heading: float, wind_direction: float, wind_speed: float) -> float:
Expand Down Expand Up @@ -571,28 +626,23 @@ def get_sailing_objective(
wind_speed: float,
) -> ob.OptimizationObjective:
objective = ob.MultiOptimizationObjective(si=space_information)
objective.addObjective(
objective=DistanceObjective(space_information, DistanceMethod.LATLON),
weight=1.0,
)
objective.addObjective(
objective=MinimumTurningObjective(
space_information, simple_setup, heading_degrees, MinimumTurningMethod.GOAL_HEADING
),
weight=100.0,
objective_1 = DistanceObjective(
space_information=space_information, method=DistanceMethod.LATLON
)
objective.addObjective(
objective=WindObjective(space_information, wind_direction_degrees), weight=1.0
objective_2 = MinimumTurningObjective(
space_information, simple_setup, heading_degrees, MinimumTurningMethod.GOAL_HEADING
)
objective.addObjective(
objective=SpeedObjective(
space_information,
heading_degrees,
wind_direction_degrees,
wind_speed,
SpeedObjectiveMethod.SAILBOT_TIME,
),
weight=1.0,
objective_3 = WindObjective(space_information, wind_direction_degrees)
objective_4 = SpeedObjective(
space_information=space_information,
heading_direction=heading_degrees,
wind_direction=wind_direction_degrees,
wind_speed=wind_speed,
method=SpeedObjectiveMethod.SAILBOT_TIME,
)

objective.addObjective(objective=objective_1, weight=0.25)
objective.addObjective(objective=objective_2, weight=0.25)
objective.addObjective(objective=objective_3, weight=0.25)
objective.addObjective(objective=objective_4, weight=0.25)
return objective
Loading