TensorFlow优化器浅析
阅读原文时间:2023年07月08日阅读:2

本文基于tensorflow-v1.15分支,简单分析下TensorFlow中的优化器。

optimizer = tf.train.GradientDescentOptimizer(learning_rate=.05)
train_op = optimizer.minimize(loss)

当我们调用optimizer.minimize()时,其内部会调用两个方法compute_gradients()apply_gradients(),分别用来计算梯度和使用梯度更新权重,其核心逻辑如下所示。

def minimize(self, loss, global_step=None, var_list=None,
             gate_gradients=GATE_OP, aggregation_method=None,
             colocate_gradients_with_ops=False, name=None,
             grad_loss=None):
    grads_and_vars = self.compute_gradients(
        loss, var_list=var_list, gate_gradients=gate_gradients,
        aggregation_method=aggregation_method,
        colocate_gradients_with_ops=colocate_gradients_with_ops,
        grad_loss=grad_loss)

    vars_with_grad = [v for g, v in grads_and_vars if g is not None]
    return self.apply_gradients(grads_and_vars, global_step=global_step, name=name)

如果我们想在模型更新前对梯度搞一些自定义的操作,TensorFlow中推荐的方式是

  1. 通过compute_gradients计算梯度
  2. 对梯度进行一些自定义操作
  3. 通过apply_gradients将处理后的梯度更新到模型权重

optimizer.minimize的第一阶段,我们首先通过compute_gradients计算出梯度。在compute_gradients函数中,TensorFlow使用了两种计算梯度的方式,分别是针对静态图的tf.gradients接口和针对动态图的tf.GradientTape接口,这两个接口内部分别使用了符号微分和自动微分的方式来计算梯度。下面是compute_gradients的核心执行逻辑,代码中省略了部分异常判断的语句。可以看到,如果传入的loss是一个可调用对象,那么就会调用backprop.GradientTape相关的接口去求解梯度;否则,就会调用gradients.gradients接口去求解梯度。

from tensorflow.python.eager import backprop
from tensorflow.python.ops import gradients
def compute_gradients(self, loss, var_list=None,
                      gate_gradients=GATE_OP,
                      aggregation_method=None,
                      colocate_gradients_with_ops=False,
                      grad_loss=None):
    if callable(loss):
      with backprop.GradientTape() as tape:
        if var_list is not None:
          tape.watch(var_list)
        loss_value = loss()
        loss_value = self._scale_loss(loss_value)

      if var_list is None:
        var_list = tape.watched_variables()
      with ops.control_dependencies([loss_value]):
        grads = tape.gradient(loss_value, var_list, grad_loss)
      return list(zip(grads, var_list))

    # Non-callable/Tensor loss case
    # Scale loss if using a "mean" loss reduction and multiple replicas.
    loss = self._scale_loss(loss)
    if var_list is None:
      var_list = (
          variables.trainable_variables() +
          ops.get_collection(ops.GraphKeys.TRAINABLE_RESOURCE_VARIABLES))
    else:
      var_list = nest.flatten(var_list)
    var_list += ops.get_collection(ops.GraphKeys._STREAMING_MODEL_PORTS)
    processors = [_get_processor(v) for v in var_list]
    var_refs = [p.target() for p in processors]
    grads = gradients.gradients(
        loss, var_refs, grad_ys=grad_loss,
        gate_gradients=(gate_gradients == Optimizer.GATE_OP),
        aggregation_method=aggregation_method,
        colocate_gradients_with_ops=colocate_gradients_with_ops)
    if gate_gradients == Optimizer.GATE_GRAPH:
      grads = control_flow_ops.tuple(grads)
    grads_and_vars = list(zip(grads, var_list))
    return grads_and_vars

一般来说,loss是一个tensor,因此我们主要关注上述代码的第16-29行。在第16-19行,我们获取需要求解梯度的变量列表。如果没有指定var_list,那么compute_gradient函数会默认获取所有的TRAINABLE_VARIABLESTRAINABLE_RESOURCE_VARIABLES。第20行貌似啥也没做,因为在源代码中2找不到名为_STRAMING_MODEL_PROTS的变量集合。注意到第23行调用gradients.gradients函数计算梯度,这个函数实现在python/ops/gradient_impl.py文件中,其内部调用了_GradientsHelper来实现真正的计算。因为_GradientsHelper这个函数特别长,而且它和gradients函数的参数相同,所以我们这里先介绍几个重要形参的含义。

@tf_export(v1=["gradients"])
def gradients(ys,
              xs,
              grad_ys=None,
              name="gradients",
              colocate_gradients_with_ops=False,
              gate_gradients=False,
              aggregation_method=None,
              stop_gradients=None,
              unconnected_gradients=UnconnectedGradients.NONE):
  with ops.get_default_graph()._mutation_lock():
    return gradients_util._GradientsHelper(
        ys, xs, grad_ys, name, colocate_gradients_with_ops,
        gate_gradients, aggregation_method, stop_gradients,
        unconnected_gradients)

