Basic idea:
1. Split the data into chunks and have each chunk processed by its own worker. Every worker exposes two interfaces: loss, which computes the loss, and grad, which computes the gradient.
2. Define full_loss and full_grad interfaces that aggregate the loss and grad results of all workers.
3. Optimize the parameters with the L-BFGS algorithm, using full_loss and full_grad as its loss function and gradient function; that is all that is needed to optimize the network parameters.
Note: in this implementation, each worker computes the loss and gradient over its entire data chunk on every call, not over a mini-batch.
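As a quick orientation, here is a minimal sketch (a toy quadratic problem, nothing from the MNIST example below) of the contract that full_loss and full_grad have to satisfy: scipy.optimize.fmin_l_bfgs_b expects one callable that returns a scalar loss and one callable that returns the gradient as a 1-D float64 array of the same length as theta.

import numpy as np
import scipy.optimize

def f(theta):
    return float(np.sum((theta - 3.0) ** 2))        # toy scalar loss

def fprime(theta):
    return (2.0 * (theta - 3.0)).astype("float64")  # its gradient, 1-D float64

theta_opt, f_opt, info = scipy.optimize.fmin_l_bfgs_b(f, np.zeros(5), fprime=fprime)
# theta_opt is approximately [3. 3. 3. 3. 3.]

The full example below follows exactly this pattern, except that the loss and gradient are summed over remote workers.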
#0. Import dependencies
import numpy as np
import os
import scipy.optimize
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import ray
import ray.experimental.tf_utils
#1. Define the model
class LinearModel(object):
    def __init__(self, shape):
        """Creates a LinearModel object."""
        x = tf.placeholder(tf.float32, [None, shape[0]])
        w = tf.Variable(tf.zeros(shape))
        b = tf.Variable(tf.zeros(shape[1]))
        self.x = x
        self.w = w
        self.b = b
        y = tf.nn.softmax(tf.matmul(x, w) + b)
        y_ = tf.placeholder(tf.float32, [None, shape[1]])
        self.y_ = y_
        cross_entropy = tf.reduce_mean(
            -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
        self.cross_entropy = cross_entropy
        self.cross_entropy_grads = tf.gradients(cross_entropy, [w, b])
        self.sess = tf.Session()
        # Helper that exposes all variables the loss depends on as one flat vector.
        self.variables = ray.experimental.tf_utils.TensorFlowVariables(
            cross_entropy, self.sess)

    def loss(self, xs, ys):
        """Compute the loss on inputs xs and labels ys."""
        return float(
            self.sess.run(
                self.cross_entropy, feed_dict={
                    self.x: xs,
                    self.y_: ys
                }))

    def grad(self, xs, ys):
        """Compute the gradients of the loss with respect to w and b."""
        return self.sess.run(
            self.cross_entropy_grads, feed_dict={
                self.x: xs,
                self.y_: ys
            })
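The TensorFlowVariables helper is what lets the SciPy optimizer treat the model as a single flat parameter vector. A quick usage sketch (assuming the ray.experimental.tf_utils API of the Ray 0.x releases this example targets):

model = LinearModel([784, 10])
dim = model.variables.get_flat_size()     # 784*10 + 10 = 7850 parameters
theta = model.variables.get_flat()        # all weights as one 1-D numpy array
model.variables.set_flat(np.zeros(dim))   # write a flat vector back into w and b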
#2. Define a remote worker that computes the model's loss and gradients
@ray.remote
class NetActor(object):
    def __init__(self, xs, ys):
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
        with tf.device("/cpu:0"):
            self.net = LinearModel([784, 10])
        self.xs = xs
        self.ys = ys

    # Compute the loss on this actor's data chunk.
    def loss(self, theta):
        net = self.net
        net.variables.set_flat(theta)
        return net.loss(self.xs, self.ys)

    # Compute the gradient on this actor's data chunk, flattened into one vector.
    def grad(self, theta):
        net = self.net
        net.variables.set_flat(theta)
        gradients = net.grad(self.xs, self.ys)
        return np.concatenate([g.flatten() for g in gradients])

    def get_flat_size(self):
        return self.net.variables.get_flat_size()
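A minimal usage sketch of the actor (xs_chunk and ys_chunk are illustrative names for one data chunk; the real driver code is in step 5). Methods are invoked with .remote() and return object IDs that ray.get resolves:

actor = NetActor.remote(xs_chunk, ys_chunk)
dim = ray.get(actor.get_flat_size.remote())
theta = np.zeros(dim)
print(ray.get(actor.loss.remote(theta)))         # scalar loss on this chunk
print(ray.get(actor.grad.remote(theta)).shape)   # (dim,) flattened gradient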
#3. Function that aggregates the loss from the remote workers
def full_loss(theta):
    # Put theta in the object store once so every actor reads the same copy.
    theta_id = ray.put(theta)
    loss_ids = [actor.loss.remote(theta_id) for actor in actors]
    return sum(ray.get(loss_ids))
#4. Function that aggregates the gradient from the remote workers
def full_grad(theta):
    theta_id = ray.put(theta)
    grad_ids = [actor.grad.remote(theta_id) for actor in actors]
    # fmin_l_bfgs_b requires the gradient as float64.
    return sum(ray.get(grad_ids)).astype("float64")
#5. Train with L-BFGS
if __name__ == "__main__":
    ray.init()
    mnist = input_data.read_data_sets("MNIST_data", one_hot=True)
    # Split the data into chunks; each worker handles one chunk.
    num_batches = 10
    batch_size = mnist.train.num_examples // num_batches
    batches = [mnist.train.next_batch(batch_size) for _ in range(num_batches)]
    actors = [NetActor.remote(xs, ys) for (xs, ys) in batches]
    # Initialize the parameters.
    dim = ray.get(actors[0].get_flat_size.remote())
    theta_init = 1e-2 * np.random.normal(size=dim)
    # Optimize.
    result = scipy.optimize.fmin_l_bfgs_b(
        full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True)
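For reference, a sketch of how the result might be inspected afterwards; fmin_l_bfgs_b returns the optimized parameters, the final objective value, and an info dict (keys as documented by SciPy):

theta_opt, f_opt, info = result
print("summed loss after optimization:", f_opt)
print("iterations:", info["nit"], "function evaluations:", info["funcalls"])
# Push the optimized parameters back into a local copy of the model:
local_model = LinearModel([784, 10])
local_model.variables.set_flat(theta_opt)
print("loss on the first chunk:", local_model.loss(*batches[0]))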