Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Normalizing Each Objective Class's motionCost() #66

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
413255e
Added normalising funcs to Distance Obj
akash-venkateshwaran Dec 7, 2023
b187310
Corrected the code for SE2StateSpace()
akash-venkateshwaran Dec 8, 2023
29fbaef
Shifted the def of unc to parent class and inherited in all the objec…
akash-venkateshwaran Dec 8, 2023
a61768e
Add missing type hint
patrick-5546 Dec 8, 2023
4d1ef28
Remove _ prefix
patrick-5546 Dec 8, 2023
243d29a
Refactor normalization
patrick-5546 Dec 8, 2023
357f942
sample and find max cost in base objective
patrick-5546 Dec 8, 2023
4061791
Code style and docstrings
patrick-5546 Dec 8, 2023
e2f0de7
Add tests
patrick-5546 Dec 8, 2023
6e1de98
Fix explicit test
patrick-5546 Dec 8, 2023
1db8278
Cleanup
patrick-5546 Dec 8, 2023
f8e873c
Formatting
patrick-5546 Dec 8, 2023
7ab1f64
Added unit tests for all three objs
akash-venkateshwaran Jan 28, 2024
6f06fa5
Merge remote-tracking branch 'origin' into user/akash-venkateshwaran/…
patrick-5546 Jan 28, 2024
8b5de1d
Hardcode num_samples in each objective function
patrick-5546 Jan 28, 2024
4c2b81c
Update docstrings
patrick-5546 Jan 28, 2024
a6a68ac
Reorder tests
patrick-5546 Jan 28, 2024
4bddba0
Remove redundant check
patrick-5546 Jan 29, 2024
a2f81cd
Fix lint error
patrick-5546 Jan 29, 2024
cfc839a
Added cap func for motionCost
akash-venkateshwaran Feb 3, 2024
2383c52
Replaced capping with a single function named normalization
akash-venkateshwaran Feb 10, 2024
70513fd
Simplify normalization logic
patrick-5546 Feb 10, 2024
e1e4068
Add explanatory comment
patrick-5546 Feb 10, 2024
d0106ed
wokring on SpeedObj
akash-venkateshwaran Feb 11, 2024
4879bbc
wokring on SpeedObj
akash-venkateshwaran Feb 11, 2024
86be242
Fix test and cleanup
patrick-5546 Feb 11, 2024
727fc78
dynamically compute heading s1 to s2
jamenkaye Mar 1, 2024
a5946db
Merge branch 'main' into user/akash-venkateshwaran/33-normalising_obj…
patrick-5546 Mar 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 82 additions & 33 deletions local_pathfinding/objectives.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Our custom OMPL optimization objectives."""

import itertools
import math
from enum import Enum, auto

Expand Down Expand Up @@ -40,19 +41,64 @@ class Objective(ob.StateCostIntegralObjective):
- This class inherits from the OMPL class StateCostIntegralObjective:
https://ompl.kavrakilab.org/classompl_1_1base_1_1StateCostIntegralObjective.html
- Camelcase is used for functions that override OMPL functions, as that is their convention.
- Also computes the maximum motion cost for normalization purposes.

Attributes:
space_information (StateSpacePtr): Contains all the information about
the space planning is done in.
max_motion_cost (float): The maximum motion cost between any two states in the state space.
"""

def __init__(self, space_information):
def __init__(self, space_information, num_samples: int):
super().__init__(si=space_information, enableMotionCostInterpolation=True)
self.space_information = space_information

states = self.sample_states(num_samples)
self.max_motion_cost = 1.0
self.max_motion_cost = self.find_maximum_motion_cost(states)
patrick-5546 marked this conversation as resolved.
Show resolved Hide resolved

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
raise NotImplementedError

def find_maximum_motion_cost(self, states: list[ob.SE2StateSpace]) -> float:
"""Finds the maximum motion cost between any two states in `states`.

Args:
states (list[ob.SE2StateSpace]): OMPL states.

Returns:
float: Maximum motion cost.
"""
return max(
self.motionCost(s1, s2).value()
for s1, s2 in itertools.combinations(iterable=states, r=2)
)

