我是靠谱客的博主 喜悦小虾米,最近开发中收集的这篇文章主要介绍深度学习实战(六) 多机多卡分布式训练cifar10完整实现准备工作:实现部分(附详细注释):最后运行分布式训练:,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

准备工作:

数据集下载地址:

http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz

 

实现部分(附详细注释):

首先获取用于训练的小批量数据,由于获取过程中需要对图像进行处理,避免阻塞训练进程,我们开启16个线程来从队列获取批量图像。

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import tensorflow as tf
import matplotlib.pyplot as plt
# %matplotlib inline
IMAGE_SIZE = 32
NUM_CLASSES = 10
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000 #训练集的样本总数
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000 #验证集的样本总数
cifar_label_bytes = 1
# 2 for CIFAR-100 第一个字节为label
cifar_height = 32
cifar_width = 32
cifar_depth = 3 #通道数
#生产批量输入
def generate_batch_inputs(eval_data, shuffle, data_dir, batch_size):
"""
参数:
eval_data: bool值,指定训练或者验证.
shuffle: bool值,是否将数据顺序打乱.
data_dir: CIFAR-10数据集所在目录.
batch_size: 批量大小.
返回值:
images: Images. 4D tensor of [batch_size, IMAGE_SIZE, IMAGE_SIZE, 3] size.
labels: Labels. 1D tensor of [batch_size] size.
"""
if not eval_data:
filepath = os.path.join(data_dir, 'data_batch_*')
num_examples_per_epoch = NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN
else:
filepath = os.path.join(data_dir, 'test_batch*')
num_examples_per_epoch = NUM_EXAMPLES_PER_EPOCH_FOR_EVAL
files = tf.train.match_filenames_once(filepath)
with tf.name_scope('input'):
# tf.train.string_input_producer会使用初始化时提供的文件列表创建一个输入队列,
# 创建好的输入队列可以作为文件读取函数的参数.
# shuffle参数为True时,文件在加入队列之前会被打乱顺序
# tf.train.string_input_producer生成的输入队列可以同时被多个文件读取线程操作,
# 而且输入队列会将队列中的文件均匀地分配给不同的线程,不会出现有些文件被处理过多次而有些文件还没有被处理过的情况
# 当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件类表中的文件全部重新加入队列,
# 通过num_epochs参数来限制加载初始化文件列表的最大轮数。当所有文件都已经被使用了设定的轮数后,
# 如果继续尝试读取新的文件,输入队列会报OutOfRange的错误。这里我们取None不做限制
filename_queue = tf.train.string_input_producer(files, shuffle=False, num_epochs=None)
# 从文件队列读取样本
image_bytes = cifar_height * cifar_width * cifar_depth
#每条数据的长度
record_bytes = cifar_label_bytes + image_bytes
# 读取固定长度的一条数据
reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
key, value = reader.read(filename_queue)
# 格式转换
record_bytes = tf.decode_raw(value, tf.uint8)
# 第一个字节为分类标签
label = tf.cast(
tf.strided_slice(record_bytes, [0], [cifar_label_bytes]), tf.int32)
# 标签字节后面的字节表示图片信息
# reshape from [depth * height * width] to [depth, height, width].
depth_major = tf.reshape(
tf.strided_slice(record_bytes, [cifar_label_bytes],
[cifar_label_bytes + image_bytes]),
[cifar_depth, cifar_height, cifar_width])
# Convert from [depth, height, width] to [height, width, depth].
uint8image = tf.transpose(depth_major, [1, 2, 0])
reshaped_image = tf.cast(uint8image, tf.float32)
#plt.imshow(reshaped_image)
'''
if not eval_data:
# 数据增强用于训练
# 随机的对图片进行一些处理,原来的一张图片在多次epoch中就会生成多张不同的图片,这样就增加了样本数量
#由于数据增强会耗费大量的CPU时间,因此我们用16个线程来处理
# Randomly crop a [IMAGE_SIZE, IMAGE_SIZE] section of the image.
resized_image = tf.random_crop(reshaped_image, [IMAGE_SIZE, IMAGE_SIZE, 3])
# Randomly flip the image horizontally.
resized_image = tf.image.random_flip_left_right(resized_image)
# Because these operations are not commutative, consider randomizing
# the order their operation.
# NOTE: since per_image_standardization zeros the mean and makes
# the stddev unit, this likely has no effect see tensorflow#1458.
resized_image = tf.image.random_brightness(resized_image,
max_delta=63)
resized_image = tf.image.random_contrast(resized_image,
lower=0.2, upper=1.8)
else:
# 裁剪中间部分用于验证
resized_image = tf.image.resize_image_with_crop_or_pad(reshaped_image,
IMAGE_SIZE, IMAGE_SIZE)
# 减去均值并除以像素的方差
float_image = tf.image.per_image_standardization(resized_image)
'''
# 这里我们不对图片进行任何处理,得到更大的图像,以便后面训练得到更快的收敛和更好的精度
float_image = reshaped_image
float_image.set_shape([IMAGE_SIZE, IMAGE_SIZE, 3])
label.set_shape([1])
min_fraction_of_examples_in_queue = 0.4
min_queue_examples = int(num_examples_per_epoch *
min_fraction_of_examples_in_queue)
# TensorFlow提供了tf.train.shuffle_batch和tf.train.batch函数来将单个的样例组织成batch的形式输出,
# 这两个函数都会生成一个队列,队列的入队操作是生成单个样例的方法,而每次出队得到的是一个batch的样例,它们唯一的区别在于是否会将数据顺序打乱
# 参数capacity表示最多可以存储的样例个数,太大就会占用很多内存,太小则会因为出队操作没有数据而阻塞,影响效率
# 参数min_after_dequeue限制了出队时队列中元素的最少个数,当队列中元素太少时,随机打乱样例顺序的作用就不大了。
# tf.train.shuffle_batch和tf.train.batch函数除了可以将单个训练数据整理成输入batch,也提供了并行化处理数据的方法。
# 参数num_threads,可以指定多个线程同时执行入队操作。
num_preprocess_threads = 16
if shuffle:
images, label_batch = tf.train.shuffle_batch(
[float_image, label],
batch_size=batch_size,
num_threads=num_preprocess_threads,
capacity=min_queue_examples + 3 * batch_size,
min_after_dequeue=min_queue_examples)
else:
images, label_batch = tf.train.batch(
[float_image, label],
batch_size=batch_size,
num_threads=num_preprocess_threads,
capacity=min_queue_examples + 3 * batch_size)
# Display the training images in the visualizer.
#tf.summary.image('images', images)
return images, tf.reshape(label_batch, [batch_size])

