[Multi-Job][2/N] Make proxy's RPC framework replaceable #140
Signed-off-by: NKcqx <[email protected]>
Could you @ me when this is ready for review again, @NKcqx?
will be used. More details please refer to
`retry-policy <https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy>`_. # noqa

enable_waiting_for_other_parties_ready: ping other parties until they
This does not match the code
I don't quite understand.
If you mean the functionality, I think it is outside the scope of this PR.
Signed-off-by: paer <[email protected]>
fed/proxy/grpc_proxy.py
Outdated
    server.add_secure_port(f'[::]:{port}', server_credentials)
else:
    server.add_insecure_port(f'[::]:{port}')
    # server.add_insecure_port(f'[::]:{port}')
Remove this: `# server.add_insecure_port(f'[::]:{port}')`
Nice work. General LGTM.
The UT failure is not related to this PR; I have filed issue #153 for it.
This is a great PR.
I left some high-level comments, and I need more time to dive deeper into the review.
fed/_private/constants.py
Outdated
KEY_OF_CROSS_SILO_MESSAGES_MAX_SIZE_IN_BYTES = "CROSS_SILO_MESSAGES_MAX_SIZE_IN_BYTES" # noqa

KEY_OF_CROSS_SILO_TIMEOUT_IN_SECONDS = "CROSS_SILO_TIMEOUT_IN_SECONDS"
KEY_OF_CROSS_SILO_COMM_CONFIG = "CROSS_SILO_COMM_CONFIG"
KEY_OF_CROSS_SILO_COMM_CONFIG = "CROSS_SILO_COMM_CONFIG"
KEY_OF_CROSS_SILO_COMMON_CONFIG = "CROSS_SILO_COMMON_CONFIG"
fed/_private/serialization_utils.py
Outdated
@@ -63,7 +63,8 @@ def find_class(self, module, name):
def _apply_loads_function_with_whitelist():
    global _pickle_whitelist

    _pickle_whitelist = fed_config.get_cluster_config().serializing_allowed_list
    _pickle_whitelist = fed_config.get_job_config() \
        .cross_silo_comm_config.serializing_allowed_list
Should this be job config?
@zhouaihui @fengsp CC
Now that we separate `ray.init` from `fed.init`, there's no way to reach the cluster-level information, since each `fed.init` starts, and only starts, a job session.
Unless there's a global actor (or service job) that can break the job isolation and filter the invalid param types of each job's tasks.
If we don't have the cluster config, why not just use `config`? The question we should answer before this gets finalized is whether we will need the cluster config in the future at a high level.
Let's discuss in #156
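For readers following the `find_class` / `serializing_allowed_list` hunk above, here is a minimal sketch of how an allowed-list check in an unpickler can work, using only the standard `pickle` module. The `ALLOWED` mapping and the `restricted_loads` helper are hypothetical names for illustration and are not taken from this PR; wherever the list ends up living (job config or cluster config), the check itself would look roughly like this.

```python
import io
import pickle
from collections import OrderedDict

# Hypothetical allowed list: module name -> attribute names that may be loaded.
ALLOWED = {"collections": {"OrderedDict"}}


class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Called whenever the pickle stream references a global (class/function).
        if name in ALLOWED.get(module, set()):
            return super().find_class(module, name)
        raise pickle.UnpicklingError(
            f"{module}.{name} is not in the allowed list")


def restricted_loads(data: bytes):
    """Deserialize `data`, rejecting any global outside ALLOWED."""
    return RestrictedUnpickler(io.BytesIO(data)).load()


# An allowed type round-trips; a global outside the list is rejected.
allowed_value = OrderedDict(a=1)
round_tripped = restricted_loads(pickle.dumps(allowed_value))

try:
    restricted_loads(pickle.dumps(print))  # references builtins.print
    rejected = False
except pickle.UnpicklingError:
    rejected = True
```

Plain containers such as `dict` or `list` never reach `find_class`, so only class and function references are subject to the list.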
fed/api.py
Outdated
)
from fed.config import CrossSiloCommConfig
I believe the more native name is CommonCrossSiloConfig
"Comm" is short for "Communication" 😅
😅 Then we should avoid it all the more, since it is confusing even in this conversation.
Rename to `CrossSiloMsgConfig` and `GrpcCrossSiloMsgConfig`; any other name suggestions?
proposing CrossSiloMessageConfig
resource_label=cross_silo_recv_resource_label)
if recv_proxy_cls is None:
    from fed.proxy.grpc_proxy import GrpcRecvProxy
    recv_proxy_cls = GrpcRecvProxy
# Start recv proxy
start_recv_proxy(
It's not related to the changes in this PR, but the `start_recv_proxy` function should be a private one.
BTW, could you please update the PR title and the description? They don't match your PR content.
I left some minor comments.
This PR looks good to me overall. I think it's time to merge it, and the remaining issues can be addressed in upcoming PRs:
- Making it easy to plug in a new communication component. For instance, I don't think `grpc_config` should be defined in `fed/config`, as that makes it hard to plug in.
- Removing the cluster config concept, and using `config` as the name for the job-level config.
- Defining how we could pass the cluster config so that it applies to all of the jobs.
KEY_OF_CROSS_SILO_MESSAGES_MAX_SIZE_IN_BYTES = "CROSS_SILO_MESSAGES_MAX_SIZE_IN_BYTES" # noqa

KEY_OF_CROSS_SILO_TIMEOUT_IN_SECONDS = "CROSS_SILO_TIMEOUT_IN_SECONDS"
KEY_OF_CROSS_SILO_MSG_CONFIG = "CROSS_SILO_MSG_CONFIG"
KEY_OF_CROSS_SILO_MSG_CONFIG = "CROSS_SILO_MSG_CONFIG"
KEY_OF_CROSS_SILO_MESSAGE_CONFIG = "CROSS_SILO_MESSAGE_CONFIG"
@@ -63,7 +63,8 @@ def find_class(self, module, name):
def _apply_loads_function_with_whitelist():
    global _pickle_whitelist

    _pickle_whitelist = fed_config.get_cluster_config().serializing_allowed_list
    _pickle_whitelist = fed_config.get_job_config() \
As discussed, we should rename the function to `get_config()`. But we could defer that until later.
@@ -63,7 +63,8 @@ def find_class(self, module, name):
def _apply_loads_function_with_whitelist():
    global _pickle_whitelist

    _pickle_whitelist = fed_config.get_cluster_config().serializing_allowed_list
    _pickle_whitelist = fed_config.get_job_config() \
        .cross_silo_msg_config.serializing_allowed_list
.cross_silo_msg_config.serializing_allowed_list
.cross_silo_message_config.serializing_allowed_list
@@ -34,6 +34,8 @@
    start_recv_proxy,
    start_send_proxy,
)
from fed.proxy.grpc.grpc_proxy import SendProxy, RecvProxy
from fed.config import CrossSiloMsgConfig
from fed.config import CrossSiloMsgConfig
from fed.config import CrossSiloMessageConfig
enable_waiting_for_other_parties_ready: ping other parties until they
are all ready if True.
grpc_metadata: optional; The metadata sent with the grpc request. This won't override
basic tcp headers, such as `user-agent`, but aggregate them together.
global_cross_silo_msg_config: Global cross-silo message related
Sorry, I still don't get what `global` means here.
Because the `cluster` parameter in `fed.init` supports setting the msg config for each party.
The "global" here indicates that the config applies to every party, and can be overridden by the party-specific config declared in `cluster`.
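To make the override rule concrete, here is a rough sketch of "global config applies to every party unless a party-specific config overrides it". The `MsgConfig` dataclass, its field names, and `resolve_party_config` are hypothetical stand-ins for illustration only; they are not the classes or the merge logic in `fed.config`.

```python
from dataclasses import dataclass, fields, replace
from typing import Dict, Optional


@dataclass
class MsgConfig:
    # Hypothetical fields; None means "not set at this level".
    timeout_in_seconds: Optional[int] = None
    messages_max_size_in_bytes: Optional[int] = None


def resolve_party_config(global_cfg: MsgConfig,
                         party_cfg: Optional[MsgConfig]) -> MsgConfig:
    """Start from the global config and apply any party-level overrides."""
    if party_cfg is None:
        return global_cfg
    overrides = {
        f.name: getattr(party_cfg, f.name)
        for f in fields(party_cfg)
        if getattr(party_cfg, f.name) is not None
    }
    return replace(global_cfg, **overrides)


global_cfg = MsgConfig(timeout_in_seconds=60, messages_max_size_in_bytes=1024)
per_party: Dict[str, Optional[MsgConfig]] = {
    "alice": MsgConfig(timeout_in_seconds=10),  # overrides the timeout only
    "bob": None,                                # inherits everything
}
resolved = {
    party: resolve_party_config(global_cfg, cfg)
    for party, cfg in per_party.items()
}
```

In this sketch `alice` keeps the global message size limit but gets her own timeout, while `bob` falls back to the global values entirely.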
resource_label=cross_silo_recv_resource_label)
if recv_proxy_cls is None:
    logger.debug(
        "Not declaring recver proxy class, using `GrpcRecvProxy` as default.")
There is no receiver proxy class specified, it uses `GrpcRecvProxy` by default.
more native?
@@ -48,8 +39,10 @@ def __init__(self, raw_bytes: bytes) -> None:
        self._data = cloudpickle.loads(raw_bytes)

    @property
    def grpc_metadata(self):
        return self._data.get(fed_constants.KEY_OF_GRPC_METADATA, {})
    def cross_silo_msg_config(self):
def cross_silo_msg_config(self):
def cross_silo_message_config(self):
@@ -80,14 +73,94 @@ def get_job_config():
    return _job_config


class ProxyActorConfig:
@dataclass
class CrossSiloMsgConfig:
class CrossSiloMsgConfig:
class CrossSiloMessageConfig:
from fed.config import CrossSiloMsgConfig


class SendProxy(abc.ABC):
Should we rename these to `SenderProxy` and `ReceiverProxy`?
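As a rough illustration of what "replaceable" means for the proxy, the sketch below shows an abstract sender base class plus a factory that falls back to a default implementation when no class is passed, mirroring the `if recv_proxy_cls is None` pattern in the diff. The `send` signature, `InMemorySendProxy`, and `make_send_proxy` are assumptions made up for this example; the in-memory class merely stands in for the gRPC default and is not part of the PR.

```python
import abc
from typing import Any, Dict, List, Optional, Type


class SendProxy(abc.ABC):
    """Abstract sender; the method signature here is an assumption."""

    @abc.abstractmethod
    def send(self, dest_party: str, data: Any) -> bool:
        ...


class InMemorySendProxy(SendProxy):
    """Hypothetical default that stands in for a gRPC-based sender."""

    def __init__(self) -> None:
        self.outbox: Dict[str, List[Any]] = {}

    def send(self, dest_party: str, data: Any) -> bool:
        self.outbox.setdefault(dest_party, []).append(data)
        return True


def make_send_proxy(
        proxy_cls: Optional[Type[SendProxy]] = None) -> SendProxy:
    # Fall back to the default implementation when none is specified.
    if proxy_cls is None:
        proxy_cls = InMemorySendProxy
    return proxy_cls()


proxy = make_send_proxy()
sent = proxy.send("bob", b"hello")
```

A different RPC framework would then only need its own `SendProxy` subclass passed as `proxy_cls`; instantiating the abstract base directly raises `TypeError`.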
This is a follow-up to #140 that addresses the naming issues. It is also part of #44.
Signed-off-by: Qing Wang <[email protected]>
[2/N] in #134
`fed.init` API to reduce the amount of RPC-related config parameters.