我是靠谱客的博主 无情小虾米,最近开发中收集的这篇文章主要介绍文章排序-pyspark FTRL模型(四),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

构建TFRecords文件

TFRecords其实是一种二进制文件,虽然它不如其他格式好理解,但是它能更好的利用内存,更方便复制和移动,并且不需要单独的标签文件。

import tensorflow as tf
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession 
.builder 
.appName("user") 
.config("spark.some.config.option", "some-value") 
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
train = spark.read.parquet('datas/train')
train = train.rdd.map(lambda x:(x.features,x.clicked)).collect()
train = pd.DataFrame(train)
print(train)
def write_to_tfrecords(feature_batch, click_batch):
"""将用户与文章的点击日志构造的样本写入TFRecords文件
"""
# 1、构造tfrecords的存储实例
writer = tf.python_io.TFRecordWriter("datas/train_ctr_20190605.tfrecords")
# 2、循环将所有样本一个个封装成example,写入文件
for i in range(len(click_batch)):
# 取出第i个样本的特征值和目标值,格式转换
click = click_batch[i]
feature = feature_batch[i].tostring()
# 构造example
example = tf.train.Example(features=tf.train.Features(feature={
"feature": tf.train.Feature(bytes_list=tf.train.BytesList(value=[feature])),
"label": tf.train.Feature(int64_list=tf.train.Int64List(value=[click]))
}))
# 序列化example,写入文件
writer.write(example.SerializeToString())
writer.close()
with tf.Session() as sess:
# 创建线程协调器
coord = tf.train.Coordinator()
# 开启子线程去读取数据
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# 存入数据
write_to_tfrecords(train.iloc[:, 0], train.iloc[:, 1])
# 关闭子线程,回收
coord.request_stop()
coord.join(threads)

FTRL模型

ftrl模型是LR以tfrl方式优化的模型,添加了L1正则和L2正则。

一、tensorflow方式实现
使用tf.estimator.LinearClassifier构建ftrl模型,并将模型存储两种方式任意一种。
TensorFlow的模型格式有很多种,针对不同场景可以使用不同的格式,只要符合规范的模型都可以轻易部署到在线服务或移动设备上,这里简单列举一下。
Checkpoint: 用于保存模型的权重,主要用于模型训练过程中参数的备份和模型训练热启动。
SavedModel:使用saved_model接口导出的模型文件,包含模型Graph和权限可直接用于上线,TensorFlowestimator和Keras模型推荐使用这种模型格式。

