Distributed Machine Learning with Ray (Part 1)

Basic idea:

1. Split the training data into blocks and let multiple workers each handle one block; every worker exposes two interfaces, loss for computing the loss and grad for computing the gradient.
2. Define full_loss and full_grad interfaces that aggregate the loss and grad results from all workers.
3. Optimize the network parameters with the L-BFGS algorithm, passing full_loss and full_grad to it as the loss function and gradient function respectively.

**Note:** in this implementation, each worker always computes the loss and gradient over its entire data block, not over a mini-batch.
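One point worth making explicit (it is implied rather than stated in the original): because all K data blocks have the same size, summing the per-block mean cross-entropies gives an objective proportional to the mean loss over the whole training set, so L-BFGS finds the same minimizer:

$$\mathrm{full\_loss}(\theta)=\sum_{k=1}^{K} L_k(\theta),\qquad \mathrm{full\_grad}(\theta)=\sum_{k=1}^{K}\nabla L_k(\theta).$$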

#0. Import dependencies
# Note: the code below uses the TensorFlow 1.x API (tf.placeholder / tf.Session)
# and ray.experimental.tf_utils, which is only available in older Ray releases.
import numpy as np
import os
import scipy.optimize

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import ray
import ray.experimental.tf_utils

#1. Define the model
class LinearModel(object):
    def __init__(self, shape):
        """Creates a LinearModel object."""
        x = tf.placeholder(tf.float32, [None, shape[0]])
        w = tf.Variable(tf.zeros(shape))
        b = tf.Variable(tf.zeros(shape[1]))
        self.x = x
        self.w = w
        self.b = b
        y = tf.nn.softmax(tf.matmul(x, w) + b)
        y_ = tf.placeholder(tf.float32, [None, shape[1]])
        self.y_ = y_
        cross_entropy = tf.reduce_mean(
            -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
        self.cross_entropy = cross_entropy
        self.cross_entropy_grads = tf.gradients(cross_entropy, [w, b])
        self.sess = tf.Session()
        # Helper that exposes all variables feeding into the loss
        # as a single flat 1-D parameter vector.
        self.variables = ray.experimental.tf_utils.TensorFlowVariables(
            cross_entropy, self.sess)

    def loss(self, xs, ys):
        """Computes the loss on one data block."""
        return float(
            self.sess.run(
                self.cross_entropy, feed_dict={
                    self.x: xs,
                    self.y_: ys
                }))

    def grad(self, xs, ys):
        """Computes the gradients of the loss on one data block."""
        return self.sess.run(
            self.cross_entropy_grads, feed_dict={
                self.x: xs,
                self.y_: ys
            })
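The TensorFlowVariables helper is what lets SciPy treat the whole model as one flat parameter vector. A minimal local sketch of that round trip (this snippet is an illustration, not part of the original post; it assumes the LinearModel class above and an older Ray that ships ray.experimental.tf_utils):

# Sketch: flat-vector view of the model parameters.
model = LinearModel([784, 10])
dim = model.variables.get_flat_size()      # 784 * 10 + 10 = 7850 parameters
theta = 1e-2 * np.random.normal(size=dim)  # random flat parameter vector
model.variables.set_flat(theta)            # writes w and b from the flat vector
theta_back = model.variables.get_flat()    # reads w and b back as one 1-D array
assert theta_back.shape == (dim,)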

#2. Define the remote worker that computes the model's loss and gradients
@ray.remote
class NetActor(object):
    def __init__(self, xs, ys):
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
        with tf.device("/cpu:0"):
            self.net = LinearModel([784, 10])
        self.xs = xs
        self.ys = ys

    # Compute the loss on this actor's data block.
    def loss(self, theta):
        net = self.net
        net.variables.set_flat(theta)
        return net.loss(self.xs, self.ys)

    # Compute the gradient on this actor's data block,
    # flattened into a single 1-D array.
    def grad(self, theta):
        net = self.net
        net.variables.set_flat(theta)
        gradients = net.grad(self.xs, self.ys)
        return np.concatenate([g.flatten() for g in gradients])

    def get_flat_size(self):
        return self.net.variables.get_flat_size()
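A call on a @ray.remote actor method returns an object ref immediately and runs asynchronously inside the actor process; ray.get blocks until the result is ready. A minimal sketch of one round trip (the variable names are illustrative and assume ray.init() has been called and xs, ys, theta already exist):

actor = NetActor.remote(xs, ys)      # starts a worker process holding one model
loss_ref = actor.loss.remote(theta)  # returns an object ref without blocking
loss_value = ray.get(loss_ref)       # blocks until the actor finishes the call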

#3. Function that aggregates the loss from the remote workers
def full_loss(theta):
    # Put theta into the object store once so all actors share a single copy.
    theta_id = ray.put(theta)
    loss_ids = [actor.loss.remote(theta_id) for actor in actors]
    return sum(ray.get(loss_ids))

#4. Function that aggregates the gradients from the remote workers
def full_grad(theta):
    theta_id = ray.put(theta)
    grad_ids = [actor.grad.remote(theta_id) for actor in actors]
    # fmin_l_bfgs_b requires float64 arrays.
    return sum(ray.get(grad_ids)).astype("float64")

#5. Train with L-BFGS
if __name__ == "__main__":
    ray.init()

    mnist = input_data.read_data_sets("MNIST_data", one_hot=True)

    # Split the data into blocks; each worker handles one block.
    num_batches = 10
    batch_size = mnist.train.num_examples // num_batches
    batches = [mnist.train.next_batch(batch_size) for _ in range(num_batches)]

    actors = [NetActor.remote(xs, ys) for (xs, ys) in batches]

    # Initialize the flat parameter vector.
    dim = ray.get(actors[0].get_flat_size.remote())
    theta_init = 1e-2 * np.random.normal(size=dim)

    # Optimize with L-BFGS, using the aggregated loss and gradient.
    result = scipy.optimize.fmin_l_bfgs_b(
        full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True)
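scipy.optimize.fmin_l_bfgs_b returns a tuple of the optimized parameter vector, the final objective value, and an info dict. A possible follow-up, not in the original post, appended at the end of the __main__ block above (hence the indentation): load the optimized parameters into a local model and check accuracy on the MNIST test set.

    # Sketch: evaluate the optimized parameters locally. Only `result`,
    # `mnist`, and LinearModel come from the post; the rest is an addition.
    theta_opt, final_loss, info = result
    eval_net = LinearModel([784, 10])
    eval_net.variables.set_flat(theta_opt)
    logits = tf.matmul(eval_net.x, eval_net.w) + eval_net.b
    predictions = eval_net.sess.run(
        tf.argmax(logits, 1), feed_dict={eval_net.x: mnist.test.images})
    accuracy = np.mean(predictions == np.argmax(mnist.test.labels, 1))
    print("final loss:", final_loss, "test accuracy:", accuracy)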