def sample_states(self, num_samples: int) -> list[ob.SE2StateSpace]:
"""Samples `num_samples` states from the state space.

Args:
num_samples (int): Number of states to sample.

Returns:
list[ob.SE2StateSpace]: OMPL states.
"""
sampler = self.space_information.getStateSpace().allocDefaultStateSampler()

sampled_states = []

for _ in range(num_samples):
state = self.space_information.getStateSpace().allocState()
sampler.sampleUniform(state)
sampled_states.append(state)

return sampled_states

def capping_motionCost(self, value):
if value > 1:
value == 1
return value
patrick-5546 marked this conversation as resolved.
Show resolved Hide resolved


class DistanceObjective(Objective):
"""Generates a distance objective function
Expand All @@ -71,13 +117,14 @@ def __init__(
method: DistanceMethod,
reference=HelperLatLon(latitude=0.0, longitude=0.0),
):
super().__init__(space_information)
self.method = method
if self.method == DistanceMethod.OMPL_PATH_LENGTH:
self.ompl_path_objective = ob.PathLengthOptimizationObjective(self.space_information)
self.ompl_path_objective = ob.PathLengthOptimizationObjective(space_information)
elif self.method == DistanceMethod.LATLON:
self.reference = reference

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the distance between two points

Expand All @@ -86,7 +133,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The distance between two points object
ob.Cost: The normalized distance between two points

Raises:
ValueError: If the distance method is not supported
Expand All @@ -95,17 +142,18 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2_xy = cs.XY(s2.getX(), s2.getY())
if self.method == DistanceMethod.EUCLIDEAN:
distance = DistanceObjective.get_euclidean_path_length_objective(s1_xy, s2_xy)
cost = ob.Cost(distance)
elif self.method == DistanceMethod.LATLON:
distance = DistanceObjective.get_latlon_path_length_objective(
s1_xy, s2_xy, self.reference
)
cost = ob.Cost(distance)
elif self.method == DistanceMethod.OMPL_PATH_LENGTH:
cost = self.ompl_path_objective.motionCost(s1_xy, s2_xy)
distance = self.ompl_path_objective.motionCost(s1, s2).value()
else:
ValueError(f"Method {self.method} not supported")
return cost
raise ValueError(f"Method {self.method} not supported")

normalized_distance = distance / self.max_motion_cost
normalized_distance = self.capping_motionCost(normalized_distance)
return ob.Cost(normalized_distance)

@staticmethod
def get_euclidean_path_length_objective(s1: cs.XY, s2: cs.XY) -> float:
Expand Down Expand Up @@ -154,20 +202,17 @@ class MinimumTurningObjective(Objective):
"""

def __init__(
self,
space_information,
simple_setup,
heading_degrees: float,
method: MinimumTurningMethod,
self, space_information, simple_setup, heading_degrees: float, method: MinimumTurningMethod
):
super().__init__(space_information)
self.goal = cs.XY(
simple_setup.getGoal().getState().getX(), simple_setup.getGoal().getState().getY()
)
assert -180 < heading_degrees <= 180
self.heading = math.radians(heading_degrees)
self.method = method

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the turning cost between s1, s2, heading or the goal position

Expand All @@ -176,7 +221,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The minimum turning angle in degrees
ob.Cost: The normalized minimum turning angle in degrees

Raises:
ValueError: If the minimum turning method is not supported
Expand All @@ -190,8 +235,11 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
elif self.method == MinimumTurningMethod.HEADING_PATH:
angle = self.heading_path_turn_cost(s1_xy, s2_xy, self.heading)
else:
ValueError(f"Method {self.method} not supported")
return ob.Cost(angle)
raise ValueError(f"Method {self.method} not supported")

normalized_angle = angle / self.max_motion_cost
normalized_angle = self.capping_motionCost(normalized_angle)
return ob.Cost(normalized_angle)