import tensorflow as tf
# - 1、构建TFRecords的输入数据
# - 2、使用模型进行特征列指定
# - 3、模型训练以及预估
FEATURE_COLUMN = ['channel_id', 'vector', 'user_weights', 'article_weights']
class LrWithFtrl(object):
"""LR以FTRL方式训练
"""
def __init__(self):
pass
@staticmethod
def get_tfrecords_data():
def parse_example_function(exmaple):
"""解析每个样本的example
:param exmaple:
:return:
"""
# 定义解析格式,parse_single_example
features = {
'label': tf.FixedLenFeature([], tf.int64),
'feature': tf.FixedLenFeature([], tf.string)
}
label_feature = tf.parse_single_example(exmaple, features)
# 修改其中的特征类型和形状
# 解码 [121]
# feature = tf.reshape(tf.decode_raw(label_feature['feature'], tf.float32), [1, 121])
f = tf.decode_raw(label_feature['feature'], tf.float64)
feature = tf.reshape(tf.cast(f, tf.float32), [1, 121])
# 计算其中向量、用户权重、文章权重的平均值
channel_id = tf.cast(tf.slice(feature, [0, 0], [1, 1]), tf.int32)
vector = tf.reduce_sum(tf.slice(feature, [0, 1], [1, 100]), axis=1)
user_weights = tf.reduce_sum(tf.slice(feature, [0, 101], [1, 10]), axis=1)
article_weights = tf.reduce_sum(tf.slice(feature, [0, 111], [1, 10]), axis=1)
# 4个特征值进行名称构造字典
data = [channel_id, vector, user_weights, article_weights]
feature_dict = dict(zip(FEATURE_COLUMN, data))
label = tf.cast(label_feature['label'], tf.int32)
return feature_dict, label
# Tfrecord dataset读取数据
dataset = tf.data.TFRecordDataset(['datas/train_ctr_20190605.tfrecords'])
# map 解析
dataset = dataset.map(parse_example_function)
dataset = dataset.batch(64)
dataset = dataset.repeat(10)
return dataset
def train_eval(self):
"""
进行训练
:return:
"""
# 指定列特征
channel_id = tf.feature_column.categorical_column_with_identity('channel_id', num_buckets=25)
vector = tf.feature_column.numeric_column('vector')
user_weights = tf.feature_column.numeric_column('user_weights')
article_weights = tf.feature_column.numeric_column('article_weights')
columns = [channel_id, vector, user_weights, article_weights]
# LinearClassifier
model = tf.estimator.LinearClassifier(feature_columns=columns,
optimizer=tf.train.FtrlOptimizer(learning_rate=0.1,
l1_regularization_strength=10,
l2_regularization_strength=10),
model_dir='../models/ckpt/lr_ftrl')
model.train(LrWithFtrl.get_tfrecords_data, steps=100)
result = model.evaluate(LrWithFtrl.get_tfrecords_data)
print(result)
# 模型导入
feature_spec = tf.feature_column.make_parse_example_spec(columns)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
model.export_savedmodel("./serving_model/wdl1/", serving_input_receiver_fn)
def predict(self, inputs):
# 指定列特征
channel_id = tf.feature_column.categorical_column_with_identity('channel_id', num_buckets=25)
vector = tf.feature_column.numeric_column('vector')
user_weights = tf.feature_column.numeric_column('user_weights')
article_weights = tf.feature_column.numeric_column('article_weights')
columns = [channel_id, vector, user_weights, article_weights]
# LinearClassifier
model = tf.estimator.LinearClassifier(feature_columns=columns,
optimizer=tf.train.FtrlOptimizer(learning_rate=0.1,
l1_regularization_strength=10,
l2_regularization_strength=10),
model_dir='../models/ckpt/lr_ftrl')
predictions = model.predict(inputs, checkpoint_path='../models/ckpt/lr_ftrl')
return predictions
if __name__ == '__main__':
lw =
LrWithFtrl()
print(lw.get_tfrecords_data())
model = lw.train_eval()
# {'accuracy': 0.9046435, 'accuracy_baseline': 0.9046434, 'auc': 0.57956487, 'auc_precision_recall': 0.12670927, 'average_loss': 0.31273547, 'label/mean': 0.095356554, 'loss': 19.850473, 'precision': 0.0, 'prediction/mean': 0.111656144, 'recall': 0.0, 'global_step': 100}

二、keras实现

