Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Custom planner #39

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion local_pathfinding/ompl_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from ompl import util as ou
from rclpy.impl.rcutils_logger import RcutilsLogger

from .planners import A_star

if TYPE_CHECKING:
from local_pathfinding.local_path import LocalPathState

Expand Down Expand Up @@ -143,7 +145,7 @@ def _init_simple_setup(self) -> og.SimpleSetup:

# set the planner of the simple setup object
# TODO: implement and add planner here
# simple_setup.setPlanner(planner)
simple_setup.setPlanner(A_star.Astar(simple_setup.getSpaceInformation()))

return simple_setup

Expand Down
191 changes: 191 additions & 0 deletions local_pathfinding/planners/A_star.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#!/usr/bin/env python

import heapq
import math

from ompl import base as ob
from ompl import geometric as og


class Node:
"""A node class for A* pathfinding"""

def __init__(self, parent=None, position=None):
"""Initialize the Node class.

Args:
parent (Node, optional): Parent Node. Defaults to None.
position (ob.State, optional): Position of the Node (x, y). Defaults to None.
"""
self.parent = parent
self.position = position
self.g = 0
self.h = 0
self.f = 0

def __eq__(self, other) -> bool:
"""Check if two nodes are equal.

Args:
other (Node): Node to compare to.

Returns:
bool: True if equal, False otherwise.
"""
return (
self.position.getX() == other.position.getX()
and self.position.getY() == other.position.getY()
)
# return self.position[0] == other.position[0] and self.position[1] == other.position[1]

def __str__(self) -> str:
"""Return a string representation of the Node.

Returns:
str: String representation of the Node.
"""
return (
str(self.position.getX())
+ ", "
+ str(self.position.getY())
+ ", "
+ str(self.position.getYaw() * 180 / math.pi)
)

def __lt__(self, other) -> bool:
"""Check if the current node is less than another node.

Args:
other (Node): Node to compare to.

Returns:
bool: True if less than, False otherwise.
"""
return self.f < other.f

def __gt__(self, other) -> bool:
"""Check if the current node is greater than another node.

Args:
other (Node): Node to compare to.

Returns:
bool: True if greater than, False otherwise.
"""
return self.f > other.f


class Astar(ob.Planner):
def __init__(self, si: ob.SpaceInformation):
"""Initialize the Astar class.

Args:
si (ob.SpaceInformation): Space information.
"""
super(Astar, self).__init__(si, "Astar")
self.states_ = []
self.sampler_ = si.allocStateSampler()

def solve(self, ptc: ob.PlannerTerminationCondition) -> ob.PlannerStatus:
"""Solve the Astar problem.

Args:
ptc (ob.PlannerTerminationCondition): Planner termination condition.

Returns:
ob.PlannerStatus: Planner status.
"""
pdef = self.getProblemDefinition() # type: ob.ProblemDefinition
goal = pdef.getGoal() # type: ob.GoalState
si = self.getSpaceInformation() # type: ob.SpaceInformation
pi = self.getPlannerInputStates() # type: ob.PlannerInputStates
st = pi.nextStart() # type: ob.State
while st:
self.states_.append(st)
st = pi.nextStart()
solution = None
approxsol = 0
# approxdif = 1e6
start_state = pdef.getStartState(0)
goal_state = goal.getState()
start_node = Node(None, start_state)
start_node.g = start_state.h = start_node.f = 0
end_node = Node(None, goal_state)
end_node.g = end_node.h = end_node.f = 0

open_list = []
closed_list = []
heapq.heapify(open_list)
adjacent_squares = (
(1, 0, 0),
(1, 1, 45),
(0, 1, 90),
(-1, 1, 135),
(-1, 0, 0),
(-1, -1, -135),
(0, -1, -90),
(1, -1, -45),
)

heapq.heappush(open_list, start_node)
while len(open_list) > 0 and not ptc():
current_node = heapq.heappop(open_list)
if current_node == end_node: # if we hit the goal
current = current_node
path = []
while current is not None:
path.append(current.position)
current = current.parent
for i in range(1, len(path)):
self.states_.append(path[len(path) - i - 1])
solution = len(self.states_)
break
closed_list.append(current_node)

