概述
文章目录
- 1、项目背景
- 2、项目架构
- 3、数据描述
- 4、代码实现
1、项目背景
- 通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等。
- 需求:按时间统计每人在每分钟,每小时,每年的通话次数和通话时长。
2、项目架构
- 本项目的数据是由我们通过java类产生的,然后存放在服务器的某个路径下,同时启动了Flume,Kafka,HBase和Hadoop集群。当检测到某路径下产生数据时,Flume集群则将数据发送到Kafka,然后Kafka持续地将数据以某种格式写进HBase,最后我们利用Mapreduce统计分析HBase里面的数据,将结果写入mysql。
- 技术流程图如下:
- 项目目录结构说明:共分为4个模块。
1)ct_producer:用于数据的生产,由于不是在真实环境下,故需要我们自己制造数据,而在实际的生产环境中,是不需要此模块的。
2)ct_consumer:用于数据的消费,只有当kafka里面有数据,则把它写进hbase,直到kafka里数据为空。
3)ct_analysis:数据分析模块,利用mapreduce根据业务分析每人的通话时长和通话次数,然后将结果存进mysql。
4)ct_web_boot:用于数据的可视化,暂时还没有实现,读者感兴趣的,可以研究一下。
3、数据描述
-
数据生产阶段:我们使用了ProductLog类模拟生产数据,在该类中启动一个线程每隔5秒产生一行数据,便将它写入到calllog.csv文件。数据格式如下:
caller(主叫号码), callee(被叫号码), buildTime(通话建立时间), duration(通话时长/秒) 18468618874,18549641558,2018-04-06 20:49:45,0142 14575535933,18468618874,2018-11-16 20:04:33,0977 18549641558,18468618874,2018-02-28 02:56:29,1571 13980337439,17269452013,2018-03-20 20:14:55,0171
-
数据消费阶段:将Kafka的数据写入HBase,其中,hbase表数据结构如下:
(1)rowkey中关于字段hashregion设计思想:这里我们有3个RegionServer,一般情况下一个region维护1~10G的数据量,我们可以评估一下自己的数据量,一般100百万条数据在50到100M左右(这里我们取中间数70M吧)。假设一个region维护1G的数据量(相当于1千4百万条数据),这时公司里有几百亿条数据,我们可以依此推断出需要多少个分区(实际中会多预留出一个分区,这个分区不存放数据,而是当程序发生异常时,存放异常数据。)。在本项目中,由于数据量较小,我们设计了6个分区,依次为00,01,02,03,04,05 (分区号生成策略代码有详细介绍)。
(2)RegionServer:一个RegionServer部署在一台服务器上,RegionServer是HBase集群运行在每个工作节点上的服务。它是整个HBase系统的关键所在,一方面它维护了Region的状态,提供了对于Region的管理和服务;另一方面,它与Master交互,参与Master的分布式协调管理。名称 说明 rowkey hashregion_call1_datetime_call2_flag_duration 01_15837312345_20170527081033_13766889900_1_0180 family f1 列族:存放主叫信息;f2 列族:存放被叫信息 call1 第一个手机号码 call2 第二个手机号码 date_time 通话建立的时间,例如:20171017081520 date_time_ts date_time 对应的时间戳形式 flag 标记 call1 是主叫还是被叫 duration 通话时长(单位:秒) -
数据分析阶段:这里我们一共设计三张表,分别是表db_telecom.tb_contacts,表db_telecom.tb_call和表db_telecom.tb_dimension_date。
1)表db_telecom.tb_contacts
列 说明 id 自增主键 telephone 手机号码 name 联系人姓名 2)表db_telecom.tb_call
列 说明 id_date_contact 复合主键(联系人维度 id, 时间维度 id) id_date_dimension 时间维度 id id_contact 查询人的电话号码 call_sum 通话次数总和 call_duration_sum 通话时长总和 3)表db_telecom.tb_dimension_date
列 说明 id 自增主键 year 年,当前通话信息所在年 month 月,当前通话信息所在月,如果按照年来统计信息,则month 为-1。 day 日,当前通话信息所在日,如果按照月来统计信息,则day 为-1。
4、代码实现
-
数据生产阶段:
1) 随机输入一些手机号码以及联系人,保存于Java的集合中。// 生产数据 // 用于存放待随机的电话号码 private List<String> phoneList = new ArrayList<>(); private Map<String, String> phoneNameMap = new HashMap<>(); public void initPhone() { // 由于初始化的字段太多,故只列出部分字段。。。 phoneList.add("17078388295"); phoneList.add("13980337439"); phoneList.add("14575535933"); phoneNameMap.put("14575535933", "仰莉"); phoneNameMap.put("19902496992", "陶欣悦"); phoneNameMap.put("18549641558", "施梅梅"); .... }
2)创建随机生成通话时间的方法:randomBuildTime。该时间生成后的格式为 yyyy- MM-dd HH:mm:ss,并可以根据传入的起始时间和结束时间来随机生成。
public String randomBuildTime(String startTime, String endTime) { try { SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd"); Date startDate = sdf1.parse(startTime); Date endDate = sdf1.parse(endTime); if (endDate.getTime() <= startDate.getTime()) return null; long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random()); Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate); return resultTimeString; } catch (ParseException e) { e.printStackTrace(); } return null; }
3)随机抽取两个电话号码,随机产生通话建立时间,随机通话时长,将这几个字段拼接成一个字符串,然后 return,便可以产生一条通话的记录。需要注意的是,如果随机出的两个电话号码一样,需要重新随机(随机过程可优化,但并非此次重点)。通话时长的随机为 30 分钟以内,即:60 秒 * 30,并格式化为 4 位数字,例如:0600(10 分钟)。
public String product() { String caller = null; String callee = null; String callerName = null; String calleeName = null; //取得主叫电话号码 int callerIndex = (int) (Math.random() * phoneList.size()); caller = phoneList.get(callerIndex); callerName = phoneNameMap.get(caller); while (true) { //取得被叫电话号码 int calleeIndex = (int) (Math.random() * phoneList.size()); callee = phoneList.get(calleeIndex); calleeName = phoneNameMap.get(callee); if (!caller.equals(callee)) break; } String buildTime = randomBuildTime(startTime, endTime); //0000 DecimalFormat df = new DecimalFormat("0000"); String duration = df.format((int) (30 * 60 * Math.random())); StringBuilder sb = new StringBuilder(); sb.append(caller + ",").append(callee + ",").append(buildTime + ",").append(duration); return sb.toString(); }
4)创建写入日志方法:writeLog。productLog 每产生一条日志,便将日志写入到本地文件中,所以建立一个专门用于日志写入的方法,需要涉及到 IO 操作,需要注意的是,输出流每次写一条日之后需要 flush,不然可能导致积攒多条数据才输出一次。最后需要将productLog方法放置于while死循环中执行。
public void writeLog(String filePath) { try { OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream(filePath), "UTF-8"); while (true) { Thread.sleep(500); String log = product(); System.out.println(log); osw.write(log + "n"); //一定要手动flush才可以确保每条数据都写入到文件一次 osw.flush(); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e2) { e2.printStackTrace(); } }
5)在主函数中初始化以上逻辑,并测试。
public static void main(String[] args) throws InterruptedException { // 打包测试 cd F:1-codectct_producertarget // 执行java -cp ct_producer-1.0-SNAPSHOT.jar producer.ProductLog //String logPath = "D:\calllog.csv"; ProductLog productLog = new ProductLog(); productLog.initPhone(); productLog.writeLog(args[0]); //productLog.writeLog(logPath); }
6)打包测试:将ProductLog类打包,然后放在服务器某个路径下,执行java命令:java -cp ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /data/callog.csv。至此数据的生产阶段完成了。
-
数据采集阶段:采集产生的数据到 kafka 集群。此步骤较简单,故不详细列出。
1) 配置 kafka,启动 zookeeper 和 kafka 集群;
2) 创建 kafka 主题;
3) 启动 kafka 控制台消费者(此消费者只用于测试使用);
4) 配置 flume,监控日志文件;
5) 启动 flume 监控任务;
6)运行日志生产脚本;
7)观察测试。 -
数据消费阶段和分析阶段请参考下一篇博文。
https://blog.csdn.net/u013337425/article/details/89300760
最后
以上就是生动毛豆为你收集整理的基于Flume+Kafka+HBase+Mapreduce的电信客服项目(上)的全部内容,希望文章能够帮你解决基于Flume+Kafka+HBase+Mapreduce的电信客服项目(上)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复