gamepad control implementation
isaac-racine committed Oct 23, 2024
1 parent 4fc117f commit 19285b3
Showing 3 changed files with 185 additions and 58 deletions.
@@ -55,12 +55,12 @@
},
)

gym.register(
id="Isaac-Velocity-Rough-Unitree-Go2-Play-Control",
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": rough_env_cfg.UnitreeGo2RoughEnvCfg_PLAYCONTROL,
"rsl_rl_cfg_entry_point": agents.rsl_rl_cfg.UnitreeGo2RoughPPORunnerCfg,
},
)
# gym.register(
# id="Isaac-Velocity-Rough-Unitree-Go2-Play-Control",
# entry_point="omni.isaac.orbit.envs:RLTaskEnv",
# disable_env_checker=True,
# kwargs={
# "env_cfg_entry_point": rough_env_cfg.UnitreeGo2RoughEnvCfg_PLAYCONTROL,
# "rsl_rl_cfg_entry_point": agents.rsl_rl_cfg.UnitreeGo2RoughPPORunnerCfg,
# },
# )
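
For context, the registration pattern above binds a gym task id to an environment config and an RSL-RL runner config; with the PLAYCONTROL registration commented out, the play script below targets the stock Isaac-Velocity-Rough-Unitree-Go2-Play-v1 id instead. A minimal sketch of how such an id is resolved at play time, using only calls that appear in play_control.py (the device string and environment count here are assumptions):

# Sketch only: resolving a registered task id the way play_control.py does.
# Requires the Isaac Sim app to be launched first (see AppLauncher in the script below).
import gymnasium as gym

from omni.isaac.lab_tasks.utils import parse_env_cfg

task_id = "Isaac-Velocity-Rough-Unitree-Go2-Play-v1"  # default task in play_control.py
env_cfg = parse_env_cfg(task_id, device="cuda:0", num_envs=1, use_fabric=True)  # assumed values
env = gym.make(task_id, cfg=env_cfg)  # entry point and kwargs come from gym.register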
@@ -83,15 +83,3 @@ def __post_init__(self):
self.events.base_external_force_torque = None
self.events.push_robot = None


@configclass
class UnitreeGo2RoughEnvCfg_PLAYCONTROL(UnitreeGo2RoughEnvCfg_PLAY):
def __post_init__(self):
# post init of parent
super().__post_init__()

self.commands.base_velocity = mdp.UserVelocityCommandCfg(
asset_name="robot",
debug_vis=True,
resampling_time_range=(10.0, 10.0), # not used
)
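
Because the PLAYCONTROL registration in the first file is commented out, play_control.py (below) drives the standard Play environment by writing the teleop command straight into the command manager's base_velocity term. A minimal sketch of that override, assuming the term exposes the vel_command_b buffer used in the script:

import torch


def apply_teleop_command(env, com) -> None:
    """Write an SE(2) teleop command (v_x, v_y, omega_z) into the velocity command term.

    Mirrors what play_control.py does each simulation step; `env` is the Isaac Lab
    environment and `com` is the tuple returned by the teleop device's advance().
    """
    # the script flips the lateral and yaw axes so stick directions match the command frame
    command = [com[0], -com[1], -com[2]]
    term = env.unwrapped.command_manager._terms["base_velocity"]
    term.vel_command_b[0, :] = torch.tensor(command, device=env.unwrapped.device)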
source/standalone/workflows/rsl_rl/play_control.py (213 changes: 176 additions & 37 deletions)
@@ -12,6 +12,7 @@

"""Launch Isaac Sim Simulator first."""

import carb
import argparse

from omni.isaac.lab.app import AppLauncher
@@ -21,23 +21,25 @@

# add argparse arguments
parser = argparse.ArgumentParser(description="Train an RL agent with RSL-RL")
parser.add_argument("--cpu", action="store_true", default=False, help="Use CPU pipeline")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations"
)

parser.add_argument("--video", action="store_true", default=False, help="Record videos during training.")
parser.add_argument("--video_length", type=int, default=200, help="Length of the recorded video (in steps).")
parser.add_argument("--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations.")
parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment")
parser.add_argument("--lin_sensi", type=float, default=2, help="Gamepad linear speed sensitivity")
parser.add_argument("--rot_sensi", type=float, default=3.14 / 2, help="Gamepad rotational speed sensitivity")
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.")
parser.add_argument("--task", type=str, default="Isaac-Velocity-Rough-Unitree-Go2-Play-v1", help="Name of the task.")
parser.add_argument(
"--device", choices=["gamepad", "keyboard"], default="gamepad", help="Choose from options: gamepad, keyboard"
)
parser.add_argument("--teleop_device", choices=["gamepad", "keyboard"], default="gamepad", help="Choose from options: gamepad, keyboard")

# append RSL-RL cli arguments
cli_args.add_rsl_rl_args(parser)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
# always enable cameras to record video
if args_cli.video:
args_cli.enable_cameras = True

# launch omniverse app
app_launcher = AppLauncher(args_cli)
@@ -49,71 +49,102 @@
import os
import torch

from omni.isaac.orbit.devices import Se2Gamepad, Se2Keyboard
from omni.isaac.orbit.envs import ViewerCfg
from omni.isaac.orbit.envs.ui import ViewportCameraController
from rsl_rl.runners import OnPolicyRunner

from omni.isaac.lab.envs import DirectMARLEnv, multi_agent_to_single_agent
from omni.isaac.lab.utils.dict import print_dict

# import omni.isaac.orbit_tasks
from omni.isaac.orbit_tasks.utils import get_checkpoint_path, parse_env_cfg
from omni.isaac.orbit_tasks.utils.wrappers.rsl_rl import (
from omni.isaac.lab.devices import Se2Gamepad, Se2Keyboard
from omni.isaac.lab.envs import ViewerCfg
from omni.isaac.lab.envs.ui import ViewportCameraController

import omni.isaac.lab_tasks # noqa: F401
from omni.isaac.lab_tasks.utils import get_checkpoint_path, parse_env_cfg
from omni.isaac.lab_tasks.utils.wrappers.rsl_rl import (
RslRlOnPolicyRunnerCfg,
RslRlVecEnvWrapper,
export_policy_as_jit,
export_policy_as_onnx,
)
from rsl_rl.runners import OnPolicyRunner

# TASK = "Isaac-Velocity-Rough-Unitree-Go2-Play-v1"
# NBENVS = 1


def main():
"""Play with RSL-RL agent."""
# parse configuration
env_cfg = parse_env_cfg(
args_cli.task, use_gpu=not args_cli.cpu, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
)
agent_cfg: RslRlOnPolicyRunnerCfg = cli_args.parse_rsl_rl_cfg(args_cli.task, args_cli)

# create isaac environment
env = gym.make(args_cli.task, cfg=env_cfg)
# wrap around environment for rsl-rl
env = RslRlVecEnvWrapper(env)

# specify directory for logging experiments
log_root_path = os.path.join("logs", "rsl_rl", agent_cfg.experiment_name)
log_root_path = os.path.abspath(log_root_path)
print(f"[INFO] Loading experiment from directory: {log_root_path}")
resume_path = get_checkpoint_path(log_root_path, agent_cfg.load_run, agent_cfg.load_checkpoint)
print(f"[INFO]: Loading model checkpoint from: {resume_path}")
log_dir = os.path.dirname(resume_path)

# create isaac environment
env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)
# wrap for video recording
if args_cli.video:
video_kwargs = {
"video_folder": os.path.join(log_dir, "videos", "play"),
"step_trigger": lambda step: step == 0,
"video_length": args_cli.video_length,
"disable_logger": True,
}
print("[INFO] Recording videos during training.")
print_dict(video_kwargs, nesting=4)
env = gym.wrappers.RecordVideo(env, **video_kwargs)

# convert to single-agent instance if required by the RL algorithm
if isinstance(env.unwrapped, DirectMARLEnv):
env = multi_agent_to_single_agent(env)

# wrap around environment for rsl-rl
env = RslRlVecEnvWrapper(env)

print(f"[INFO]: Loading model checkpoint from: {resume_path}")
# load previously trained model
ppo_runner = OnPolicyRunner(env, agent_cfg.to_dict(), log_dir=None, device=agent_cfg.device)
ppo_runner.load(resume_path)
print(f"[INFO]: Loading model checkpoint from: {resume_path}")

# obtain the trained policy for inference
policy = ppo_runner.get_inference_policy(device=env.unwrapped.device)

# export policy to onnx
# export policy to onnx/jit
export_model_dir = os.path.join(os.path.dirname(resume_path), "exported")
export_policy_as_onnx(ppo_runner.alg.actor_critic, export_model_dir, filename="policy.onnx")

# setup device control (gamepad, keyboard)
export_policy_as_jit(
ppo_runner.alg.actor_critic, ppo_runner.obs_normalizer, path=export_model_dir, filename="policy.pt"
)
export_policy_as_onnx(
ppo_runner.alg.actor_critic, normalizer=ppo_runner.obs_normalizer, path=export_model_dir, filename="policy.onnx"
)

if args_cli.device == "gamepad":
teleop_interface = Se2Gamepad(
v_x_sensitivity=args_cli.lin_sensi,
v_y_sensitivity=args_cli.lin_sensi / 2,
omega_z_sensitivity=args_cli.rot_sensi,
dead_zone=0.05,
)
elif args_cli.device == "keyboard":

teleop_interface = Se2Gamepad(
v_x_sensitivity=args_cli.lin_sensi,
v_y_sensitivity=args_cli.lin_sensi / 2,
omega_z_sensitivity=args_cli.rot_sensi,
dead_zone=0.05,
)
# add teleoperation key for env reset
teleop_interface.add_callback(carb.input.GamepadInput.X, env.reset)

if args_cli.device == "keyboard":
teleop_interface = Se2Keyboard(
# v_x_sensitivity = args_cli.lin_sensi,
# v_y_sensitivity = args_cli.lin_sensi/2,
# omega_z_sensitivity = args_cli.rot_sensi,
)
# add teleoperation key for env reset
teleop_interface.add_callback("L", env.reset)


# print helper for keyboard

# reset environment
env.reset()
teleop_interface.reset()

# setup camera
@@ -128,19 +128,31 @@ def main():
env.unwrapped.command_manager._terms["base_velocity"].vel_command_b[0, :] = torch.tensor(
com, device=env.unwrapped.device
)
# reset environment
obs, _ = env.get_observations()
timestep = 0
# simulate environment
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# agent stepping
actions = policy(obs)
# env stepping
com = teleop_interface.advance()
com[1] *= -1
com[2] *= -1
# print(com)
env.unwrapped.command_manager._terms["base_velocity"].vel_command_b[0, :] = torch.tensor(
com, device=env.unwrapped.device
)
print(env.unwrapped.command_manager._terms["base_velocity"].vel_command_b[0, :])

obs, _, _, _ = env.step(actions)
if args_cli.video:
timestep += 1
# Exit the play loop after recording one video
if timestep == args_cli.video_length:
break

# close the simulator
env.close()
@@ -151,3 +151,96 @@ def main():
main()
# close sim app
simulation_app.close()



# def main():
# """Play with RSL-RL agent."""
# # parse configuration
# env_cfg = parse_env_cfg(
# args_cli.task, use_gpu=not args_cli.cpu, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
# )
# agent_cfg: RslRlOnPolicyRunnerCfg = cli_args.parse_rsl_rl_cfg(args_cli.task, args_cli)

# # create isaac environment
# env = gym.make(args_cli.task, cfg=env_cfg)
# # wrap around environment for rsl-rl
# env = RslRlVecEnvWrapper(env)

# # specify directory for logging experiments
# log_root_path = os.path.join("logs", "rsl_rl", agent_cfg.experiment_name)
# log_root_path = os.path.abspath(log_root_path)
# print(f"[INFO] Loading experiment from directory: {log_root_path}")
# resume_path = get_checkpoint_path(log_root_path, agent_cfg.load_run, agent_cfg.load_checkpoint)
# print(f"[INFO]: Loading model checkpoint from: {resume_path}")

# # load previously trained model
# ppo_runner = OnPolicyRunner(env, agent_cfg.to_dict(), log_dir=None, device=agent_cfg.device)
# ppo_runner.load(resume_path)
# print(f"[INFO]: Loading model checkpoint from: {resume_path}")

# # obtain the trained policy for inference
# policy = ppo_runner.get_inference_policy(device=env.unwrapped.device)

# # export policy to onnx
# export_model_dir = os.path.join(os.path.dirname(resume_path), "exported")
# export_policy_as_onnx(ppo_runner.alg.actor_critic, export_model_dir, filename="policy.onnx")

# # setup device control (gamepad, keyboard)

# if args_cli.device == "gamepad":
# teleop_interface = Se2Gamepad(
# v_x_sensitivity=args_cli.lin_sensi,
# v_y_sensitivity=args_cli.lin_sensi / 2,
# omega_z_sensitivity=args_cli.rot_sensi,
# dead_zone=0.05,
# )
# elif args_cli.device == "keyboard":
# teleop_interface = Se2Keyboard(
# # v_x_sensitivity = args_cli.lin_sensi,
# # v_y_sensitivity = args_cli.lin_sensi/2,
# # omega_z_sensitivity = args_cli.rot_sensi,
# )

# # print helper for keyboard
# print(teleop_interface)

# # reset environment
# env.reset()
# teleop_interface.reset()

# # setup camera
# cam_controller = ViewportCameraController(env.unwrapped, ViewerCfg())
# cam_controller.update_view_to_asset_root("robot")
# cam_controller.update_view_location([0, -4, 3], [0, 2, 0])

# # run environment
# com = teleop_interface.advance()
# com[1] *= -1
# com[2] *= -1
# env.unwrapped.command_manager._terms["base_velocity"].vel_command_b[0, :] = torch.tensor(
# com, device=env.unwrapped.device
# )
# obs, _ = env.get_observations()
# while simulation_app.is_running():
# with torch.inference_mode():
# # agent stepping
# actions = policy(obs)
# # env stepping
# com = teleop_interface.advance()
# com[1] *= -1
# com[2] *= -1
# env.unwrapped.command_manager._terms["base_velocity"].vel_command_b[0, :] = torch.tensor(
# com, device=env.unwrapped.device
# )
# obs, _, _, _ = env.step(actions)

# # close the simulator
# env.close()


# if __name__ == "__main__":
# # run the main function
# main()
# # close sim app
# simulation_app.close()
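
For readability, the teleop-device setup spread across the hunks above can be read as the sketch below. Note that the new argument is --teleop_device, while the diff still checks args_cli.device (which, after the rename, holds the simulation device passed to parse_env_cfg); the sketch assumes the new flag is what is intended.

import carb

from omni.isaac.lab.devices import Se2Gamepad, Se2Keyboard


def make_teleop_interface(args_cli, env):
    """Sketch of the device setup in play_control.py: a gamepad by default,
    replaced by a keyboard when --teleop_device keyboard is passed (assumption)."""
    teleop_interface = Se2Gamepad(
        v_x_sensitivity=args_cli.lin_sensi,
        v_y_sensitivity=args_cli.lin_sensi / 2,
        omega_z_sensitivity=args_cli.rot_sensi,
        dead_zone=0.05,
    )
    # the gamepad X button resets the environment
    teleop_interface.add_callback(carb.input.GamepadInput.X, env.reset)
    if args_cli.teleop_device == "keyboard":
        teleop_interface = Se2Keyboard()
        # the "L" key resets the environment
        teleop_interface.add_callback("L", env.reset)
    return teleop_interface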