多机多卡tensorflow分布式训练实现代码:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from datetime import datetime
import os.path
import re
import time
import numpy as np
from six.moves import xrange
import tensorflow as tf
import Cifar_10_Input
import Cifar_10
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string('train_dir', './models_cifar10',
"""Directory where to write event logs """
"""and checkpoint.""")
tf.app.flags.DEFINE_integer('max_steps', 1000000,
"""Number of batches to run.""")
tf.app.flags.DEFINE_boolean('log_device_placement', False,
"""Whether to log device placement.""")
tf.app.flags.DEFINE_integer('batch_size', 128,
"""Number of images to process in a batch.""")
tf.app.flags.DEFINE_string('data_dir', './cifar-10-batches-bin',
"""Path to the CIFAR-10 data directory.""")
# 每台机器的 GPU 个数,这里单机上运行,因此为 0
tf.app.flags.DEFINE_integer("num_gpus", 0, "Total number of gpus for each machine."
"If you don't use GPU, please set it to '0'")
# 定义分布式参数
# 参数服务器parameter server节点
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222",
"Comma-separated list of hostname:port pairs")
# 两个worker节点
tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224",
"Comma-separated list of hostname:port pairs")
# 设置job name参数
tf.app.flags.DEFINE_string('job_name', None, 'job name: worker or ps')
# 设置任务的索引
tf.app.flags.DEFINE_integer('task_index', None, 'Index of task within the job')
# 选择异步并行,同步并行
tf.app.flags.DEFINE_bool('sync_replicas', True,'Whether or not to synchronize the replicas during training.')
# 如果服务器已经存在,采用 gRPC 协议通信;如果不存在,采用进程间通信
tf.app.flags.DEFINE_boolean(
"existing_servers", False, "Whether servers already exists. If True, "
"will use the worker hosts via their GRPC URLs (one client process "
"per worker host). Otherwise, will create an in-process TensorFlow "
"server.")
# 在同步训练模式下,设置收集的工作节点的数量。默认就是工作节点的总数
tf.app.flags.DEFINE_integer("replicas_to_aggregate", None,
"Number of replicas to aggregate before parameter update "
"is applied (For sync_replicas mode only; default: "
"num_workers)")
# Global constants describing the CIFAR-10 data set.
IMAGE_SIZE = Cifar_10_Input.IMAGE_SIZE
NUM_CLASSES = Cifar_10_Input.NUM_CLASSES
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = Cifar_10_Input.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = Cifar_10_Input.NUM_EXAMPLES_PER_EPOCH_FOR_EVAL
# Constants describing the training process.
MOVING_AVERAGE_DECAY = 0.9999
# The decay to use for the moving average.
NUM_EPOCHS_PER_DECAY = 350.0
# Epochs after which learning rate decays.
LEARNING_RATE_DECAY_FACTOR = 0.1
# Learning rate decay factor.
INITIAL_LEARNING_RATE = 0.1
# Initial learning rate.
# 使用多个服务器的多个gpu训练
def train():
"""Train CIFAR-10 for a number of steps."""
if FLAGS.job_name is None or FLAGS.job_name == '':
raise ValueError('Must specify an explicit job_name !')
else:
print('job_name : %s' % FLAGS.job_name)
if FLAGS.task_index is None or FLAGS.task_index == '':
raise ValueError('Must specify an explicit task_index!')
else:
print('task_index : %d' % FLAGS.task_index)
ps_spec = FLAGS.ps_hosts.split(',')
worker_spec = FLAGS.worker_hosts.split(',')
#如果是同步模式,则先获取同步更新模型参数所需要的副本数
num_workers = len(worker_spec)
# 创建集群
cluster = tf.train.ClusterSpec({'ps': ps_spec, 'worker': worker_spec})
if not FLAGS.existing_servers:
# Not using existing servers. Create an in-process server.
# 创建当前机器的server,用以连接到cluster
server = tf.train.Server(
cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
# 如果当前节点是parameter server,则不再进行后续的操作,而是使用server.join等待worker工作
if FLAGS.job_name == "ps":
server.join()
is_chief = (FLAGS.task_index == 0)
'''
设置 CUDA_VISIBLE_DEVICES 环境变量,限制各个工作节点只可见一个 GPU,启动进程时添加环境变量即可。
例如,每个工作节点只能访问一个 GPU,在代码中不需要额外指定
这里是通过代码来分配
'''
if FLAGS.num_gpus > 0:
# 避免gpu分配冲突:现在为相应机器中的每个worker分配task_num - > #gpu
gpu = (FLAGS.task_index % FLAGS.num_gpus)
worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
elif FLAGS.num_gpus == 0:
cpu = 0
worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)
# 我们使用 tf.train.replica_device_setter 将涉及变量的操作分配到参数服务器上,并使用 CPU;
# 将涉及非变量的操作分配到工作节点上,使用 worker_device 的值。
# 在这个 with 语句之下定义的参数,会自动分配到参数服务器上去定义 如果有多个参数服务器,就轮流循环分配
# 在深度学习训练中,一般图的计算,对于每个worker task来说,都是相同的,所以我们会把所有图计算、变量定义等代码,都写到下面这个语句下
with tf.device(
tf.train.replica_device_setter(
worker_device=worker_device,
ps_device='/job:ps/cpu:0',
cluster=cluster
)):
'''-------------------------------分割线---------------------------------'''
# 全局训练step数
global_step = tf.Variable(0, name="global_step", trainable=False)
training = tf.placeholder_with_default(False, shape=(), name='training')
# Get images and labels for CIFAR-10.
images, labels = Cifar_10_Input.generate_batch_inputs(eval_data=(training==False), shuffle=True,
data_dir=FLAGS.data_dir, batch_size=FLAGS.batch_size)
# Reuse variables for the next tower.
# tf.get_variable_scope().reuse_variables()
logits = Cifar_10.inference(images, is_training=training)
# Calculate the average cross entropy loss across the batch.
labels = tf.cast(labels, tf.int64)
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
labels=labels, logits=logits, name='cross_entropy')
loss = tf.reduce_mean(cross_entropy, name='loss')
correct = tf.nn.in_top_k(logits, labels, 1)
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# Calculate the learning rate schedule.
num_batches_per_epoch = (NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
FLAGS.batch_size)
decay_steps = int(num_batches_per_epoch * NUM_EPOCHS_PER_DECAY)
# Decay the learning rate exponentially based on the number of steps.
lr = tf.train.exponential_decay(INITIAL_LEARNING_RATE,
global_step,
decay_steps,
LEARNING_RATE_DECAY_FACTOR,
staircase=True)
# Create an optimizer that performs gradient descent.
opt = tf.train.GradientDescentOptimizer(lr)
'''-------------------------------分割线---------------------------------'''
if FLAGS.sync_replicas:
if FLAGS.replicas_to_aggregate is None:
replicas_to_aggregate = num_workers
else:
replicas_to_aggregate = FLAGS.replicas_to_aggregate
# 创建同步训练的优化器,tf.train.SyncReplicasOptimizer实质上是对原有优化器的一个扩展,
# 我们传入原有优化器及其他参数,它会将原有优化器改造为同步分布式训练版本
opt = tf.train.SyncReplicasOptimizer(
opt,
replicas_to_aggregate=replicas_to_aggregate,
total_num_replicas=num_workers,
name="cifar10_sync_replicas")
# 记得传入global_step以同步
train_step = opt.minimize(loss, global_step=global_step)
#如果是同步并且是chief task
if FLAGS.sync_replicas and is_chief:
# 创建队列执行器
chief_queue_runner = opt.get_chief_queue_runner()
# 创建全局参数初始化器
sync_init_op = opt.get_init_tokens_op()
init_op = tf.global_variables_initializer()
# logdir地址不要写错了
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=FLAGS.train_dir, # 保存和加载模型的路径 通过sv.saver访问
init_op=init_op,
recovery_wait_secs=1,
global_step=global_step)
# sv中已经创建,不能重复创建
# saver = tf.train.Saver()
sess_config = tf.ConfigProto(
allow_soft_placement=True, # 软放置 如果该操作函数没有 GPU 实现时,会本动使用 CPU 设备
log_device_placement=FLAGS.log_device_placement, # 告诉放置器在放置节点时记录消息
device_filters=["/job:ps",
"/job:worker/task:%d" % FLAGS.task_index])
# The chief worker (task_index==0) session will prepare the session,
# while the remaining workers will wait for the preparation to complete.
if is_chief:
print("Worker %d: Initializing session..." % FLAGS.task_index)
else:
print("Worker %d: Waiting for session to be initialized..." %
FLAGS.task_index)
if FLAGS.existing_servers:
server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
print("Using existing server at: %s" % server_grpc_url)
sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config)
else:
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
print("Worker %d: Session initialization complete." % FLAGS.task_index)
if FLAGS.sync_replicas and is_chief:
# chief task要执行下面两个操作:1.全局变量初始化 2.启动队列 此时其他task都处于等待状态
sess.run(sync_init_op)
sv.start_queue_runners(sess, [chief_queue_runner])
'''-------------------------------执行训练---------------------------------'''
# 这里没用到tf.train.Coordinator
local_step = 0
while True:
_, loss_value ,step ,accuracy_value = sess.run([train_step, loss, global_step, accuracy],feed_dict={training: True})
assert not np.isnan(loss_value), 'Model diverged with loss = NaN'
local_step += 1
format_str = ('%s: step %d, loss = %.2f ,Train accuracy = %.2f')
print (format_str % (datetime.now(), step, loss_value, accuracy_value))
# Save the model checkpoint periodically.
if step % 100 == 0 or (step + 1) == FLAGS.max_steps:
checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
sv.saver.save(sess, checkpoint_path, global_step=step)
#
添加测试集验证部分
sv.saver.restore(sess, tf.train.latest_checkpoint(FLAGS.train_dir))
_, loss_test
,accuracy_test = sess.run([train_step, loss, accuracy],feed_dict={training: False})
format_str = ('%s: loss = %.2f ,Test accuracy = %.2f')
print (format_str % (datetime.now(), loss_test, accuracy_test))
if step >= FLAGS.max_steps:
break
def main(argv=None):
if tf.gfile.Exists(FLAGS.train_dir):
tf.gfile.DeleteRecursively(FLAGS.train_dir)
tf.gfile.MakeDirs(FLAGS.train_dir)
train()
if __name__ == '__main__':
tf.app.run()

