概述
《TensorFlow2.X项目实战》
训练模型的两种方式
静态图方式
是1.X 中张量流的主要运行方式。机制就是将定义和运行分离。相当于程序搭建一个结构(内存中搭建一个图),然后让数据(张量)按照图的顺序进行计算得到结果。
静态图方式分为两个过程:1. 模型构建,从正反两个方向构建模型。2. 模型运行:构建模型后,通过多次迭代的方式运行模型,实现训练的过程。
在 2.X 版本中使用 1.X 版本的API ,需要改变默认的工作模式:tf.compat.v1.disable_v2_behavior
关闭动态图模式。
使用静态图的步骤:
- 定义操作符(tf.placeholder)
- 构建模型
- 建立会话(tf.session 之类的)
- 会话结果里运行张量流并输出结果。
动态图方式
动态图就是代码中的张量可以直接参与运算,不需要再用会话。就比如矩阵相乘:
- 静态图中,程序生成一个操作符,然后使用run() 方法在计算
- 动态图中,直接进行计算得到结果
使用动态图训练一个具有检查点的回归模型。2.x中生成检查点推荐使用tf.train.Checkpoint(),但需要将网络封装成类,这里使用1.X版本的 Saver类。
生成检查点文件
-
生成saver对象
参数:var_list 要保存的变量,max_to_keep 要保持的检查点文件的个数,keep_checkpoint_every_n_hour 指定间隔几小时保存一次模型。
saver = tf.compat.v1.train.Saver([W,b], max_to_keep=1)#生成saver,保存全部变量
生成一个检查点(一个检查点三个文件) -
生成检查点文件
saver.save(None, savedir+"linermodel.cpkt", global_step)
将检查点文件保存在指定的savedir 下,也将迭代次数 global_step的值放到检查点文件名中 -
载入检测点文件 ,可以在之前的基础上二次训练。就是将检查点中的值恢复到张量图中,程序内部通过张量 global_step 的载入记录迭代次数
kpt = tf.train.latest_checkpoint(savedir) #找到最近检查点文件
if kpt!=None:
saver.restore(None, kpt) #两种加载方式都可以
#saverx.restore(kpt)
# -*- coding: utf-8 -*-
"""
@author: 代码医生工作室
@公众号:xiangyuejiqiren (内有更多优秀文章及学习资料)
@来源: <TensorFlow项目实战2.x>配套代码
@配套代码技术支持:bbs.aianaconda.com
"""
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
print("TensorFlow 版本: {}".format(tf.version.VERSION))#TF2.1
print("Eager execution: {}".format(tf.executing_eagerly()))
#(1)生成模拟数据
train_X = np.linspace(-1, 1, 100)
train_Y = 2 * train_X + np.random.randn(*train_X.shape) * 0.3 # y=2x,但是加入了噪声
#图形显示
plt.plot(train_X, train_Y, 'ro', label='Original data')
plt.legend()
plt.show()
# 定义学习参数
W = tf.Variable(tf.random.normal([1]),dtype=tf.float32, name="weight")
b = tf.Variable(tf.zeros([1]),dtype=tf.float32, name="bias")
global_step = tf.compat.v1.train.get_or_create_global_step() # 全局步数
def getcost(x,y):#定义函数,计算loss值
# 前向结构
z = tf.cast(tf.multiply(np.asarray(x,dtype = np.float32), W)+ b,dtype = tf.float32)
cost =tf.reduce_mean( tf.square(y - z))#loss值
return cost
learning_rate = 0.01
# 随机梯度下降法作为优化器
optimizer = tf.compat.v1.train.GradientDescentOptimizer(learning_rate=learning_rate)
#定义saver,演示两种方法处理检查点文件
savedir = "logeager/"
#savedirx = "logeagerx/"
saver = tf.compat.v1.train.Saver([W,b], max_to_keep=1)#生成saver。 max_to_keep=1,表明最多只保存一个检查点文件
#saverx = tfe.Saver([W,b])#生成saver。 max_to_keep=1,表明最多只保存一个检查点文件
# 动态图中没有图,会话的概念,需要手动指定需要保存的参数,这里手动指定参数[W,b]保存
kpt = tf.train.latest_checkpoint(savedir)#找到检查点文件
#kptx = tf.train.latest_checkpoint(savedirx)#找到检查点文件
if kpt!=None:
saver.restore(None, kpt) #两种加载方式都可以
#saverx.restore(kptx)
training_epochs = 10 #迭代训练次数
display_step = 2
plotdata = { "batchsize":[], "loss":[] }#收集训练参数
# 训练的过程很好理解,使用tf.GradientTape 跟踪自动微分,然后进行梯度操作。
while global_step/len(train_X) < training_epochs: #迭代训练模型
step = int( global_step/len(train_X) )
with tf.GradientTape() as tape:
cost_=getcost(train_X,train_Y)
gradients=tape.gradient(target=cost_,sources=[W,b]) #计算梯度
optimizer.apply_gradients(zip(gradients,[W,b]),global_step) # 进行优化
#显示训练中的详细信息
if step % display_step == 0:
cost = cost_.numpy()
print ("Epoch:", step+1, "cost=", cost,"W=", W.numpy(), "b=", b.numpy())
if not (cost == "NA" ):
plotdata["batchsize"].append(global_step.numpy())
plotdata["loss"].append(cost)
saver.save(None, savedir+"linermodel.cpkt", global_step)
#saverx.save(savedirx+"linermodel.cpkt", global_step)
print (" Finished!")
saver.save(None, savedir+"linermodel.cpkt", global_step)
#saverx.save(savedirx+"linermodel.cpkt", global_step)
print ("cost=", getcost(train_X,train_Y).numpy() , "W=", W.numpy(), "b=", b.numpy())
#显示模型
plt.plot(train_X, train_Y, 'ro', label='Original data')
plt.plot(train_X, W * train_X + b, label='Fitted line')
plt.legend()
plt.show()
def moving_average(a, w=10):#定义生成loss可视化的函数
if len(a) < w:
return a[:]
return [val if idx < w else sum(a[(idx-w):idx])/w for idx, val in enumerate(a)]
plotdata["avgloss"] = moving_average(plotdata["loss"])
plt.figure(1)
plt.subplot(211)
plt.plot(plotdata["batchsize"], plotdata["avgloss"], 'b--')
plt.xlabel('Minibatch number')
plt.ylabel('Loss')
plt.title('Minibatch run vs. Training loss')
plt.show()
动态图的编程方式
动态图中获取参数
动态图使用python变量的生命周期机制来存放参数变量,不可以像静态图那样通过图获取指定变量。在训练模型,保存模型过程中,从动态图中获取参数有两种方法:
- 模型封装成类,借助类的实例化对象在内存中的生命周期来管理模型变量,即使用 variables 成员变量
- 通过variable_scope.EagerVariableStore() 方法将动态图的变量保存在全局容器中。然后通过实例化对象取出变量。
from tensorflow.python.ops import variable_scope
#建立数据集
dataset = tf.data.Dataset.from_tensor_slices( (np.reshape(train_X,[-1,1]),np.reshape(train_X,[-1,1])) )
dataset = dataset.repeat().batch(1)
global_step = tf.compat.v1.train.get_or_create_global_step()
container = variable_scope.EagerVariableStore() #实例化,得到容器
learning_rate = 0.01
# 随机梯度下降法作为优化器
optimizer = tf.compat.v1.train.GradientDescentOptimizer(learning_rate=learning_rate)
def getcost(x,y):#定义函数,计算loss值
# 前向结构
with container.as_default():
#将动态图使用的层包装起来。可以得到变量,将网络参数保存到容器中
# z = tf.contrib.slim.fully_connected(x, 1,reuse=tf.AUTO_REUSE)
z = tf.compat.v1.layers.dense(x,1, name="l1") # 全连接
cost =tf.reduce_mean( input_tensor=tf.square(y - z))#loss值
return cost
def grad( inputs, targets):
with tf.GradientTape() as tape:
loss_value = getcost(inputs, targets)
return tape.gradient(loss_value,container.trainable_variables())
training_epochs = 20 #迭代训练次数
display_step = 2
#迭代训练模型
for step,value in enumerate(dataset) :
grads = grad( value[0], value[1]) # 通过container容器获取需要训练的参数
optimizer.apply_gradients(zip(grads, container.trainable_variables()), global_step=global_step)
if step>=training_epochs:
break
#显示训练中的详细信息
if step % display_step == 0:
cost = getcost (value[0], value[1])
print ("Epoch:", step+1, "cost=", cost.numpy())
print (" Finished!")
print ("cost=", cost.numpy() )
for i in container.trainable_variables():
print(i.name,i.numpy())
估算器框架接口的应用
Estimators 是TF的一套高级API,提供了一整套训练模型,测试模型的准确率以及预测的方法。
框架内部自动实现整体的数据流向搭建,包括检查点文件的导出和恢复,保存tensorboard,初始化变量,异常处理等。开发模型时只要实现对应方法就行。2.X完全兼容1.X的估算器框架代码。
- 组成
- 输入函数,主要由 tf.data.Dataset接口组成,分为训练输入函数(train_input_fn),测试输入函数(eval_input_fn),分别用于输入数据和训练数据,输入验证数据,测试数据。
- 模型函数,由模型 tf.layers接口,监控模块 tf.metrics 接口组成,用来实现训练模型,测试模型,监控模型参数状况等。
- 估算器模型,模型的注入,正向,输出,评估,测试等各个部分组合。控制数据在模型中的流动和变换,控制模型的各种运算。
- 预置的模型,除了支持自定义,还封装了一些常用模型,比如线性回归,分类模型,基于神经网络的回归,分类模型(DNNRegressor,DNNClassifier)。
- 基于估算器开发的高级模型,TFTS,用于处理序列数据的通用框架,TF-GAN,处理GAN的框架。
- 优缺点:对模型的训练,使用高度集成,适用于成熟的模型,使工程简洁,但是无法精确控制某个具体环节,不利于开发新的模型。
# -*- coding: utf-8 -*-
"""
@author: 代码医生工作室
@公众号:xiangyuejiqiren (内有更多优秀文章及学习资料)
@来源: <TensorFlow项目实战2.x>配套代码
@配套代码技术支持:bbs.aianaconda.com
"""
import tensorflow as tf
import numpy as np
tf.compat.v1.disable_v2_behavior()
#在内存中生成模拟数据
def GenerateData(datasize = 100 ):
train_X = np.linspace(-1, 1, datasize) #train_X为-1到1之间连续的100个浮点数
train_Y = 2 * train_X + np.random.randn(*train_X.shape) * 0.3 # y=2x,但是加入了噪声
return train_X, train_Y #以生成器的方式返回
train_data = GenerateData()
test_data = GenerateData(20)
batch_size=10
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO) # 设置日志级别
def train_input_fn(train_data, batch_size): #定义训练数据集输入函数
#构造数据集的组成:一个特征输入,一个标签输入
dataset = tf.data.Dataset.from_tensor_slices( ( train_data[0],train_data[1]) ) # 将给定的元组,列表等数据特征切片
dataset = dataset.shuffle(1000).repeat().batch(batch_size) #将数据集乱序、重复、批次划分.
return dataset #返回数据集
def eval_input_fn(data,labels, batch_size): #定义测试或应用模型时,数据集的输入函数
#batch不允许为空
assert batch_size is not None, "batch_size must not be None"
if labels is None: #如果评估,则没有标签
inputs = data
else:
inputs = (data,labels)
#构造数据集
dataset = tf.data.Dataset.from_tensor_slices(inputs)
dataset = dataset.batch(batch_size) #按批次划分
return dataset #返回数据集
def my_model(features, labels, mode, params):#自定义模型函数:参数是固定的。一个特征,一个标签,后两个参数可选
#定义网络结构
W = tf.Variable(tf.random.normal([1]), name="weight")
b = tf.Variable(tf.zeros([1]), name="bias")
# 前向结构
predictions = tf.multiply(tf.cast(features,dtype = tf.float32), W)+ b # wx+b
# 根据标志区分,训练,预测,评估,需要的参数不一样
if mode == tf.estimator.ModeKeys.PREDICT: # 预测处理
return tf.estimator.EstimatorSpec(mode, predictions=predictions)
#定义损失函数
loss = tf.compat.v1.losses.mean_squared_error(labels=labels, predictions=predictions)
meanloss = tf.compat.v1.metrics.mean(loss) # 添加评估输出项,均值
metrics = {'meanloss':meanloss}
if mode == tf.estimator.ModeKeys.EVAL: # 测试处理
return tf.estimator.EstimatorSpec( mode, loss=loss, eval_metric_ops=metrics)
# eval_metric_ops让模型评估时多一个指标,参数通过metrics函数创建,返回元组行对象。
#return tf.estimator.EstimatorSpec( mode, loss=loss)
#训练处理.
assert mode == tf.estimator.ModeKeys.TRAIN
optimizer = tf.compat.v1.train.AdagradOptimizer(learning_rate=params['learning_rate'])
train_op = optimizer.minimize(loss, global_step=tf.compat.v1.train.get_global_step())
# global_step,就是global_step作为name的tensor,也可以传入普通的变量,每次运行这个方法值自动加1 .
# minimize() 优化函数,包含两个步骤,1. 计算loss的梯度 2. 用得到的梯度更新权重。
# 等价于 optimizer.compute_gradients(loss) + optimizer.apply_gradients()
return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
tf.compat.v1.reset_default_graph() #清空图
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO) #能够控制输出信息 ,
# 默认,估算器会占满显存
gpu_options = tf.compat.v1.GPUOptions(per_process_gpu_memory_fraction=0.8) #构建gpu_options,防止显存占满,最多占 80%
session_config=tf.compat.v1.ConfigProto(gpu_options=gpu_options) # 指定硬件运算的变量
#构建估算器
estimator = tf.estimator.Estimator( model_fn=my_model,model_dir='.\myestimatormode',params={'learning_rate': 0.1},
config=tf.estimator.RunConfig(session_config=session_config) )
'''
定义估算器模型,参数:model_fn: 模型函数 ,名字可以随便,但是函数的参数,返回值的类型必须固定(tf.estimator.Estimator),模式不同返回的对象带的值也不同。
model_dir 训练时保存模型参数,图等的地址
config,配置类,指定运行时的附加条件。
params,超参数字典,传递到model_fn;
warm_start_from: 可选,字符串,检查点的文件路径,用来指示从哪里开始热启动。读取节点权重,用于微调模型
或者是 tf.estimator.WarmStartSettings 类来全部配置热启动。
config=tf.estimator.RunConfig 配置训练模型,可选的参数很多,可以去看看源码。
大概可以:指定模型的目录(优先级更高),初始化的随机数种子,保存summary的频率,生产检查点文件的时间频率,步数频率,ConfigProto的配置,保留检查点文件的个数,生产检查点文件的频率,
'''
#匿名输入方式,训练模型
estimator.train(lambda: train_input_fn(train_data, batch_size),steps=200)
'''
train(self, input_fn, hooks=None, steps=None, max_steps=None, saving_listeners=None)
输入函数,要求没有参数,就用匿名函数在原有的函数上包一层,还可以通过偏函数,装饰器的方式传入。(见下方)
hooks:SessionRunHook 子类实例的列表。用于在训练循环内部执行。
steps,训练的步数,None一直训练。递进式的,运行两次语句(steps=200)就运行400次,max_steps=None
max_steps ,最大步数,运行(max_steps=100),运行两次语句,第一次100次后第二次就不会执行了。
saving_listeners:CheckpointSaverListener 对象的列表。
'''
tf.compat.v1.logging.info("训练完成.")#输出训练完成
##偏函数方式
#from functools import partial
#estimator.train(input_fn=partial(train_input_fn, train_data=train_data, batch_size=batch_size),steps=2)
#
##装饰器方式
#def checkParams(fn): #定义通用参数装饰器函数
# def wrapper(): #使用字典和元组的解包参数来作形参
# return fn(train_data=train_data, batch_size=batch_size) #如满足条件,则将参数透传给原函数,并返回
# return wrapper
#
#@checkParams
#def train_input_fn2(train_data, batch_size): #定义训练数据集输入函数
# #构造数据集的组成:一个特征输入,一个标签输入
# dataset = tf.data.Dataset.from_tensor_slices( ( train_data[0],train_data[1]) )
# dataset = dataset.shuffle(1000).repeat().batch(batch_size) #将数据集乱序、重复、批次划分.
# return dataset #返回数据集
#estimator.train(input_fn=train_input_fn2, steps=2)
#
#tf.logging.info("训练完成.")#输出训练完成
#热启动,经过以上的训练已经保存了模型,这里读入进行微调。
warm_start_from = tf.estimator.WarmStartSettings(
ckpt_to_initialize_from='.\myestimatormode',
)
'''
参数:'ckpt_to_initialize_from', 加载模型的路径,将其中的值赋值给当前模型指定的权重
'vars_to_warm_start', 指定模型文件种那些变量赋值给当前,可以是一个张量列表或张量名,或一个正则
'var_name_to_vocab_info',
'var_name_to_prev_var_name',
'''
#重新定义带有热启动的估算器
estimator2 = tf.estimator.Estimator( model_fn=my_model,model_dir='.\myestimatormode3',warm_start_from=warm_start_from,params={'learning_rate': 0.1},
config=tf.estimator.RunConfig(session_config=session_config) )
estimator2.train(lambda: train_input_fn(train_data, batch_size),steps=200)
# 测试估算器模型,可以直接使用estimator.evaluate,这里使用的可以直接把numpy变量的数据包装成一个输入函数返回。
test_input_fn = tf.compat.v1.estimator.inputs.numpy_input_fn(test_data[0],test_data[1],batch_size=1,shuffle=False)
train_metrics = estimator.evaluate(input_fn=test_input_fn)
#train_metrics = estimator2.evaluate(input_fn=lambda: eval_input_fn(train_data[0],train_data[1],batch_size))
#
print("train_metrics",train_metrics) # 返回评估结果,包含了我们加入的meanloss。
# 预测
predictions = estimator.predict(input_fn=lambda: eval_input_fn(test_data[0],None,batch_size))
print("predictions",list(predictions)) # 返回一个生成器的类型
new_samples = np.array( [6.4, 3.2, 4.5, 1.5], dtype=np.float32) #定义输入
predict_input_fn = tf.compat.v1.estimator.inputs.numpy_input_fn( new_samples,num_epochs=1, batch_size=1,shuffle=False)
predictions = list(estimator.predict(input_fn=predict_input_fn))
print( "输入, 结果: {} {}n".format(new_samples,predictions))
用估算器框架实现分布式部署训练
一个任务拆分成多个小任务,只需要修改TF_CONFIG 环境变量,就可以实现分布式部署种不同的角色协同合作。
修改模型使其支持分布式
.....
#构建估算器
estimator = tf.estimator.Estimator( model_fn=my_model,model_dir='myestimatormode',
params={'learning_rate': 0.1},
config=tf.estimator.RunConfig(session_config=session_config) )
train_spec = tf.estimator.TrainSpec(input_fn=lambda: train_input_fn(train_data, batch_size),
max_steps=1000)
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: eval_input_fn(test_data,None, batch_size))
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
这个函数通过所给的estimator进行训练,评估,导出模型(可选),所有有关训练的参数通过TrainSpec指定 input_fn, maxsteps。评估也是通过EvalSpec。
给本地(非分布式)和分布式配置提供一致的行为。默认的分布配置是在服务器上参数的图间复制
通过TF_COINFIG变量进行分布式配置
添加TF_CONFIG 变量有两个方法:
- 将TF_CONFIG 变量添加到环境变量中
- 程序运行前加入 定义,比如在命令行中输入
- 变量内容格式:
一个字符串,描述分布式训练的各个角色(chief,worker,ps) 的信息,每个角色由task里面的type指定
chief角色,分布式训练的主计算节点,处理训练还需要管理其他工作,比如检查点文件保存,恢复,编写摘要等。
TF_CONFIG='{
"cluster": {
"chief": ["host0:2222"],
"worker": ["host1:2222", "host2:2222", "host3:2222"],
"ps": ["host4:2222", "host5:2222"]
},
"task": {"type": "chief", "index": 0}
}'
一般计算节点,index,指定对应worker索引 0,1,2
TF_CONFIG='{
"cluster": {
"chief": ["host0:2222"],
"worker": ["host1:2222", "host2:2222", "host3:2222"],
"ps": ["host4:2222", "host5:2222"]
},
"task": {"type": "worker", "index": 0}
}'
ps,用于分布式的服务端,
TF_CONFIG='{
"cluster": {
"chief": ["host0:2222"],
"worker": ["host1:2222", "host2:2222", "host3:2222"],
"ps": ["host4:2222", "host5:2222"]
},
"task": {"type": "ps", "index": 0}
}'
这里3种角色放一台机子上试试
- 将TF_CONFIG 放到环境变量
- 代码复制3份,代表三个角色
# ps 角色
TF_CONFIG='''{
"cluster": {
"chief": ["127.0.0.1:2221"],
"worker": ["127.0.0.1:2222"],
"ps": ["127.0.0.1:2223"]
},
"task": {"type": "ps", "index": 0}
}'''
import os
os.environ['TF_CONFIG']=TF_CONFIG
print(os.environ.get('TF_CONFIG'))
'''
省略,代码同上
'''
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
# chief 角色
TF_CONFIG='''{
"cluster": {
"chief": ["127.0.0.1:2221"],
"worker": ["127.0.0.1:2222"],
"ps": ["127.0.0.1:2223"]
},
"task": {"type": "chief", "index": 0}
}''' # worker 同理,改一下就行
import os
os.environ['TF_CONFIG']=TF_CONFIG
print(os.environ.get('TF_CONFIG'))
打开三个控制台,到对应目录下,直接运行 python ps.py 。。,会看到执行 ps.py 后,提示等待chief worker 的接入,。chief,在训练完后保存模型,会显示 loss, step 和每一步的时间。
worker 只负责训练,并且可以看到step 步数是不连续的,与chief交叉进行。
自己试试就知道了,hhhh。
分布式策略,KubeFlow 框架进行分布式部署
最后
以上就是优雅板栗为你收集整理的Tensorflow2从入门到入土:estimators,使用估算器实现分布式部署,的全部内容,希望文章能够帮你解决Tensorflow2从入门到入土:estimators,使用估算器实现分布式部署,所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复