import tensorflow as tf
from tensorflow.python import keras
class LrWithFtrl(object):
"""LR以FTRL方式优化
"""
def __init__(self):
self.model = keras.Sequential([
keras.layers.Dense(1, activation='sigmoid', input_shape=(121,))
])
@staticmethod
def read_ctr_records():
# 定义转换函数,输入时序列化的
def parse_tfrecords_function(example_proto):
features = {
"label": tf.FixedLenFeature([], tf.int64),
"feature": tf.FixedLenFeature([], tf.string)
}
parsed_features = tf.parse_single_example(example_proto, features)
feature = tf.decode_raw(parsed_features['feature'], tf.float64)
feature = tf.reshape(tf.cast(feature, tf.float32), [1, 121])
label = tf.reshape(tf.cast(parsed_features['label'], tf.float32), [1, 1])
return feature, label
dataset = tf.data.TFRecordDataset(["datas/train_ctr_20190605.tfrecords"])
dataset = dataset.map(parse_tfrecords_function)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.repeat(10000)
return dataset
def train(self, dataset):
self.model.compile(optimizer=tf.train.FtrlOptimizer(0.03, l1_regularization_strength=0.01,
l2_regularization_strength=0.01),
loss='binary_crossentropy',
metrics=['binary_accuracy'])
self.model.fit(dataset, steps_per_epoch=10000, epochs=10)
self.model.summary()
self.model.save_weights('../models/ckpt/ctr_lr_ftrl.h5')
def predict(self, inputs):
"""预测
:return:
"""
# 首先加载模型
self.model.load_weights('../models/ckpt/ctr_lr_ftrl.h5')
init = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init)
predictions = self.model.predict(sess.run(inputs))
return predictions
if __name__ == '__main__':
lwf = LrWithFtrl()
dataset = lwf.read_ctr_records()
lwf.train(dataset)
# 10000/10000 [==============================] - 6s 592us/step - loss: 0.3075 - binary_accuracy: 0.9053
inputs, labels = dataset.make_one_shot_iterator().get_next()
print(inputs, labels)
predictions = lwf.predict(inputs)
print(predictions)

三、在线预测

# coding=utf-8
import numpy as np
import tensorflow as tf
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession 
.builder 
.appName("user") 
.config("spark.some.config.option", "some-value") 
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
from keras_ftrl import LrWithFtrl
# 临时预测数据
user_id = 1112727762809913344
channel_id = 17
reco_set = [104454, 11078, 14335, 4402, 2, 14839, 44024, 18427, 43997, 17375]
def feature(row):
user_features = row.user_features[int(channel_id)]
return user_features
def vector_list(row):
return list(row.article_features)
user_feature = spark.read.parquet(r'datasuser_features')
article_feature = spark.read.parquet(r'datasarticle_features')
def lrftrl_sort_service(reco_set, user_feature, article_feature):
"""
排序返回推荐文章
:param reco_set:召回合并过滤后的结果
:param temp: 参数
:param hbu: Hbase工具
:return:
"""
print(344565)
# 排序
# 1、读取用户特征中心特征
user_feature = user_feature.filter(user_feature.user_id == user_id).select(user_feature.user_features).rdd.map(
feature).collect()
from itertools import chain
user_feature = list(chain.from_iterable(user_feature))
print(user_feature)
if user_feature and reco_set:
# 2、读取文章特征中心特征
result = []
for article_id in reco_set:
try:
article_feature = article_feature.filter(article_feature.article_id == str(article_id)).rdd.map(vector_list).collect()
from itertools import chain
article_feature = list(chain.from_iterable(article_feature))
except Exception as e:
article_feature = []
if not article_feature:
article_feature = [0.0] * 111
f = []
f.extend(user_feature)
f.extend(article_feature)
result.append(f)
if result:
# 4、预测并进行排序筛选
arr = np.array(result)
print(arr)
# 加载逻辑回归模型
lwf = LrWithFtrl()
print(tf.convert_to_tensor(np.reshape(arr, [len(reco_set), 121])))
predictions = lwf.predict(tf.constant(arr))
print(predictions)
df = pd.DataFrame(np.concatenate((np.array(reco_set).reshape(len(reco_set), 1), predictions),
axis=1),
columns=['article_id', 'prob'])
df_sort = df.sort_values(by=['prob'], ascending=True)
# 排序后,只将排名在前100个文章ID返回给用户推荐
if len(df_sort) > 100:
reco_set = list(df_sort.iloc[:100, 0])
else:
reco_set = list(df_sort.iloc[:, 0])
print(reco_set)
return reco_set
if __name__ == '__main__':
reco_set = lrftrl_sort_service(reco_set, user_feature, article_feature)
print(reco_set)

最后

以上就是无情小虾米为你收集整理的文章排序-pyspark FTRL模型(四)的全部内容,希望文章能够帮你解决文章排序-pyspark FTRL模型(四)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部