训练模型


"""Builds the CIFAR-10 network.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import re
import sys
from six.moves import urllib
import tensorflow as tf
import Cifar_10_Input
import tensorflow.contrib.slim as slim
# Global constants describing the CIFAR-10 data set.
NUM_CLASSES = Cifar_10_Input.NUM_CLASSES
def inference(images,
is_training=True,
dropout_keep_prob=0.8,
num_class=NUM_CLASSES,
reuse=tf.AUTO_REUSE):
weight_decay = 0.004
batch_norm_params = {
# Decay for the moving averages.
'decay': 0.995,
# epsilon to prevent 0s in variance.
'epsilon': 0.001,
# force in-place updates of mean and variance estimates
'updates_collections': None,
# Moving averages ends up in the trainable variables collection
'variables_collections': [ tf.GraphKeys.TRAINABLE_VARIABLES ],
}
with slim.arg_scope([slim.conv2d, slim.fully_connected],
weights_initializer=slim.initializers.xavier_initializer(),
weights_regularizer=slim.l2_regularizer(weight_decay),
normalizer_fn=slim.batch_norm,
normalizer_params=batch_norm_params):
return inference_in(images, is_training=is_training,
dropout_keep_prob=dropout_keep_prob, num_class=num_class, reuse=reuse)
# 这里简单的写了几层神经网络,最高准确率达到0.99以上,读者可以拟合自己的神经网络
def inference_in(inputs, is_training=True,
dropout_keep_prob=0.8,
num_class=NUM_CLASSES,
reuse=tf.AUTO_REUSE,
scope='InceptionResnetV2'):
with tf.variable_scope(scope, 'InceptionResnetV2', [inputs], reuse=reuse):
with slim.arg_scope([slim.batch_norm, slim.dropout],
is_training=is_training):
with slim.arg_scope([slim.conv2d, slim.max_pool2d, slim.avg_pool2d],
stride=1, padding='SAME'):
# 15 x 15 x 32
net = slim.conv2d(inputs, 32, 3, stride=2, padding='VALID',
scope='Conv2d_1a_3x3')
# 13 x 13 x 32
net = slim.conv2d(net, 32, 3, padding='VALID',
scope='Conv2d_2a_3x3')
# 13 x 13 x 64
net = slim.conv2d(net, 64, 3, scope='Conv2d_2b_3x3')
# 6 x 6 x 64
net = slim.max_pool2d(net, 3, stride=2, padding='VALID',
scope='MaxPool_3a_3x3')
# 6 x 6 x 80
net = slim.conv2d(net, 80, 1, padding='VALID',
scope='Conv2d_3b_1x1')
# 4 x 4 x 192
net = slim.conv2d(net, 192, 3, padding='VALID',
scope='Conv2d_4a_3x3')
# 2 x 2 x 192
net = slim.max_pool2d(net, 2, stride=2, padding='VALID',
scope='MaxPool_5a_3x3')
net = tf.reshape(net, shape=[-1, 192 * 2 * 2])
net = slim.dropout(net, dropout_keep_prob, is_training=is_training,
scope='Dropout')
net = slim.fully_connected(net, num_class, activation_fn=None, reuse=False)
return net

 

最后运行分布式训练:

这里使用一台机器的三个终端模拟实现

打开三个终端,首先cd到代码和数据集所在目录,分别运行以下命令:

终端一: python3 Cifar_10_Multi_Server_Multi_gpu_Train.py --job_name=ps --task_index=0

终端二: python3 Cifar_10_Multi_Server_Multi_gpu_Train.py --job_name=worker --task_index=1

终端三: python3 Cifar_10_Multi_Server_Multi_gpu_Train.py --job_name=worker --task_index=0

训练数据截取:

2018-08-29 21:12:25.379469: step 17264, loss = 0.58 ,Train accuracy = 0.98
2018-08-29 21:12:25.988827: step 17265, loss = 0.57 ,Train accuracy = 0.98
2018-08-29 21:12:26.604014: step 17266, loss = 0.56 ,Train accuracy = 0.97
2018-08-29 21:12:27.219585: step 17267, loss = 0.54 ,Train accuracy = 0.98
2018-08-29 21:12:27.820969: step 17268, loss = 0.57 ,Train accuracy = 1.00
2018-08-29 21:12:28.434372: step 17269, loss = 0.57 ,Train accuracy = 0.97
2018-08-29 21:12:29.051422: step 17270, loss = 0.57 ,Train accuracy = 0.99

 

github地址:

https://github.com/junjun870325/cifar10_distributed_train

最后

以上就是喜悦小虾米为你收集整理的深度学习实战(六) 多机多卡分布式训练cifar10完整实现准备工作:实现部分(附详细注释):最后运行分布式训练:的全部内容,希望文章能够帮你解决深度学习实战(六) 多机多卡分布式训练cifar10完整实现准备工作:实现部分(附详细注释):最后运行分布式训练:所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部