@staticmethod
def goal_heading_turn_cost(s1: cs.XY, goal: cs.XY, heading: float) -> float:
Expand Down Expand Up @@ -279,10 +327,11 @@ class WindObjective(Objective):
"""

def __init__(self, space_information, wind_direction_degrees: float):
super().__init__(space_information)
assert -180 < wind_direction_degrees <= 180
self.wind_direction = math.radians(wind_direction_degrees)

super().__init__(space_information, num_samples=100)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
"""Generates the cost associated with the upwind and downwind directions of the boat in
relation to the wind.
Expand All @@ -292,11 +341,15 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
s2 (SE2StateInternal): The ending point of the local goal state

Returns:
ob.Cost: The cost of going upwind or downwind
ob.Cost: The normalized cost of going upwind or downwind
"""
s1_xy = cs.XY(s1.getX(), s1.getY())
s2_xy = cs.XY(s2.getX(), s2.getY())
return ob.Cost(WindObjective.wind_direction_cost(s1_xy, s2_xy, self.wind_direction))
wind_cost = WindObjective.wind_direction_cost(s1_xy, s2_xy, self.wind_direction)

normalized_wind_cost = wind_cost / self.max_motion_cost
normalized_wind_cost = self.capping_motionCost(normalized_wind_cost)
return ob.Cost(normalized_wind_cost)

@staticmethod
def wind_direction_cost(s1: cs.XY, s2: cs.XY, wind_direction: float) -> float:
Expand Down Expand Up @@ -390,18 +443,14 @@ def get_sailing_objective(
space_information, simple_setup, heading_degrees: float, wind_direction_degrees: float
) -> ob.OptimizationObjective:
objective = ob.MultiOptimizationObjective(si=space_information)
objective.addObjective(
objective=DistanceObjective(space_information, DistanceMethod.LATLON),
weight=1.0,
)
objective.addObjective(
objective=MinimumTurningObjective(
space_information, simple_setup, heading_degrees, MinimumTurningMethod.GOAL_HEADING
),
weight=100.0,
objective_1 = DistanceObjective(
space_information=space_information, method=DistanceMethod.LATLON
)
objective.addObjective(
objective=WindObjective(space_information, wind_direction_degrees), weight=1.0
objective_2 = MinimumTurningObjective(
space_information, simple_setup, heading_degrees, MinimumTurningMethod.GOAL_HEADING
)

objective_3 = WindObjective(space_information, wind_direction_degrees)
objective.addObjective(objective=objective_1, weight=0.33)
objective.addObjective(objective=objective_2, weight=0.33)
objective.addObjective(objective=objective_3, weight=0.34)
return objective
98 changes: 82 additions & 16 deletions test/test_objectives.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import math

import pytest
Expand All @@ -20,20 +21,37 @@
)


""" Tests for distance objective """


@pytest.mark.parametrize(
"method",
"method,max_motion_cost",
[
objectives.DistanceMethod.EUCLIDEAN,
objectives.DistanceMethod.LATLON,
objectives.DistanceMethod.OMPL_PATH_LENGTH,
(objectives.DistanceMethod.EUCLIDEAN, 2.5),
(objectives.DistanceMethod.LATLON, 2700),
(objectives.DistanceMethod.OMPL_PATH_LENGTH, 4.0),
],
)
def test_distance_objective(method: objectives.DistanceMethod):
def test_distance_objective(method: objectives.DistanceMethod, max_motion_cost: float):
distance_objective = objectives.DistanceObjective(
PATH._simple_setup.getSpaceInformation(),
method,
)
assert distance_objective is not None

# test sample_states()
num_samples = 3
sampled_states = distance_objective.sample_states(num_samples)
assert len(sampled_states) == num_samples
# for state in sampled_states:
# assert ompl_path.is_state_valid(state)
patrick-5546 marked this conversation as resolved.
Show resolved Hide resolved

# test find_maximum_motion_cost()
assert distance_objective.max_motion_cost == pytest.approx(max_motion_cost, rel=1e0)

# test if the motionCost() is normalized between 0 and 1 for 10 random samples
states = distance_objective.sample_states(10)
for s1, s2 in itertools.combinations(iterable=states, r=2):
assert 0 <= distance_objective.motionCost(s1, s2).value() < 1


@pytest.mark.parametrize(
Expand Down Expand Up @@ -79,22 +97,41 @@ def test_get_latlon_path_length_objective(rf: tuple, cs1: tuple, cs2: tuple):
) == pytest.approx(distance_m)


""" Tests for minimum turning objective """


@pytest.mark.parametrize(
"method",
"method,heading_degrees,max_motion_cost",
[
objectives.MinimumTurningMethod.GOAL_HEADING,
objectives.MinimumTurningMethod.GOAL_PATH,
objectives.MinimumTurningMethod.HEADING_PATH,
(objectives.MinimumTurningMethod.GOAL_HEADING, 60.0, 174.862),
(objectives.MinimumTurningMethod.GOAL_PATH, 60.0, 179.340),
(objectives.MinimumTurningMethod.HEADING_PATH, 60.0, 179.962),
],
)
def test_minimum_turning_objective(method: objectives.MinimumTurningMethod):
def test_minimum_turning_objective(
method: objectives.MinimumTurningMethod, heading_degrees: float, max_motion_cost: float
):
minimum_turning_objective = objectives.MinimumTurningObjective(
PATH._simple_setup.getSpaceInformation(),
PATH._simple_setup,
PATH.state.heading_direction,
heading_degrees,
method,
)
assert minimum_turning_objective is not None

# test sample_states()
num_samples = 3
sampled_states = minimum_turning_objective.sample_states(num_samples)
assert len(sampled_states) == num_samples
# for state in sampled_states:
# assert ompl_path.is_state_valid(state)

# test find_maximum_motion_cost()
assert minimum_turning_objective.max_motion_cost == pytest.approx(max_motion_cost, rel=1e0)

# test if the motionCost() is normalized between 0 and 1 for 10 random samples
states = minimum_turning_objective.sample_states(10)
for s1, s2 in itertools.combinations(iterable=states, r=2):
assert 0 <= minimum_turning_objective.motionCost(s1, s2).value() < 1


@pytest.mark.parametrize(
Expand Down Expand Up @@ -147,6 +184,38 @@ def test_heading_path_turn_cost(cs1: tuple, cs2: tuple, heading_degrees: float,
) == pytest.approx(expected, abs=1e-3)


""" Tests for wind objective """


@pytest.mark.parametrize(
"wind_direction_deg,max_motion_cost",
[
(60.0, 7593.768),
(45.0, 7763.842),
(0.0, 7579.767),
],
)
def test_wind_objective(wind_direction_deg: float, max_motion_cost: float):
wind_objective = objectives.WindObjective(
PATH._simple_setup.getSpaceInformation(), wind_direction_deg
)

# test sample_states()
num_samples = 3
sampled_states = wind_objective.sample_states(num_samples)
assert len(sampled_states) == num_samples
# for state in sampled_states:
# assert ompl_path.is_state_valid(state)

# test find_maximum_motion_cost()
assert wind_objective.max_motion_cost == pytest.approx(max_motion_cost, rel=1e0)

# test if the motionCost() is normalized between 0 and 1 for 10 random samples
states = wind_objective.sample_states(10)
for s1, s2 in itertools.combinations(iterable=states, r=2):
assert 0 <= wind_objective.motionCost(s1, s2).value() < 1


@pytest.mark.parametrize(
"cs1,cs2,wind_direction_deg,expected",
[
Expand Down Expand Up @@ -193,9 +262,6 @@ def test_is_downwind(wind_direction_deg: float, heading_deg: float, expected: fl
assert objectives.WindObjective.is_downwind(wind_direction, heading) == expected


""" Tests for is_angle_between() """


@pytest.mark.parametrize(
"afir,amid,asec,expected",
[
Expand Down