ys和xs参数均接收单个tensor或tensor列表,分别对应\(\frac{\partial Y}{\partial X}\)中的\(Y\)和\(X\)。

grad_ys参数接收单个tensor或tensor列表,它的维度必须和ys的维度相同。grad_ys为ys中的每个tensor提供初始值,如果grad_ys为None,那么ys中每个tensor的初始值就被设置为1。

aggregation_method表示梯度聚合的方式,TensorFlow支持的所有聚合方式均定义于tf.AggregationMethods类中,包括ADD_NDEFAULTEXPERIMENTAL_N以及EXPERIMENTAL_ACCUMULATE_N等方法。

stop_gradients参数接收单个tensor或tensor列表,这些tensor不参与反向传播梯度的计算。注意,tensorflow提供了另一个接口tf.stop_gradients,也可以完成相同的工作。二者的区别在于tf.stop_gradient作用于计算图构建时,而tf.gradients的stop_gradients参数作用于计算图的运行时。

_GradientHelper是构建反向计算图并求解梯度的关键方法,需要仔细阅读。这里暂时给出一个简略的分析。

这个方法会维护两个重要变量:

  • 一个队列queue,队列里存放计算图里所有出度为0的Op
  • 一个字典grads,字典的键是Op本身,值是该Op每个输出端收到的梯度列表

反向传播求梯度时,每从队列中弹出一个Op,都会把它输出变量的梯度加起来(对应全微分定理)得到out_grads,然后获取对应的梯度计算函数grad_fn。Op本身和out_grads会传递给grad_fn做参数,求出输入的梯度。每当一个Op的梯度被求出来,就会更新所有未经处理的Op的出度和queue。当queue为空时,就表示整个反向计算图处理完毕。

if grad_fn:
  in_grads = _MaybeCompile(grad_scope, op, func_call,
                           lambda: grad_fn(op, *out_grads))
else:
  in_grads = _MaybeCompile(grad_scope, op, func_call,
                           lambda: _SymGrad(op, out_grads, xs))

grad_fn是梯度计算函数,它用来计算给定Op的梯度。在TensorFlow里,每个Op都会定义一个对应的梯度计算函数。例如,下面是平方函数(tf.square)的梯度:

@ops.RegisterGradient("Square")
def _SquareGrad(op, grad):
  x = op.inputs[0]
  # Added control dependencies to prevent 2*x from being computed too early.
  with ops.control_dependencies([grad]):
    x = math_ops.conj(x)
    y = constant_op.constant(2.0, dtype=x.dtype)
    return math_ops.multiply(grad, math_ops.multiply(x, y))

apply_gradients是optimizer.minimize的第二阶段,它将梯度更新应用到变量上。根据所使用的学习算法的不同,apply_gradients内部会调用不同的Optimizer实现。下面的代码展示了apply_gradients的核心执行逻辑。

converted_grads_and_vars = []
for g, v in grads_and_vars:
    if g is not None:
        g = ops.convert_to_tensor_or_indexed_slices(g)
    p = _get_processor(v)
    converted_grads_and_vars.append((g, v, p))

converted_grads_and_vars = tuple(converted_grads_and_vars)
var_list = [v for g, v, _ in converted_grads_and_vars if g is not None]

with ops.init_scope():
    self._create_slots(var_list)
update_ops = []
with ops.name_scope(name, self._name) as name:
    self._prepare()
    for grad, var, processor in converted_grads_and_vars:
        if grad is None:
            continue
        else:
            scope_name = var.op.name
        with ops.name_scope("update_" + scope_name), ops.colocate_with(var):
          update_ops.append(processor.update_op(self, grad))
      if global_step is None:
        apply_updates = self._finish(update_ops, name)
      else:
        with ops.control_dependencies([self._finish(update_ops, "update")]):
          with ops.colocate_with(global_step):
            if isinstance(
                global_step, resource_variable_ops.BaseResourceVariable):
              # TODO(apassos): the implicit read in assign_add is slow; consider
              # making it less so.
              apply_updates = resource_variable_ops.assign_add_variable_op(
                  global_step.handle,
                  ops.convert_to_tensor(1, dtype=global_step.dtype),
                  name=name)
            else:
              apply_updates = state_ops.assign_add(global_step, 1, name=name)

      if not context.executing_eagerly():
        if isinstance(apply_updates, ops.Tensor):
          apply_updates = apply_updates.op
        train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP)
        if apply_updates not in train_op:
          train_op.append(apply_updates)

      return apply_updates

