Support sending embedding info. after training is completed (#1101)
* Support sending back (refluxing) embedding info after training is completed:
1. add send_no_deps/recv_no_deps in FLModel
2. update the wide_n_deep example to demonstrate how to use the new interface to send embedding info from leader to follower

* Remove an obsolete import in the example
volchyt2024 authored Jul 5, 2024
1 parent 436e495 commit 00abc79
Showing 5 changed files with 96 additions and 4 deletions.
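The core of the change is one paired, name-keyed transfer: after training, the leader pushes its embedding tables across the bridge and the follower pulls them under the same name. A minimal sketch of the pairing, assuming a built FLModel (`model`) inside each side's model_fn; `embeddings`, `peer_embeddings`, `num_slot`, `fid_size`, and `embed_size` are the example's own names from the diffs below:

# Leader side: emit the embedding tables once, after the last training step.
send_op = model.send_no_deps('reflux_embedding', embeddings)
leader_hook = tf.train.FinalOpsHook(final_ops=[send_op])

# Follower side: receive under the same name, with an explicit shape check,
# then copy the result into the local peer_slot_emb* variables (follower.py).
recv_op = model.recv_no_deps('reflux_embedding',
                             shape=[num_slot, fid_size, embed_size])
follower_hook = tf.train.FinalOpsHook(final_ops=[recv_op])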
50 changes: 48 additions & 2 deletions example/wide_n_deep/follower.py
@@ -15,11 +15,14 @@
 # coding: utf-8
 # pylint: disable=no-else-return, inconsistent-return-statements
 
+import os
 import tensorflow.compat.v1 as tf
 import fedlearner.trainer as flt
 
 
 ROLE = 'follower'
+ENV = os.environ
+DEBUG_PRINT = int(ENV.get('DEBUG_PRINT', 0))
 
 parser = flt.trainer_worker.create_argument_parser()
 parser.add_argument('--batch-size', type=int, default=32,
@@ -50,9 +53,31 @@ def serving_input_receiver_fn():

     record_batch = tf.placeholder(dtype=tf.string, name='examples')
     features = tf.parse_example(record_batch, features=feature_map)
+    receiver_tensors = {
+        'examples': record_batch
+    }
     return tf.estimator.export.ServingInputReceiver(
-        features, {'examples': record_batch})
+        features, receiver_tensors)
+
+def final_fn(model, tensor_name, is_send, assignee, tensor=None, shape=None):
+    ops = []
+    if is_send:
+        assert tensor, "Please specify tensor to send"
+        ops.append(model.send_no_deps(tensor_name, tensor))
+        return ops
+
+    receiver_op = model.recv_no_deps(tensor_name, shape=shape)
+    ops.append(receiver_op)
+
+    with tf.control_dependencies([receiver_op]):
+        if DEBUG_PRINT:
+            ops.append(tf.print('received embedding info: ', receiver_op))
+        index = 0
+        while index < len(assignee):
+            ops.append(tf.assign(assignee[index], receiver_op[index]))
+            index += 1
+
+    return ops
 
 def model_fn(model, features, labels, mode):
     global_step = tf.train.get_or_create_global_step()
@@ -61,6 +86,13 @@ def model_fn(model, features, labels, mode):

     num_slot = 512
     fid_size, embed_size = 101, 16
+    peer_embeddings = [
+        tf.get_variable(
+            'peer_slot_emb{0}'.format(i), shape=[fid_size, embed_size],
+            dtype=tf.float32,
+            initializer=tf.zeros_initializer())
+        for i in range(num_slot)]
+
     embeddings = [
         tf.get_variable('slot_emb{0}'.format(i),
                         shape=[fid_size, embed_size], dtype=tf.float32,
@@ -89,7 +121,11 @@ def model_fn(model, features, labels, mode):
     optimizer = tf.train.GradientDescentOptimizer(0.1)
     train_op = model.minimize(
         optimizer, act1_f, grad_loss=gact1_f, global_step=global_step)
+    final_ops = final_fn(model=model, tensor_name='reflux_embedding',
+                         is_send=False, assignee=peer_embeddings,
+                         shape=[num_slot, fid_size, embed_size])
+    embedding_hook = tf.train.FinalOpsHook(final_ops=final_ops)
     return model.make_spec(mode, loss=tf.math.reduce_mean(act1_f),
+                           training_chief_hooks=[embedding_hook],
                            train_op=train_op)
 
     if mode == tf.estimator.ModeKeys.EVAL:
@@ -100,8 +136,18 @@ def model_fn(model, features, labels, mode):
     # mode == tf.estimator.ModeKeys.PREDICT:
     return model.make_spec(mode, predictions={'act1_f': act1_f})
 
+class ExportModelHook(flt.trainer_worker.ExportModelHook):
+    def after_save(self, sess, model, export_dir, inputs, outputs):
+        print("**************export model hook**************")
+        print("sess :", sess)
+        print("model: ", model)
+        print("export_dir: ", export_dir)
+        print("inputs: ", inputs)
+        print("outputs: ", outputs)
+        print("*********************************************")
+
 if __name__ == '__main__':
     flt.trainer_worker.train(
         ROLE, args, input_fn,
-        model_fn, serving_input_receiver_fn)
+        model_fn, serving_input_receiver_fn,
+        export_model_hook=ExportModelHook())
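Both sides hang the transfer off tf.train.FinalOpsHook, which runs its ops exactly once when the monitored session ends, i.e. after the last training step. A self-contained sketch of that mechanism in plain TF1 (no fedlearner; `peer` stands in for a received embedding table):

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

peer = tf.get_variable('peer', shape=[2], initializer=tf.zeros_initializer())
update = tf.assign(peer, [1.0, 2.0])  # stand-in for assigning a received tensor

hook = tf.train.FinalOpsHook(final_ops=[update])
with tf.train.MonitoredTrainingSession(hooks=[hook]) as sess:
    pass  # the training loop would run here; `update` runs only on session exit

print(hook.final_ops_values)  # [array([1., 2.], dtype=float32)]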
27 changes: 27 additions & 0 deletions example/wide_n_deep/leader.py
@@ -15,10 +15,13 @@
 # coding: utf-8
 # pylint: disable=no-else-return, inconsistent-return-statements
 
+import os
 import tensorflow.compat.v1 as tf
 import fedlearner.trainer as flt
 
 ROLE = 'leader'
+ENV = os.environ
+DEBUG_PRINT = int(ENV.get('DEBUG_PRINT', 0))
 
 parser = flt.trainer_worker.create_argument_parser()
 parser.add_argument('--batch-size', type=int, default=32,
@@ -60,6 +63,20 @@ def serving_input_receiver_fn():
         features, receiver_tensors)
 
 
+def final_fn(model, tensor_name, is_send, tensor=None, shape=None):
+    ops = []
+
+    if is_send:
+        assert tensor, "Please specify tensor to send"
+        if DEBUG_PRINT:
+            ops.append(tf.print(tensor))
+        ops.append(model.send_no_deps(tensor_name, tensor))
+        return ops
+
+    ops.append(model.recv_no_deps(tensor_name, shape=shape))
+    return ops
+
+
 def model_fn(model, features, labels, mode):
     """Model Builder of wide&deep learning models
     Args:
@@ -75,12 +92,17 @@ def model_fn(model, features, labels, mode):

     num_slot = 512
     fid_size, embed_size = 101, 16
 
     embeddings = [
         tf.get_variable(
             'slot_emb{0}'.format(i), shape=[fid_size, embed_size],
             dtype=tf.float32,
             initializer=tf.random_uniform_initializer(-0.01, 0.01))
         for i in range(num_slot)]
 
     embed_output = tf.concat(
         [
             tf.nn.embedding_lookup_sparse(
@@ -139,9 +161,14 @@ def model_fn(model, features, labels, mode):
{"loss" : loss}, every_n_iter=10)
metric_hook = flt.GlobalStepMetricTensorHook(tensor_dict={"loss": loss},
every_steps=10)
final_ops = final_fn(model=model, tensor_name='reflux_embedding',is_send=True,tensor=embeddings)
embedding_hook = tf.train.FinalOpsHook(final_ops=final_ops)

optimizer = tf.train.GradientDescentOptimizer(0.1)

train_op = model.minimize(optimizer, loss, global_step=global_step)
return model.make_spec(mode, loss=loss, train_op=train_op,
training_chief_hooks=[embedding_hook],
training_hooks=[logging_hook, metric_hook])

class ExportModelHook(flt.trainer_worker.ExportModelHook):
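Note the shapes the two final_fn calls agree on: the leader passes a Python list of [fid_size, embed_size] variables, while the follower declares shape [num_slot, fid_size, embed_size] and indexes receiver_op[index] per slot, so the list presumably crosses the bridge as one stacked rank-3 tensor (an assumption about the bridge's handling of tensor lists). A plain-TF1 sketch of that layout, with a small num_slot for illustration:

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

num_slot, fid_size, embed_size = 4, 101, 16
embeddings = [tf.get_variable('slot_emb{0}'.format(i),
                              shape=[fid_size, embed_size], dtype=tf.float32)
              for i in range(num_slot)]

stacked = tf.stack(embeddings)  # assumed wire layout: [num_slot, fid_size, embed_size]
assert stacked.shape.as_list() == [num_slot, fid_size, embed_size]
per_slot = stacked[0]           # follower-side indexing, mirroring receiver_op[index]
assert per_slot.shape.as_list() == [fid_size, embed_size]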
18 changes: 18 additions & 0 deletions fedlearner/trainer/estimator.py
@@ -82,6 +82,24 @@ def recv(self, name, dtype=tf.float32, require_grad=False, shape=None):
         self._recvs.append((name, receive_op, require_grad))
         return receive_op
 
+    def send_no_deps(self, name, tensor):
+        send_op = self._bridge.send_op(name, tensor)
+        self._sends.append((name, tensor, False))
+        return send_op
+
+    def recv_no_deps(self, name, dtype=tf.float32, require_grad=False,
+                     shape=None):
+        receive_op = self._bridge.receive_op(name, dtype)
+        if shape:
+            receive_op = tf.ensure_shape(receive_op, shape)
+        else:
+            fl_logging.warning(
+                'Receiving tensor %s without checking shape. '
+                'Consider setting shape at model.recv_no_deps(shape=(...)). '
+                'shape can have None dimensions, which match any length.',
+                name)
+        self._recvs.append((name, receive_op, require_grad))
+        return receive_op

     def minimize(self,
                  optimizer,
                  loss,
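recv_no_deps mirrors recv's shape handling: when a shape is given, the received tensor is wrapped in tf.ensure_shape, which validates the shape at run time and treats None dimensions as wildcards; omitting shape only logs the warning above. A small plain-TF1 illustration of that check:

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

x = tf.placeholder(tf.float32)             # unknown shape, like a raw receive op
checked = tf.ensure_shape(x, (None, 16))   # None matches any length

with tf.Session() as sess:
    ok = sess.run(checked, {x: [[0.0] * 16]})   # passes: actual shape (1, 16)
    # sess.run(checked, {x: [0.0] * 16})        # would raise: rank mismatch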
2 changes: 1 addition & 1 deletion fedlearner/trainer/run_hooks.py
@@ -243,4 +243,4 @@ def _parse_op_label(self, label):
             inputs = []
         else:
             inputs = inputs.split(', ')
-        return nn, op, inputs
\ No newline at end of file
+        return nn, op, inputs
3 changes: 2 additions & 1 deletion fedlearner/trainer/trainer_worker.py
@@ -341,7 +341,8 @@ def _run_local(role,
         trainer_master,
         bridge,
         role,
-        model_fn)
+        model_fn,
+        is_chief=True)
 
     if mode == 'train':
         estimator.train(input_fn)
