-
Notifications
You must be signed in to change notification settings - Fork 32
/
__init__.py
231 lines (212 loc) · 7.65 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import os
import json
try:
from collections.abc import Iterable
except ImportError:
Iterable = (tuple, list)
def make_async(
id,
num_envs=1,
asynchronous=True,
wrappers=None,
render=False,
obs_dim=23,
action_dim=7,
env_type=None,
max_episode_steps=None,
# below for furniture only
gpu_id=0,
headless=True,
record=False,
normalization_path=None,
furniture="one_leg",
randomness="low",
obs_steps=1,
act_steps=8,
sparse_reward=False,
# below for robomimic only
robomimic_env_cfg_path=None,
use_image_obs=False,
render_offscreen=False,
reward_shaping=False,
shape_meta=None,
**kwargs,
):
"""Create a vectorized environment from multiple copies of an environment,
from its id.
Parameters
----------
id : str
The environment ID. This must be a valid ID from the registry.
num_envs : int
Number of copies of the environment.
asynchronous : bool
If `True`, wraps the environments in an :class:`AsyncVectorEnv` (which uses
`multiprocessing`_ to run the environments in parallel). If ``False``,
wraps the environments in a :class:`SyncVectorEnv`.
wrappers : dictionary, optional
Each key is a wrapper class, and each value is a dictionary of arguments
Returns
-------
:class:`gym.vector.VectorEnv`
The vectorized environment.
Example
-------
>>> env = gym.vector.make('CartPole-v1', num_envs=3)
>>> env.reset()
array([[-0.04456399, 0.04653909, 0.01326909, -0.02099827],
[ 0.03073904, 0.00145001, -0.03088818, -0.03131252],
[ 0.03468829, 0.01500225, 0.01230312, 0.01825218]],
dtype=float32)
"""
if env_type == "furniture":
from furniture_bench.envs.observation import DEFAULT_STATE_OBS
from furniture_bench.envs.furniture_rl_sim_env import FurnitureRLSimEnv
from env.gym_utils.wrapper.furniture import FurnitureRLSimEnvMultiStepWrapper
env = FurnitureRLSimEnv(
act_rot_repr="rot_6d",
action_type="pos",
april_tags=False,
concat_robot_state=True,
ctrl_mode="diffik",
obs_keys=DEFAULT_STATE_OBS,
furniture=furniture,
gpu_id=gpu_id,
headless=headless,
num_envs=num_envs,
observation_space="state",
randomness=randomness,
max_env_steps=max_episode_steps,
record=record,
pos_scalar=1,
rot_scalar=1,
stiffness=1_000,
damping=200,
)
env = FurnitureRLSimEnvMultiStepWrapper(
env,
n_obs_steps=obs_steps,
n_action_steps=act_steps,
prev_action=False,
reset_within_step=False,
pass_full_observations=False,
normalization_path=normalization_path,
sparse_reward=sparse_reward,
)
return env
# avoid import error due incompatible gym versions
from gym import spaces
from env.gym_utils.async_vector_env import AsyncVectorEnv
from env.gym_utils.sync_vector_env import SyncVectorEnv
from env.gym_utils.wrapper import wrapper_dict
__all__ = [
"AsyncVectorEnv",
"SyncVectorEnv",
"VectorEnv",
"VectorEnvWrapper",
"make",
]
# import the envs
if robomimic_env_cfg_path is not None:
import robomimic.utils.env_utils as EnvUtils
import robomimic.utils.obs_utils as ObsUtils
elif "avoiding" in id:
import gym_avoiding
else:
import d4rl.gym_mujoco
from gym.envs import make as make_
def _make_env():
if robomimic_env_cfg_path is not None:
obs_modality_dict = {
"low_dim": (
wrappers.robomimic_image.low_dim_keys
if "robomimic_image" in wrappers
else wrappers.robomimic_lowdim.low_dim_keys
),
"rgb": (
wrappers.robomimic_image.image_keys
if "robomimic_image" in wrappers
else None
),
}
if obs_modality_dict["rgb"] is None:
obs_modality_dict.pop("rgb")
ObsUtils.initialize_obs_modality_mapping_from_dict(obs_modality_dict)
if render_offscreen or use_image_obs:
os.environ["MUJOCO_GL"] = "egl"
with open(robomimic_env_cfg_path, "r") as f:
env_meta = json.load(f)
env_meta["reward_shaping"] = reward_shaping
env = EnvUtils.create_env_from_metadata(
env_meta=env_meta,
render=render,
# only way to not show collision geometry is to enable render_offscreen, which uses a lot of RAM.
render_offscreen=render_offscreen,
use_image_obs=use_image_obs,
# render_gpu_device_id=0,
)
# Robosuite's hard reset causes excessive memory consumption.
# Disabled to run more envs.
# https://github.com/ARISE-Initiative/robosuite/blob/92abf5595eddb3a845cd1093703e5a3ccd01e77e/robosuite/environments/base.py#L247-L248
env.env.hard_reset = False
else: # d3il, gym
if "kitchen" not in id: # d4rl kitchen does not support rendering!
kwargs["render"] = render
env = make_(id, **kwargs)
# add wrappers
if wrappers is not None:
for wrapper, args in wrappers.items():
env = wrapper_dict[wrapper](env, **args)
return env
def dummy_env_fn():
"""TODO(allenzren): does this dummy env allow camera obs for other envs besides robomimic?"""
import gym
import numpy as np
from env.gym_utils.wrapper.multi_step import MultiStep
# Avoid importing or using env in the main process
# to prevent OpenGL context issue with fork.
# Create a fake env whose sole purpose is to provide
# obs/action spaces and metadata.
env = gym.Env()
observation_space = spaces.Dict()
if shape_meta is not None: # rn only for images
for key, value in shape_meta["obs"].items():
shape = value["shape"]
if key.endswith("rgb"):
min_value, max_value = -1, 1
elif key.endswith("state"):
min_value, max_value = -1, 1
else:
raise RuntimeError(f"Unsupported type {key}")
observation_space[key] = spaces.Box(
low=min_value,
high=max_value,
shape=shape,
dtype=np.float32,
)
else:
observation_space["state"] = gym.spaces.Box(
-1,
1,
shape=(obs_dim,),
dtype=np.float32,
)
env.observation_space = observation_space
env.action_space = gym.spaces.Box(-1, 1, shape=(action_dim,), dtype=np.int64)
env.metadata = {
"render.modes": ["human", "rgb_array", "depth_array"],
"video.frames_per_second": 12,
}
return MultiStep(env=env, n_obs_steps=wrappers.multi_step.n_obs_steps)
env_fns = [_make_env for _ in range(num_envs)]
return (
AsyncVectorEnv(
env_fns,
dummy_env_fn=(
dummy_env_fn if render or render_offscreen or use_image_obs else None
),
delay_init="avoiding" in id, # add delay for D3IL initialization
)
if asynchronous
else SyncVectorEnv(env_fns)
)