-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Multi-Job][3/N] Distinguish cross_silo msg with the job name #172
Conversation
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
fed/_private/compatible_utils.py
Outdated
def wrap_kv_key(job_name, key): | ||
"""Add an prefix to the key to avoid conflict with other jobs. | ||
""" | ||
if (isinstance(key, bytes)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us just restrict the type to str
only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we can't restrict the KV get
and put
to only use the str
type key.
Do you mean changing all of the current KV accesses to use str
type only ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant:
def wrap_kv_key(job_name, key):
"""Add an prefix to the key to avoid conflict with other jobs.
"""
assert isinstance(key, str)
@@ -48,23 +48,25 @@ def cross_silo_comm_config_dict(self) -> Dict: | |||
_job_config = None | |||
|
|||
|
|||
def get_cluster_config(): | |||
def get_cluster_config(job_name: str = None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hum, so this method name should be changed to get_job_config or get_config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both "cluster_config" and "job_config" are part of the JOB, see #156. So I think it's fine to retrieve a "cluster config" by "job name" though it's definitely hard to understand 🤣
I think in the near future, we can merge these two configs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. Then let us leave a TODO comment on that target?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
fed/proxy/grpc/grpc_proxy.py
Outdated
|
||
async def SendData(self, request, context): | ||
job_name = request.job_name | ||
if job_name != self._job_name: | ||
return fed_pb2.SendDataResponse(result="ERROR") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error messag should be more detailed?
Signed-off-by: NKcqx <[email protected]>
Signed-off-by: paer <[email protected]>
Signed-off-by: paer <[email protected]>
return fed_pb2.SendDataResponse( | ||
result=f"JobName mis-match, expected {self._job_name}, got {job_name}.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The none "OK" response indicates that
- the message is sent successfully
- there're errors in receiver party, i.e. it's a cross-silo error.
Therefore, the following process should belong to the cross-silo error handle mechanism.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -46,7 +46,10 @@ def __init__( | |||
submit_ray_task_func, | |||
options={}, | |||
) -> None: | |||
self._party = fed_config.get_cluster_config().current_party | |||
# Note(NKcqx): FedCallHolder will only be created in driver process, where | |||
# the GlobalContext must has been initialized. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good note.
Multi-job scenario may have case that job 1 send messages to the proxy actor that belongs to job2.
For example, there're two jobs [Job 1] and [Job 2] that both involve party 'alice' and 'bob', a possible scenario is:
Therefore, the cross-silo message need a job_name to distinguish. In this case, the step 4 will ignore the message.