children = []
for new_position in adjacent_squares:
node_position = si.allocState()
current_node_x = current_node.position.getX()
current_node_y = current_node.position.getY()
node_position.setXY(
current_node_x + new_position[0], current_node_y + new_position[1]
)
node_position.setYaw(new_position[2] * math.pi / 180)

if not si.checkMotion(current_node.position, node_position):
continue
if not si.satisfiesBounds(node_position):
continue
new_node = Node(current_node, node_position)
children.append(new_node)

for child in children:
if child in closed_list:
continue
if child.position.getYaw() % (math.pi / 2) == 0:
child.g = current_node.g + 1
else:
child.g = current_node.g + math.sqrt(2)
child.h = goal.distanceGoal(child.position)
child.f = child.g + child.h
if len([i for i in open_list if child == i and child.g >= i.g]) > 0:
continue
heapq.heappush(open_list, child)

solved = False
approximate = False
if not solution:
solution = approxsol
approximate = False
if solution:
path = og.PathGeometric(si)
for s in self.states_[:solution]:
path.append(s)
pdef.addSolutionPath(path)
solved = True
return ob.PlannerStatus(solved, approximate)

def clear(self):
"""Clear the Astar problem."""
super(Astar, self).clear()
self.states_ = []
Empty file.
100 changes: 100 additions & 0 deletions local_pathfinding/planners/potential_field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python

from ompl import base as ob
from ompl import geometric as og


class PotentialFieldPlanner(ob.Planner):
"""Potential Field Planner class."""

def __init__(self, si):
"""Initialize the Potential Field Planner class.

Args:
si (ob.SpaceInformation): OMPL Space Information object.
"""
super(PotentialFieldPlanner, self).__init(si, "PotentialFieldPlanner")
self.states_ = []

def solve(self, ptc) -> ob.PlannerStatus:
"""Solve the potential field problem.

Args:
ptc (ob.PlannerTerminationCondition): Planner termination condition.

Returns:
ob.PlannerStatus: Planner status.
"""
# Get the problem definition and space information
pdef = self.getProblemDefinition()
si = self.getSpaceInformation()
goal = pdef.getGoal()

# Get the initial and goal states
start = pdef.getStartState(0)
goal_state = goal.getState()

current_state = start

# Iterate and navigate the sailboat using the potential field
while not ptc():
# Calculate the gradient of the potential field at the current state
gradient = calculate_gradient(current_state, goal_state)

# Update the sailboat's position and heading based on the gradient
current_state = navigate_sailboat(current_state, gradient)

# Add the current state to the list of states
self.states_.append(current_state)

# Check if the sailboat has reached the goal
if si.distance(current_state, goal_state) < si.getGoal().getThreshold():
break

# Create a path from the list of states
path = og.PathGeometric(si)
for state in self.states_:
path.append(state)

# Add the path to the problem definition
pdef.addSolutionPath(path)

# Return the planner status
return ob.PlannerStatus(pdef.hasSolution(), False)

def clear(self):
"""Clear the potential field planner."""
super(PotentialFieldPlanner, self).clear()
self.states_ = []


def calculate_gradient(current_state, goal_state) -> tuple:
"""Calculate the gradient of the potential field at the current state.

Args:
current_state (ob.State): Current state.
goal_state (ob.State): Goal state.

Returns:
tuple: Gradient vector as a tuple of floats.
"""
# Calculate the gradient of the potential field at the current state
# This function should return the gradient vector as a tuple of floats

# Replace this with your implementation
return (0.0, 0.0)


def navigate_sailboat(current_state, gradient) -> ob.State:
"""Update the sailboat's position and heading based on the gradient.

Args:
current_state (ob.State): Current state.
gradient (tuple): Gradient vector as a tuple of floats.
"""
# Update the sailboat's position and heading based on the gradient
# This function should adjust the boat's sail configuration and heading
# to align with the gradient direction

# Replace this with your implementation
return current_state