From 00abc79e759a302488cf3b06bb6e3c85066eab42 Mon Sep 17 00:00:00 2001
From: volchyt2024
Date: Fri, 5 Jul 2024 14:32:00 +0800
Subject: [PATCH] Support sending embedding info after training is completed
 (#1101)

* support refluxing embedding info after training is completed:
  1. add send_no_deps/recv_no_deps in FLModel
  2. update the wide_n_deep example to demonstrate how to use the new
     interface to reflux embedding info from the leader to the follower

* remove an obsolete import in the example
---
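Usage notes (kept below the "---" marker, so `git am` drops them from
the commit message): send_no_deps/recv_no_deps are used as a matching
pair keyed by the same tensor name, and the example only runs them
after the last training step via tf.train.FinalOpsHook registered in
training_chief_hooks. A condensed sketch, reusing names from the
example below (`send_op`, `received` and `assign_ops` are illustrative
locals, not part of the diff):

    # leader: push the trained embedding tables once training is done
    send_op = model.send_no_deps('reflux_embedding', embeddings)

    # follower: pull the same tensor and copy it into the local,
    # zero-initialized mirror variables
    received = model.recv_no_deps('reflux_embedding',
                                  shape=[num_slot, fid_size, embed_size])
    assign_ops = [tf.assign(var, received[i])
                  for i, var in enumerate(peer_embeddings)]

    # each side then hands its ops to a chief-only hook, e.g.
    # embedding_hook = tf.train.FinalOpsHook(final_ops=assign_ops)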
 example/wide_n_deep/follower.py      | 50 ++++++++++++++++++++++++++--
 example/wide_n_deep/leader.py        | 23 +++++++++++++
 fedlearner/trainer/estimator.py      | 18 ++++++++++
 fedlearner/trainer/run_hooks.py      |  2 +-
 fedlearner/trainer/trainer_worker.py |  3 +-
 5 files changed, 92 insertions(+), 4 deletions(-)

diff --git a/example/wide_n_deep/follower.py b/example/wide_n_deep/follower.py
index 8b987dceb..ab0591c1b 100644
--- a/example/wide_n_deep/follower.py
+++ b/example/wide_n_deep/follower.py
@@ -15,11 +15,14 @@
 # coding: utf-8
 # pylint: disable=no-else-return, inconsistent-return-statements
 
+import os
 import tensorflow.compat.v1 as tf
 import fedlearner.trainer as flt
 
 ROLE = 'follower'
+ENV = os.environ
+DEBUG_PRINT = int(ENV.get('DEBUG_PRINT', 0))
 
 parser = flt.trainer_worker.create_argument_parser()
 parser.add_argument('--batch-size', type=int, default=32,
                     help='Training batch size.')
@@ -50,9 +53,31 @@ def serving_input_receiver_fn():
     record_batch = tf.placeholder(dtype=tf.string, name='examples')
     features = tf.parse_example(record_batch, features=feature_map)
+    receiver_tensors = {
+        'examples': record_batch
+    }
 
     return tf.estimator.export.ServingInputReceiver(
-        features, {'examples': record_batch})
+        features, receiver_tensors)
 
+def final_fn(model, tensor_name, is_send, assignee, tensor=None, shape=None):
+    ops = []
+    if is_send:
+        assert tensor, "Please specify tensor to send"
+        ops.append(model.send_no_deps(tensor_name, tensor))
+        return ops
+
+    receiver_op = model.recv_no_deps(tensor_name, shape=shape)
+    ops.append(receiver_op)
+
+    with tf.control_dependencies([receiver_op]):
+        if DEBUG_PRINT:
+            ops.append(tf.print('received embedding info: ', receiver_op))
+        index = 0
+        while index < len(assignee):
+            ops.append(tf.assign(assignee[index], receiver_op[index]))
+            index += 1
+
+    return ops
 
 def model_fn(model, features, labels, mode):
     global_step = tf.train.get_or_create_global_step()
@@ -61,6 +86,13 @@ def model_fn(model, features, labels, mode):
     num_slot = 512
     fid_size, embed_size = 101, 16
 
+    peer_embeddings = [
+        tf.get_variable(
+            'peer_slot_emb{0}'.format(i), shape=[fid_size, embed_size],
+            dtype=tf.float32,
+            initializer=tf.zeros_initializer())
+        for i in range(num_slot)]
+
     embeddings = [
         tf.get_variable('slot_emb{0}'.format(i),
                         shape=[fid_size, embed_size],
@@ -89,7 +121,11 @@ def model_fn(model, features, labels, mode):
         optimizer = tf.train.GradientDescentOptimizer(0.1)
         train_op = model.minimize(
             optimizer, act1_f, grad_loss=gact1_f, global_step=global_step)
+        final_ops = final_fn(model=model, tensor_name='reflux_embedding',
+            is_send=False, assignee=peer_embeddings, shape=[num_slot, fid_size, embed_size])
+        embedding_hook = tf.train.FinalOpsHook(final_ops=final_ops)
         return model.make_spec(mode, loss=tf.math.reduce_mean(act1_f),
+                               training_chief_hooks=[embedding_hook],
                                train_op=train_op)
 
     if mode == tf.estimator.ModeKeys.EVAL:
@@ -100,8 +136,18 @@ def model_fn(model, features, labels, mode):
     # mode == tf.estimator.ModeKeys.PREDICT:
     return model.make_spec(mode, predictions={'act1_f': act1_f})
 
+class ExportModelHook(flt.trainer_worker.ExportModelHook):
+    def after_save(self, sess, model, export_dir, inputs, outputs):
+        print("**************export model hook**************")
+        print("sess :", sess)
+        print("model: ", model)
+        print("export_dir: ", export_dir)
+        print("inputs: ", inputs)
+        print("outputs: ", outputs)
+        print("*********************************************")
 
 if __name__ == '__main__':
     flt.trainer_worker.train(
         ROLE, args, input_fn,
-        model_fn, serving_input_receiver_fn)
+        model_fn, serving_input_receiver_fn,
+        export_model_hook=ExportModelHook())
diff --git a/example/wide_n_deep/leader.py b/example/wide_n_deep/leader.py
index eaf3236b7..781f2ebce 100644
--- a/example/wide_n_deep/leader.py
+++ b/example/wide_n_deep/leader.py
@@ -15,10 +15,13 @@
 # coding: utf-8
 # pylint: disable=no-else-return, inconsistent-return-statements
 
+import os
 import tensorflow.compat.v1 as tf
 import fedlearner.trainer as flt
 
 ROLE = 'leader'
+ENV = os.environ
+DEBUG_PRINT = int(ENV.get('DEBUG_PRINT', 0))
 
 parser = flt.trainer_worker.create_argument_parser()
 parser.add_argument('--batch-size', type=int, default=32,
@@ -60,6 +63,20 @@ def serving_input_receiver_fn():
         features, receiver_tensors)
 
 
+def final_fn(model, tensor_name, is_send, tensor=None, shape=None):
+    ops = []
+
+    if is_send:
+        assert tensor, "Please specify tensor to send"
+        if DEBUG_PRINT:
+            ops.append(tf.print(tensor))
+        ops.append(model.send_no_deps(tensor_name, tensor))
+        return ops
+
+    ops.append(model.recv_no_deps(tensor_name, shape=shape))
+    return ops
+
+
 def model_fn(model, features, labels, mode):
     """Model Builder of wide&deep learning models
     Args:
@@ -139,9 +156,15 @@ def model_fn(model, features, labels, mode):
         {"loss" : loss}, every_n_iter=10)
     metric_hook = flt.GlobalStepMetricTensorHook(tensor_dict={"loss": loss},
                                                  every_steps=10)
+    final_ops = final_fn(model=model, tensor_name='reflux_embedding',
+                         is_send=True, tensor=embeddings)
+    embedding_hook = tf.train.FinalOpsHook(final_ops=final_ops)
+
     optimizer = tf.train.GradientDescentOptimizer(0.1)
+
     train_op = model.minimize(optimizer, loss, global_step=global_step)
     return model.make_spec(mode, loss=loss, train_op=train_op,
+                           training_chief_hooks=[embedding_hook],
                            training_hooks=[logging_hook, metric_hook])
 
 class ExportModelHook(flt.trainer_worker.ExportModelHook):
diff --git a/fedlearner/trainer/estimator.py b/fedlearner/trainer/estimator.py
index b7e5c2ac9..360be6848 100644
--- a/fedlearner/trainer/estimator.py
+++ b/fedlearner/trainer/estimator.py
@@ -82,6 +82,24 @@ def recv(self, name, dtype=tf.float32, require_grad=False, shape=None):
         self._recvs.append((name, receive_op, require_grad))
         return receive_op
 
+    def send_no_deps(self, name, tensor):
+        send_op = self._bridge.send_op(name, tensor)
+        self._sends.append((name, tensor, False))
+        return send_op
+
+    def recv_no_deps(self, name, dtype=tf.float32,
+                     require_grad=False, shape=None):
+        receive_op = self._bridge.receive_op(name, dtype)
+        if shape:
+            receive_op = tf.ensure_shape(receive_op, shape)
+        else:
+            fl_logging.warning(
+                'Receiving tensor %s without checking shape. '
+                'Consider setting shape at model.recv_no_deps(shape=(...)). '
+                'shape can have None dimensions, '
+                'which match any length.', name)
+        self._recvs.append((name, receive_op, require_grad))
+        return receive_op
+
     def minimize(self,
                  optimizer,
                  loss,
diff --git a/fedlearner/trainer/run_hooks.py b/fedlearner/trainer/run_hooks.py
index df674c6e0..ec8131b3d 100644
--- a/fedlearner/trainer/run_hooks.py
+++ b/fedlearner/trainer/run_hooks.py
@@ -243,4 +243,4 @@ def _parse_op_label(self, label):
             inputs = []
         else:
             inputs = inputs.split(', ')
-        return nn, op, inputs
+        return nn, op, inputs
\ No newline at end of file
diff --git a/fedlearner/trainer/trainer_worker.py b/fedlearner/trainer/trainer_worker.py
index 0bf420891..842a87915 100644
--- a/fedlearner/trainer/trainer_worker.py
+++ b/fedlearner/trainer/trainer_worker.py
@@ -341,7 +341,8 @@ def _run_local(role,
         trainer_master,
         bridge,
         role,
-        model_fn)
+        model_fn,
+        is_chief=True)
 
     if mode == 'train':
         estimator.train(input_fn)