我是靠谱客的博主 跳跃蜜蜂,最近开发中收集的这篇文章主要介绍并行转换csv文件为libffm格式并行转换csv文件为libffm格式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

并行转换csv文件为libffm格式

  在使用libffm或者xlearn时,需要数据文件是libffm格式的,而我们一般拿到的数据都是csv格式或类csv格式的,所以需要转换。最简单的做法是把csv一行一行地读入,然后对每个特征进行编码处理,然后拼接成完整的一行,但是这样做是在是太慢了。本文参考这里,提取出最核心的部分,给出一种并行转换libffm文件的方法,然后对并行后节省的时间进行评估。

模块结构

  整个流程是这样的,首先将源csv文件切分成若干个临时文件,为每个文件分配一个子进程,各文件分别进行转换,然后将转换好的各个子文件拼接起来,最后删除生成的临时文件。


文件切分

  并行的第一步是要做文件切分,将源文件切成若干个子文件,子文件的数量取决于你准备用多少个子进程,例如我设置nr_thread=16,就意味着.csv文件将被划分成16个小的临时文件。
  划分时有一点需要注意,每个子文件都应该先写入一行header

def split_csv(path, nr_thread, has_header):
    # 计算每个子文件的数据行数
    def calc_nr_lines_per_thread():
        # 调用linux的wc命令获取文件行数
        nr_lines = int(list(subprocess.Popen('sudo wc -l {0}'.format(path), shell=True, stdout=subprocess.PIPE).stdout)[0].split()[0])
        if not has_header:
            nr_lines += 1 # wc命令会自动跳过第一行不算
        return nr_lines/nr_thread

    # 打开子文件,并写入header
    def open_with_header_written(path, idx, header):
        f = open(path+'.__tmp__.{0}'.format(idx), 'w')
        if not has_header:
            return f
        f.write(header)
        return f

    # 打开文件且跳过header
    def open_with_first_line_skipped(path, skip=True):
        f = open(path)
        if not skip:
            return f
        next(f)
        return f

    header = open(path).readline()

    nr_lines_per_thread = calc_nr_lines_per_thread()

    # 遍历源文件写入各子文件
    idx = 0
    f = open_with_header_written(path, idx, header)
    for i, line in enumerate(open_with_first_line_skipped(path, has_header), start=1):
        f.write(line)
        if i % nr_lines_per_thread == 0:
            if idx < nr_thread - 1:
                f.close()
                idx += 1
                f = open_with_header_written(path, idx, header)
    f.close()

分配子进程

def parallel_convert(converter, arg_paths, nr_thread):
    workers = []
    # 对每个子进程
    for i in range(nr_thread):
        # 执行convert命令
        cmd = "python " + converter
        for path in arg_paths:
            cmd += ' {0}'.format(path+".__tmp__.{0}".format(i)) 
        worker = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        workers.append(worker)
    # 配置通信
    for worker in workers:
        worker.communicate()

转化函数

# 哈希函数
def hashstr(S, nr_bins):
    return str(int(hashlib.md5(S.encode('utf-8')).hexdigest(), 16)%(nr_bins-1) + 1)

with open(args['ffmfile'], 'w') as f:
    for row in csv.DictReader(open(args['csvfile'])):
        row_to_write = [row['label'], ] # label
        field = 0
        for feat in row.keys():
            if feat == 'label':
                continue
            items = str(row[feat]).split(" ")
            for item in items:
                # feat_index1:1  feat_index2:1 ...
                row_to_write.append(":".join([str(field), hashstr(str(field)+'='+item, args['bins']), '1'])) 
            field += 1
        # write a row
        row_to_write = " ".join(row_to_write)
        f.write(row_to_write + 'n')

子文件合并

def cat(path, nr_thread):
    # 清空目标文件
    if os.path.exists(path):
        os.remove(path)
    # 调用cat命令,合并全部子文件
    for i in range(nr_thread):
        cmd = 'sudo cat {0}.__tmp__.{1} >> {0}'.format(path, i)
        p = subprocess.Popen(cmd, shell=True)
        p.communicate()

删除子文件

def delete(path, nr_thread):
    for i in range(nr_thread):
        os.remove('{0}.__tmp__.{1}'.format(path, i))

并行性能评估

  这样并行真的能节省时间吗?下面这个数据文件是我伪造的,它有20个样本,包含三个field,囊括了单值离散型和多值离散型field。利用这个数据集我设计了一个实验,简单地用np.tile构造更多的样本,分别使用并行(nr_thread=16)和非并行的方式处理文件,生成libffm格式,对比二者时间消耗。
  如下一图所示,横轴是数据量,纵轴是时间消耗,当数据量较少时,串行处理比并行处理要快,当数据量较大时,并行的优势就显现出来了。两条曲线都近似于直线,所以我接着用leastsq分别进行拟合,得到两条直线,如下二图所示。
  然后将1亿(数据集大小)代入,在处理1亿数据量,预测串行需要1295.9秒(合21.6分钟),并行需要253.3秒(合4.2分钟),串行所需的时间是并行的5倍。

label,id,gender,interest
0,1,0,10 20
0,2,0,20 30
0,3,1,30 40
0,4,1,40
1,5,2,40 50
1,6,2,50 60
1,7,0,60 10
1,8,0,10
0,9,1,10 30
0,10,1,30 50
0,11,2,50 60
0,12,2,60
1,13,0,60 20
1,14,0,20 40
1,15,1,40 60
1,16,1,60
0,17,2,60 30
0,18,2,30 60
0,19,0,60 30
0,20,0,60




完整代码

【github】我的版本


参考资料

【github】原始版本
【博客】廖雪峰 多进程
【博客】最小二乘法及其python实现
【API】scipy.optimize.leastsq
【API】subprocess

最后

以上就是跳跃蜜蜂为你收集整理的并行转换csv文件为libffm格式并行转换csv文件为libffm格式的全部内容,希望文章能够帮你解决并行转换csv文件为libffm格式并行转换csv文件为libffm格式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部