Is TensorFlow's Estimator currently recommended?

Pros: from the official site it is clear that Estimator is what TensorFlow is currently promoting; the code architecture is clean, and low-level basics such as logging and model saving are already wrapped, so it is fairly convenient to use; …

For a long time I used the estimators that ship with TensorFlow (the canned estimators). I eventually found that these classes bundle too much together, so adjusting the model structure can be quite painful. On the other hand, when it comes to production engineering, building on TensorFlow's Estimator makes distributed training relatively easy. Since TensorFlow also supports custom Estimator structures, I'm writing down how to define the model structure yourself:

Task definition for the custom model: a multi-task learning setup with three objectives:

  • Click — binary classification
  • Dwell time — regression
  • Interaction — binary classification
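Before the model code itself, it helps to see how this task declaration drives everything else: each task owns one output column of the logits, and its type ("clf" vs "reg") decides the head name and which loss and metrics apply. A minimal plain-Python sketch of that bookkeeping (no TensorFlow needed; the task names match the definition above):

```python
import collections

# Task name -> task type ("clf" = binary classification, "reg" = regression).
# Passing (key, value) pairs preserves the order on any Python version.
tasks = collections.OrderedDict(
    [("is_click", "clf"), ("dwelltime", "reg"), ("interact", "clf")])

# Each task gets one column of the model's logits, in declaration order,
# and its type determines the name of the prediction head.
heads = {}
for idx, (task_name, task_type) in enumerate(tasks.items()):
    suffix = "logistic" if task_type == "clf" else "predictions"
    heads["{}/{}".format(task_name, suffix)] = idx

print(heads)
# {'is_click/logistic': 0, 'dwelltime/predictions': 1, 'interact/logistic': 2}
```

The model_fn below repeats exactly this loop with real tensors instead of indices.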

Code: a fairly complete definition of an estimator's model_fn

# encoding=utf-8
# @Time    : 2022/1/20 11:12 AM
# @Author  : Ahang
# @File    : custom_wd_mtl_estimator.py
# @Desc    : custom wide & deep multi-task estimator model

import collections

import tensorflow as tf
from tensorflow import losses
from tensorflow import metrics
from tensorflow.python.ops import control_flow_ops

# Multi-task definition: task name -> task type.
# Pass (key, value) pairs so the order survives on Python < 3.7 as well.
tasks = collections.OrderedDict(
    [("is_click", "clf"), ("dwelltime", "reg"), ("interact", "clf")])
task_names = list(tasks)
 
def model_fn(features, labels, mode, params):
    # step-1
    # Model definition layer.
    # wide_columns / deep_columns are assumed to be defined at module level
    # (or passed in via params).
    with tf.variable_scope("linear"):
        linear_logits = tf.feature_column.linear_model(
            features=features, feature_columns=wide_columns, units=len(tasks))
    with tf.variable_scope("dnn"):
        dnn_input = tf.feature_column.input_layer(
            features=features, feature_columns=deep_columns)  # deep part
        dnn_hidden1 = tf.layers.Dense(units=1024, activation=tf.nn.relu, name="dnn_hidden1")(dnn_input)
        dnn_hidden2 = tf.layers.Dense(units=256, activation=tf.nn.relu, name="dnn_hidden2")(dnn_hidden1)
        dnn_hidden3 = tf.layers.Dense(units=64, name="dnn_hidden3")(dnn_hidden2)
        dnn_logits = tf.layers.Dense(units=len(tasks), name="dnn_logits", use_bias=True)(dnn_hidden3)
    logits = tf.add(linear_logits, dnn_logits, name="add_logits")

    # One logits column per task, in task-declaration order.
    preds = {}
    for idx, task_name in enumerate(task_names):
        if tasks[task_name] == "clf":
            preds["{}/logistic".format(task_name)] = tf.nn.sigmoid(logits[:, idx])
        elif tasks[task_name] == "reg":
            preds["{}/predictions".format(task_name)] = logits[:, idx]
    for key in preds:
        preds[key] = tf.expand_dims(preds[key], 1)

    # step-2
    # ModeKeys.PREDICT
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = preds
        export_outputs = {"predict": tf.estimator.export.PredictOutput(outputs=predictions)}  # signature
        return tf.estimator.EstimatorSpec(mode, predictions=predictions, export_outputs=export_outputs)

    # step-3
    # DEFINE losses: classification and regression use different loss functions.
    _losses = {}
    _loss = []
    for task_name in task_names:
        if tasks[task_name] == "clf":
            task_loss = tf.reduce_mean(losses.log_loss(
                tf.expand_dims(tf.cast(labels[task_name], tf.float32), 1),
                preds["{}/logistic".format(task_name)],
                reduction=tf.losses.Reduction.MEAN), name="loss/{}".format(task_name))
            _losses["loss/{}".format(task_name)] = task_loss
            _loss.append(task_loss)
        if tasks[task_name] == "reg":
            task_loss = tf.reduce_mean(losses.mean_squared_error(
                tf.expand_dims(tf.cast(labels[task_name], tf.float32), 1),
                preds["{}/predictions".format(task_name)],
                reduction=tf.losses.Reduction.MEAN), name="loss/{}".format(task_name))
            _losses["loss/{}".format(task_name)] = task_loss
            _loss.append(task_loss)
    _loss = tf.add_n(_loss)

    # Use different optimizers for the wide and deep parts.
    # Pass global_step to only ONE minimize() call; each call that receives it
    # increments the step, so passing it to both advances the step twice per iteration.
    train_ops = [
        tf.train.AdagradOptimizer(0.01).minimize(
            loss=_loss, global_step=tf.train.get_global_step(),
            var_list=tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, "dnn")),
        tf.train.FtrlOptimizer(0.01).minimize(
            loss=_loss,
            var_list=tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, "linear"))
    ]
 
    # step-4
    # ModeKeys.TRAIN
    if mode == tf.estimator.ModeKeys.TRAIN:
        _train_ops = control_flow_ops.group(train_ops)
        return tf.estimator.EstimatorSpec(mode, loss=_loss, train_op=_train_ops)
 
    # step-5
    # ModeKeys.EVAL
    if mode == tf.estimator.ModeKeys.EVAL:
        _metrics = {}
        for task_name in task_names:
            if tasks[task_name] == "clf":
                label = tf.expand_dims(tf.cast(labels[task_name], tf.int64), 1)
                pred = preds["{}/logistic".format(task_name)]
                # accuracy/recall/precision expect hard 0/1 predictions, so
                # threshold the probabilities first; AUC takes the raw scores.
                pred_cls = tf.round(pred)
                _metrics["accuracy/{}".format(task_name)] = metrics.accuracy(labels=label, predictions=pred_cls)
                _metrics["auc/{}".format(task_name)] = metrics.auc(labels=label, predictions=pred)
                _metrics["recall/{}".format(task_name)] = metrics.recall(labels=label, predictions=pred_cls)
                _metrics["precision/{}".format(task_name)] = metrics.precision(labels=label, predictions=pred_cls)
            if tasks[task_name] == "reg":
                label = tf.expand_dims(tf.cast(labels[task_name], tf.float32), 1)
                pred = preds["{}/predictions".format(task_name)]
                _metrics["loss/{}".format(task_name)] = metrics.mean_squared_error(labels=label, predictions=pred)
            _metrics["prediction/mean/{}".format(task_name)] = metrics.mean(pred)
        return tf.estimator.EstimatorSpec(mode, loss=_loss, eval_metric_ops=_metrics)
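One subtlety in the two-optimizer setup is worth spelling out: in TF1, every minimize(..., global_step=...) call increments the global step after applying gradients, so if both the Adagrad and the Ftrl optimizer receive the step, it advances twice per training iteration, which skews step-based checkpointing and learning-rate schedules. A toy counter (no TensorFlow, just illustrating the double counting):

```python
class GlobalStep:
    """Stand-in for TF1's global_step variable."""
    def __init__(self):
        self.value = 0

    def minimize_with_step(self):
        # Mimics minimize(global_step=...): apply gradients, then bump the step.
        self.value += 1

step = GlobalStep()
for _ in range(100):           # 100 "training iterations"
    step.minimize_with_step()  # deep-part optimizer bumps the step
    step.minimize_with_step()  # wide-part optimizer bumps it again
print(step.value)  # 200, not 100 -> hand global_step to only one minimize()
```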

Code: instantiating an Estimator with the model_fn above

est = tf.estimator.Estimator(model_dir=model_dir,
                model_fn=model_fn,
                config=tf.estimator.RunConfig(save_checkpoints_steps=1000))


Pitfall guide for custom models, part 1: when writing a custom estimator's model_fn, keep the modules in the following order:

  • model layer definitions
  • ModeKeys.PREDICT
  • loss and optimizer definitions
  • ModeKeys.TRAIN
  • ModeKeys.EVAL

The loss part especially: if it sits before ModeKeys.PREDICT, you will easily hit errors, because at serving time the framework calls model_fn with labels=None, so any loss computation reached before the PREDICT branch fails. Ahang didn't pay attention to this at first and tried to make the losses a global variable instead, which in turn made them unavailable when the ModeKeys.PREDICT branch ran, so this ordering really matters.
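The root cause is easy to reproduce without TensorFlow: anything that touches labels must come after the PREDICT early return. A stripped-down mock of the ordering above (the function bodies are hypothetical stand-ins, not real TF calls):

```python
def model_fn(features, labels, mode):
    # 1) model layers: safe in every mode
    logits = sum(features)             # stand-in for the real network

    # 2) ModeKeys.PREDICT: return BEFORE any loss code runs
    if mode == "predict":
        return {"predictions": logits}

    # 3) loss: only reached in train/eval, where labels is not None
    loss = (labels - logits) ** 2

    # 4) / 5) the train / eval branches would consume `loss` here
    return {"loss": loss}

# Serving-style call: labels is None, but nothing touches it before the return.
print(model_fn([1, 2], labels=None, mode="predict"))  # {'predictions': 3}
# Had the loss line come before the PREDICT branch, this call would raise
# a TypeError on the None labels.
```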

Pitfall guide for custom models, part 2: when importing TensorFlow modules, watch out for same-named methods from different modules

  • mean_squared_error exists in both the losses and the metrics modules. If you start off with something like from ...metrics import mean_squared_error, it's easy to later use the metrics version when defining the loss as well. This can cause subtle problems without raising any error: in TF1 the metrics version returns a (value, update_op) pair rather than a loss tensor.
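The same shadowing hazard can be shown with the standard library alone: a second `from ... import` of the same name silently rebinds the first, and nothing warns you. A small sketch, with json/pickle standing in for metrics/losses:

```python
from pickle import loads   # pickle's loads
from json import loads     # silently shadows it -- no warning, no error

# Every later call to `loads` is json.loads, whether you meant it or not:
print(loads('{"a": 1}'))   # {'a': 1}

# The safer habit: import the modules and keep the names qualified.
import json
import pickle
data = json.loads('{"a": 1}')            # intent is unambiguous
blob = pickle.loads(pickle.dumps(data))  # round-trips the dict
```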