我是靠谱客的博主 唠叨早晨,最近开发中收集的这篇文章主要介绍7.实例mapreduce:计算最高气温(hadoop-streaming python),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.目标

  • 使用mapreduce计算hadoop权威指南第四版计算最高气温;
  • 使用hadoop-streaming api;
  • 使用python3.5.4编写代码;

2.步骤

2.1.下载数据

  • 地址:ftp://ftp.ncdc.noaa.gov/pub/data/gsod/
  • 压缩包下载到本地
def get_data(remote="ftp://ftp.ncdc.noaa.gov/pub/data/gsod/", local="d:/data/"):
"""下载数据"""
if not os.path.exists(local):
os.makedirs(local)
start, end = 1929, 1950
for year in range(start, end + 1):
file = "gsod_{}.tar".format(year)
path = "{0}/{1}".format(year, file)
resp = request.urlretrieve("{0}{1}".format(remote, path), local + file)
print(resp)
  • 手动全部解压(每个压缩包解压为一个单独的文件夹);
  • 把每年的数据合并
def merge(year, dir="D:/data/gsod_1929", savedir="D:/data/1/"):
"""把包含的gz文件的内容合并为一个文本文件"""
files = os.listdir(dir)
with open('{0}{1}.txt'.format(savedir, year), 'w') as newfile:
for i, file in enumerate(files):
with gzip.open(dir + "/" + file, 'r') as pf:
for line in pf:
line = str(line)
if i and ("YEARMODA" in line):
continue
line = line[2:-3]
newfile.write(line + "n")
def merges(dir="D:/data/"):
"""获取所的目录"""
files = os.listdir(dir)
for file in files:
if file != '1':
merge(file[-4:], dir + file)

2.2.上传文件到hdfs

  • 使用python hdfs模块;
    • pip install hdfs;
  • 上传:PythonDemo -> hadoop -> myhdfs.py;
    • 注意:不要命名为hdfs.py,python不允许模块重名;
    # **hdfs操作
    import os
    from hdfs import client
    #**url:ip:端口
    client = client.InsecureClient("http://yh001:50070", user="hadoop" ,root="/")
    def upload(remote_dir="/pings/wordcount/", local_dir="D:/data/1/"):
    """上传本地文件到hdfs"""
    client.delete(remote_dir, True)
    client.makedirs(remote_dir)
    for file in os.listdir(local_dir):
    client.upload(remote_dir, local_dir + file)
    

2.3.mapreduce

  • 原理:使用hadoop streaming是利用hadoop流的API,stdin(标准输入)、stdout(标准输出)在map函数和reduce函数之间传递数据;
  • 实现:利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout;
2.3.1.map
  • 原始数据,根据空格切分字符串(line = re.split(r" *", line)),取 第下标为2和17的元素为年份和每日最高温度(年份取前4位,华氏温度转换成摄氏温度);
STN--- WBAN
YEARMODA
TEMP
DEWP
SLP
STP
VISIB
WDSP
MXSPD
GUST
MAX
MIN
PRCP
SNDP
FRSHTT
030050 99999
19291001
45.3
4
40.0
4
1001.6
4
9999.9
0
17.1
4
4.5
4
8.9
999.9
51.1
44.1*
0.00I 999.9
000000
030050 99999
19291002
49.5
4
45.2
4
977.6
4
9999.9
0
9.3
4
17.5
4
29.9
999.9
53.1*
44.1
99.99
999.9
010000
030050 99999
19291003
49.0
4
41.7
4
975.7
4
9999.9
0
10.9
4
10.0
4
23.9
999.9
53.1
46.0
99.99
999.9
010000
  • 代码
#!/usr/bin/python3
# **mapper
import sys
import re
def toC(qty):
"""华氏温度转换成摄氏温度(摄氏=5/9(°F-32))"""
return round(5 / 9 * (qty - 32), 2)
def max_temperature_mapper():
"""计算年份的最高温度"""
for line in sys.stdin:
line = re.split(r" *", line)
if len(line) > 17:
qty = line[17]
if qty != '9999.9':
qty = qty[:-1] if qty.endswith("*") else qty
print("{0}t{1}".format(line[2][:4], toC(float(qty))))
max_temperature_mapper()
2.3.2.reduce
  • reducer接收的数据,已经按年份排序(没有像java mapreduce程序按key合并);
#!/usr/bin/python3
# **reducer
import sys
from itertools import groupby
from operator import itemgetter
def read1():
"""测试数据源"""
datas = [
'1929 19.61',
'1929 19.11',
'1930 20.72',
'1930 24.61',
'1931 21.28'
]
for line in datas:
yield line.split(" ", 1)
def read2():
"""hadoop数据源"""
for line in sys.stdin:
yield line.split("t", 1)
def max_temperature_reducer():
"""计算年份的最高温度"""
for year, temperature_list in groupby(read2(), itemgetter(0)):
max = 0
for t in temperature_list:
if len(t) > 1:
temp = float(t[1])
max = temp if temp > max else max
print("{0}t{1}".format(year, max))
max_temperature_reducer()
# hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.1.jar -file max_temperature_mapper.py -mapper max_temperature_mapper.py -file max_temperature_reducer.py -reducer max_temperature_reducer.py -input /pings/1929.txt -output /pings/out/

4.运行结果

1929	31.11
1930	32.22
1931	42.39
1932	39.11
1933	43.5
1934	44.61
1935	47.78
1936	47.39
1937	46.11
1938	41.11
1939	43.28
1940	43.72
1941	46.22
1942	47.89
1943	48.5
1944	50.72
1945	49.61
1946	46.28
1947	49.0
1948	49.0
1949	48.89
1950	48.89

3.遇到的问题

  • 找不到文件或目录/没有执行权限/PipeMapRed.waitOutputThreads(): subprocess failed with code 126
    • 解决:
      • python文件头添加:#!/usr/bin/python3;
        • 集群每台机器都要安装python3;
        • 使用以上方式,一定要有/usr/bin/python3文件/符号链接(本人使用源码安装,添加了环境变量,但是实际没有/usr/bin/python3文件,导致一直出现PipeMapRed.waitOutputThreads(): subprocess failed with code 126);
      • 执行命令添加:-file max_temperature_mapper.py;
  • PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    • 原因:代码错误;
    • 解决:调试并修改代码;

4.源码

PythonDemo -> hadoop

最后

以上就是唠叨早晨为你收集整理的7.实例mapreduce:计算最高气温(hadoop-streaming python)的全部内容,希望文章能够帮你解决7.实例mapreduce:计算最高气温(hadoop-streaming python)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部