- 任务描述
对数据按照一定规则进行清洗。
清洗规则:
-
处理数据中的时间戳(秒级)将其转化为" 年-月-日 时:分:秒 "这种格式;
-
处理数据中的省份编码,结合
mysql
的表数据对应,将其转换成省份名称; -
处理用户手机号,与
mysql
的表数据对应,关联用户的真实姓名; -
处理数据中的开始时间与结束时间并计算通信时长(以秒为单位);
-
设置数据来源文件路径及清洗后的数据存储路径: 数据来源路径为:
/user/test/input/a.txt (HDFS)
; 清洗后的数据存放于:/user/test/output (HDFS)
。
输出格式:
邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市
- 代码实现
1.dbhelper.py文件
# dbhelper.py
import pymysql
import sys
import codecs
class DBHelper:
def get_connection():
# 根据题目提供的凭据建立到mysql服务器的连接"conn",注意字符集指定为"utf8mb4"
######## Begin ############
conn = pymysql.connect(host='localhost',port=3306,
user='root',passwd='123123',
charset='utf8mb4',db='mydb')
######## End ############
return conn
@classmethod
def get_region(cls):
conn = cls.get_connection()
regions = dict()
with conn.cursor() as cur:
#从数据库中查询所有的省市代码和省市名称,并保存到字典regions中。
############ Begin ###################
cur.execute("select CodeNum,Address from allregion")
for s in cur.fetchall():
regions[s[0]] = s[1]
############ End #################
conn.close()
return regions
@classmethod
def get_userphones(cls):
conn = cls.get_connection()
userphones = dict()
with conn.cursor() as cur:
#从数据库中查询所有的电话号码和对应的姓名,并保存到字典userphones中。
############ Begin ###################
cur.execute("select phone,trueName from userphone")
for t in cur.fetchall():
userphones[t[0]] = t[1]
############ End #################
conn.close()
return userphones
def main():
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
region = DBHelper.get_region()
users = DBHelper.get_userphones()
if __name__ == '__main__':
main()
2.mapper文件
#! /usr/bin/python3
#
# mapper.py
import sys
from dbhelper import DBHelper
import codecs
import time
# 获取“省市代码:省市名称”项并保存在字典regions中;
# 获取“电话号码:姓名”项并保存在字典userphones中。
regions = DBHelper.get_region()
userphones = DBHelper.get_userphones()
def main():
# 正确输出utf-8编码的汉字
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
for line in sys.stdin:
line = line.strip()
mapper(line)
def mapper(line):
# 输出形如“邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市”的字符串
# 本题不需要reduce阶段,输出题目要求的内容即可,不需要使用“键t值”的形式。
########## begin ##############
items = line.split(',')
caller = userphones.get(items[0])
reciever = userphones.get(items[1])
begin_time = int(items[2])
end_time = int(items[3])
caller_address = regions.get(items[4])
reciever_address = regions.get(items[5])
print(caller,reciever,sep=',',end=',')
print(','.join(items[:2]),end=',')
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(begin_time)),end=',')
print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(end_time)),end=',')
print(str(end_time - begin_time),end=',')
print(caller_address,reciever_address,sep=',')
########### End #################
if __name__ == '__main__':
main()
最后
以上就是耍酷豌豆为你收集整理的3.2 电信数据清洗的全部内容,希望文章能够帮你解决3.2 电信数据清洗所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复