Skip to content

Commit

Permalink
Fix/shutdown lock (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouaihui authored Jan 23, 2024
1 parent 70d3cc8 commit 740807b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
4 changes: 3 additions & 1 deletion fed/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ def shutdown():
"""
Shutdown a RayFed client.
"""
_shutdown(True)
global_context = get_global_context()
if global_context is not None and global_context.acquire_shutdown_flag():
_shutdown(True)


def _shutdown(intended=True):
Expand Down
47 changes: 47 additions & 0 deletions fed/tests/test_cross_silo_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,52 @@ def test_cross_silo_not_expose_error_trace():
assert p_bob.exitcode == 0


@fed.remote
def foo(e):
    """Print the value received as ``e``.

    Used by ``run4`` as the bob-side consumer of alice's result.
    """
    print(e)


def run4(party):
    """Run one party of the "alice sends an error and shuts down once" case.

    Starts Ray with ``address='local'``, initialises RayFed for ``party``
    with a 20-second ``timeout_ms`` and ``expose_error_trace`` disabled,
    then calls ``error_func`` for alice and passes its result to ``foo``
    for bob before shutting RayFed and Ray down.

    Args:
        party: Name of the party this process plays; one of the keys of
            ``addresses`` (``'alice'`` or ``'bob'``).
    """
    compatible_utils.init_ray(address='local')
    # Both parties listen on localhost; only the port differs.
    addresses = {
        'alice': '127.0.0.1:11012',
        'bob': '127.0.0.1:11011',
    }

    fed.init(
        addresses=addresses,
        party=party,
        logging_level='debug',
        config={
            'cross_silo_comm': {
                'timeout_ms': 20 * 1000,
                'expose_error_trace': False,
            },
        },
    )

    # `error_func` is defined elsewhere in this file; presumably it raises
    # on alice so that an error is propagated to bob -- verify there.
    a = error_func.party("alice").remote()
    # `o` is never read afterwards; the call is made for its effect only.
    o = foo.party('bob').remote(a)
    if party == 'bob':
        # Wait a while to receive the error from alice.
        import time

        time.sleep(10)
    # Alice is expected to shut down exactly once.
    # NOTE(review): the shutdown calls are kept at function level so that
    # alice also runs them -- confirm against the upstream file.
    fed.shutdown()
    ray.shutdown()


def test_cross_silo_alice_send_error_and_shutdown_once():
    """Run alice and bob in separate processes; both must exit with code 0."""
    workers = [
        multiprocessing.Process(target=run4, args=(name,))
        for name in ('alice', 'bob')
    ]
    # Start every party before joining any of them so they run concurrently.
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    for proc in workers:
        assert proc.exitcode == 0


if __name__ == "__main__":
    # Run only this file's tests (-s: no output capture, -v: verbose) and
    # hand pytest's return code back to the shell.
    exit_code = pytest.main(["-sv", __file__])
    sys.exit(exit_code)

0 comments on commit 740807b

Please sign in to comment.