在1-6行,程序将每个非None的梯度转化成tensor(稠密)或indexedslices(稀疏),根据每个变量存储类型的不同,我们获取到不同的processor(第5行),最终将一个三元组(g, v, p)保存到列表converted_grads_and_vars中,以备后用。这里主要解释下第5行,对于所有可优化的变量OptimizableVariable,根据其类型的不同,我们需要调用不同update_op。

在程序的8-10行,我们首先获取到非None的grad对应的var,然后对相应的var创建slots。_create_slots方法需要Optimzier的子类自己去实现,它的作用是创建学习算法所需要的中间变量。以momentum sgd为例,它的更新公式为:

accumulation = momentum * accumulation + gradient
variable -= learning_rate * accumulation

可以看到,它在更新变量的时候需要用到一个中间变量accumulation。因此,我们需要为每个变量创建一个slot,用来保存这个中间变量,以便在下次进行权重更新时继续使用它:

def _create_slots(self, var_list):
  for v in var_list:
      self._zeros_slot(v, "momentum", self._name)

在程序的12-20行,我们为每个var添加对应的update_op。第14行调用了_prepare方法,它是用来初始化一些必要的变量(例如学习率、动量),为应用梯度做准备。还是以momentum sgd为例子,它的_prepare函数实现如下:

def _prepare(self):
    learning_rate = self._learning_rate
  if callable(learning_rate):
      learning_rate = learning_rate()
  self._learning_rate_tensor = ops.convert_to_tensor(learning_rate,
                                                     name="learning_rate")
  momentum = self._momentum
  if callable(momentum):
      momentum = momentum()
  self._momentum_tensor = ops.convert_to_tensor(momentum, name="momentum")

可以看到,在这个函数中,它将学习率和动量都转化成了tensor(为什么要转成tensor?)。准备工作完成后,我们就可以给每个非None的grad和var添加相应的update_op。注意第18行,我们使用ops.colocate_with(var)把var对应的update_op放置到var所在的设备上。最后,我们调用_finish函数以完成所有的更新。一般来说_finish函数不需要重写,唯一的例外是Adam算法,它在实现时重写了_finish算法。

前面提到,对于所有的可优化的变量,根据其类型的不同,我们会调用不同的update_op。一般来说,不同的Optimizer需要实现的update_op主要包括四种:_apply_dense、_resource_apply_dense、_apply_sparse和_resource_apply_sparse,其中前两种对应稠密更新,后两种对应稀疏更新。以_resource开头的方法是针对variable handle,而不带_resource的update_op则是针对variable的。这里我们还是以momentum sgd为例,介绍一下它的_apply_dense的实现。下面是对应的代码,可以看到它首先获取了中间变量mom,然后直接调用了training_ops中的apply_momentum方法。

def _apply_dense(self, grad, var):
    mom = self.get_slot(var, "momentum")
  return training_ops.apply_momentum(
             var, mom,
      math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype),
      grad,
      math_ops.cast(self._momentum_tensor, var.dtype.base_dtype),
      use_locking=self._use_locking,
      use_nesterov=self._use_nesterov).op

apply_momentum方法是由bazel构建生成的代码,它会调用op_def_lib中的_apply_op_helper函数,将一个名为ApplyMomentum的Op添加到计算图中:

_, _, _op = _op_def_lib._apply_op_helper(
            "ApplyMomentum", var=var, accum=accum, lr=lr, grad=grad,
                  momentum=momentum, use_locking=use_locking,
                        use_nesterov=use_nesterov, name=name)

根据gen_training_ops.py中的注释,我们可以找到ApplyMomemtum这个Op的注册信息:

REGISTER_OP("ApplyMomentum")
    .Input("var: Ref(T)")
    .Input("accum: Ref(T)")
    .Input("lr: T")
    .Input("grad: T")
    .Input("momentum: T")
    .Output("out: Ref(T)")
    .Attr("T: numbertype")
    .Attr("use_locking: bool = false")
    .Attr("use_nesterov: bool = false")
    .SetShapeFn([](InferenceContext* c) {
      return ApplyMomentumShapeFn(c, false /* not sparse */);
    });

最终,我们可以在kernel目录下找到ApplyMomentum这个Op的实现。针对不同的设备,ApplyMomentum有不同的特化实现。

template <typename T>
struct ApplyMomentum<CPUDevice, T> {
  void operator()(const CPUDevice& d, typename TTypes<T>::Flat var,
                  typename TTypes<T>::Flat accum,
                  typename TTypes<T>::ConstScalar lr,
                  typename TTypes<T>::ConstFlat grad,
                  typename TTypes<T>::ConstScalar momentum, bool use_nesterov) {
    accum.device(d) = accum * momentum() + grad;
    if (use_nesterov) {
      var.device(d) -= grad * lr() + accum * momentum() * lr();
    } else {
      var.device(d) -= accum * lr();
    }
  }
};

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章