
Overview

MapReduce---Ride-Hailing Canceled-Order Data Cleaning

  • Data and Requirements
    • Data
    • Requirements and Analysis
  • Code Implementation
    • Custom Data Type
    • Map Phase
    • Reduce Phase
    • Driver Phase

Data and Requirements

Data


Dataset 1 sample

companyid,address,orderid,ordertime,canceltime,operator,canceltypecode,cancelreason
shouyue,430100,B190306162112070000,20190306162112,20190306162223,1,1,用户取消
1200DDCX3307,430104,17625076885092,20190307173227,20190307173833,2,5,null
1200DDCX3307,430406,17624999825776,20190306164901,20190306164947,1,4,null
1100YDYC423D,430602,6665578474529331090,20190307172846,20190307172909,1,1,第三方接口取消
3301YXKJ223E,430700,17146323201919,20190307083000,20190307073855,3,3,
1100YDYC423D,430602,6665578474529331090,20190307172846,20190307172909,1,1,第三方接口取消
3301YXKJ223E,430700,,20190307083000,20190307073855,3,3,
shouyue,430100,P190307171256186000,20190307171255,20190307171348,1,1,点击下单120S内没有筛选到司机时, 乘客手动点击取消订单

Fields

companyid,address,orderid,ordertime,canceltime,operator,canceltypecode,cancelreason         

Dataset 2 sample

340103,安徽省合肥市庐阳区
431027,湖南省郴州市桂东县
210302,辽宁省鞍山市铁东区
440605,广东省佛山市南海区

Fields

Administrative division code, region name

Requirements and Analysis


  1. Completeness check: fields are separated by commas, and a complete row has exactly 8 fields. If a row splits
     into fewer than 8 fields, or any field is empty, clean out the row.
  2. Deduplication: if several rows share the same order id (orderid), keep only the first and clean out the rest.
  3. The cancel reason (cancelreason) often holds the literal string "null"; replace it with "未知" (unknown).
  4. Convert the order time (ordertime) and cancel time (canceltime) to the "yyyy-MM-dd HH:mm:ss" format, and keep
     only rows whose ordertime and canceltime both fall on 2019-03-07. Rows with other dates or unparseable values
     are cleaned out (this also removes the header row, whose fields are not valid timestamps).
  5. Look up each administrative division code (address) in dataset 2 and insert the matching region name right
     after the code; if the code has no match, use "未知" (unknown) as the region name. A worked example follows
     this list.
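
To make the rules concrete, here is how one sample row of dataset 1 comes out after cleaning (assuming, as in the
samples above, that code 430602 has no entry in dataset 2, so the region name falls back to 未知; output fields are
joined by "|", matching the toString() method of the Data class below):

1100YDYC423D,430602,6665578474529331090,20190307172846,20190307172909,1,1,第三方接口取消

becomes

1100YDYC423D|430602|未知|6665578474529331090|2019-03-07 17:28:46|2019-03-07 17:29:09|1|1|第三方接口取消|

The verbatim duplicate of this row in the sample is removed by rule 2, so it appears only once in the output.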

Code Implementation

Custom Data Type


import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Data implements WritableComparable<Data> {
    private String companyid;
    private String address;
    private String orderid;
    private String ordertime;
    private String canceltime;
    private String operator;
    private String canceltypecode;
    private String cancelreason;

    // Data is used as the map output value and the final output key, so no
    // meaningful ordering is needed; a constant 0 satisfies WritableComparable.
    @Override
    public int compareTo(Data o) {
        return 0;
    }

    // write() and readFields() must serialize the fields in the same order.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(companyid);
        dataOutput.writeUTF(address);
        dataOutput.writeUTF(orderid);
        dataOutput.writeUTF(ordertime);
        dataOutput.writeUTF(canceltime);
        dataOutput.writeUTF(operator);
        dataOutput.writeUTF(canceltypecode);
        dataOutput.writeUTF(cancelreason);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        companyid = dataInput.readUTF();
        address = dataInput.readUTF();
        orderid = dataInput.readUTF();
        ordertime = dataInput.readUTF();
        canceltime = dataInput.readUTF();
        operator = dataInput.readUTF();
        canceltypecode = dataInput.readUTF();
        cancelreason = dataInput.readUTF();
    }

    public void set(String companyid, String address, String orderid, String ordertime, String canceltime, String operator, String canceltypecode, String cancelreason) {
        this.companyid = companyid;
        this.address = address;
        this.orderid = orderid;
        this.ordertime = ordertime;
        this.canceltime = canceltime;
        this.operator = operator;
        this.canceltypecode = canceltypecode;
        this.cancelreason = cancelreason;
    }

    @Override
    public String toString() {
        return companyid + "|" + address + "|" + orderid + "|" + ordertime + "|" + canceltime + "|" +
                operator + "|" + canceltypecode + "|" + cancelreason + "|";
    }

    public String getCompanyid() {
        return companyid;
    }

    public void setCompanyid(String companyid) {
        this.companyid = companyid;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getOrderid() {
        return orderid;
    }

    public void setOrderid(String orderid) {
        this.orderid = orderid;
    }

    public String getOrdertime() {
        return ordertime;
    }

    public void setOrdertime(String ordertime) {
        this.ordertime = ordertime;
    }

    public String getCanceltime() {
        return canceltime;
    }

    public void setCanceltime(String canceltime) {
        this.canceltime = canceltime;
    }

    public String getOperator() {
        return operator;
    }

    public void setOperator(String operator) {
        this.operator = operator;
    }

    public String getCanceltypecode() {
        return canceltypecode;
    }

    public void setCanceltypecode(String canceltypecode) {
        this.canceltypecode = canceltypecode;
    }

    public String getCancelreason() {
        return cancelreason;
    }

    public void setCancelreason(String cancelreason) {
        this.cancelreason = cancelreason;
    }
}
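
As a quick sanity check (not part of the original post), a Data instance can be round-tripped through its own
write()/readFields() pair; a minimal sketch, assuming the Data class above is on the classpath:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class DataRoundTrip {
    public static void main(String[] args) throws IOException {
        Data in = new Data();
        in.set("shouyue", "430100|未知", "P190307171256186000",
                "2019-03-07 17:12:55", "2019-03-07 17:13:48", "1", "1", "用户取消");

        // Serialize with write(), then read the bytes back with readFields().
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));
        Data out = new Data();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(out); // prints the fields joined by "|", same as the job output
    }
}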


Map Phase


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

public class MapTest extends Mapper<LongWritable, Text, Text, Data> {
    Text k = new Text();
    Data v = new Data();
    Map<String, String> address_name = new HashMap<String, String>();

    String ordertime;
    String canceltime;
    String address;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load dataset 2 (division code -> region name) from the distributed cache.
        URI[] uris = context.getCacheFiles();
        File file = new File(uris[0]);
        BufferedReader br = new BufferedReader(new FileReader(file));
        String line;
        while ((line = br.readLine()) != null) {
            String[] parts = line.split(",");
            address_name.put(parts[0], parts[1]);
        }
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Skip the header row ("companyid,address,...").
        if (value.toString().startsWith("companyid")) {
            return;
        }
        String[] datas = value.toString().split(",", -1);
        // Completeness check (requirement 1): exactly 8 fields, none of them empty.
        if (datas.length != 8) {
            return;
        }
        for (String s : datas) {
            if (s == null || s.equals("")) {
                return;
            }
        }
        // Normalize the timestamps and filter by date (requirement 4).
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyyMMddHHmmss");
        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            ordertime = sdf2.format(sdf1.parse(datas[3]));
            canceltime = sdf2.format(sdf1.parse(datas[4]));
        } catch (ParseException e) {
            // Rows with unparseable timestamps are cleaned out.
            return;
        }
        // Keep only rows where both times fall on 2019-03-07.
        if (!ordertime.startsWith("2019-03-07") || !canceltime.startsWith("2019-03-07")) {
            return;
        }
        // Replace the literal string "null" with 未知 (requirement 3).
        if ("null".equals(datas[7])) {
            datas[7] = "未知";
        }
        // Region lookup (requirement 5): append the region name after the code.
        if (address_name.containsKey(datas[1])) {
            address = datas[1] + "|" + address_name.get(datas[1]);
        } else {
            address = datas[1] + "|" + "未知";
        }
        // Key by orderid so duplicate orders meet in one reduce call (requirement 2).
        k.set(datas[2]);
        v.set(datas[0], address, datas[2], ordertime, canceltime, datas[5], datas[6], datas[7]);
        context.write(k, v);
    }
}
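
One caveat: opening the cache file via new File(uris[0]) only works when the job runs locally. On a YARN cluster
the framework localizes each cache file and symlinks it into the task's working directory under its own file name,
so a more portable setup() opens it by name. A sketch under that assumption:

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] uris = context.getCacheFiles();
        // The localized cache file is symlinked into the working directory by name.
        String name = new org.apache.hadoop.fs.Path(uris[0].getPath()).getName();
        BufferedReader br = new BufferedReader(new FileReader(name));
        String line;
        while ((line = br.readLine()) != null) {
            String[] parts = line.split(",");
            address_name.put(parts[0], parts[1]);
        }
        br.close();
    }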

Reduce Phase


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class RedTest extends Reducer<Text, Data, Data, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<Data> values, Context context) throws IOException, InterruptedException {
        // All rows sharing one orderid arrive here; write only the first of them
        // (requirement 2). Hadoop reuses the Data object across iterations, so the
        // record is written while it is still current rather than after the loop.
        boolean written = false;
        for (Data data : values) {
            context.getCounter("是否合格", "总计").increment(1);
            if (!written) {
                context.write(data, NullWritable.get());
                written = true;
            }
        }
        context.getCounter("是否合格", "合格").increment(1);
    }
}
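
Note that the order of values inside one reduce call is not guaranteed to match the input file order, so "first"
above really means "first seen by the reducer". To make the choice deterministic, one option is to carry the map
input offset in the record and keep the smallest one; a sketch, where getOffset()/setOffset() would be hypothetical
additions to the Data class:

        // Inside reduce(): keep the record with the smallest input offset.
        long best = Long.MAX_VALUE;
        Data first = new Data();
        for (Data d : values) {
            if (d.getOffset() < best) { // getOffset() is a hypothetical extra field
                best = d.getOffset();
                // Copy the fields out, because Hadoop reuses the iterated object.
                first.set(d.getCompanyid(), d.getAddress(), d.getOrderid(), d.getOrdertime(),
                        d.getCanceltime(), d.getOperator(), d.getCanceltypecode(), d.getCancelreason());
            }
        }
        context.write(first, NullWritable.get());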

Driver Phase


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.net.URI;

public class DriTest {
    public static void main(String[] args) throws Exception {
        // Delete any previous output directory: the job fails if it already exists.
        // Forward slashes avoid invalid "\M"-style escape sequences in Java string literals.
        File file = new File("D:/MP/网约车/output");
        if (file.exists()) {
            delFile(file);
        }
        driver();
    }

    public static void delFile(File file) {
        File[] files = file.listFiles();
        if (files != null && files.length != 0) {
            for (int i = 0; i < files.length; i++) {
                delFile(files[i]);
            }
        }
        file.delete();
    }

    public static void driver() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setMapperClass(MapTest.class);
        job.setJarByClass(DriTest.class);
        job.setReducerClass(RedTest.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Data.class);
        job.setOutputKeyClass(Data.class);
        job.setOutputValueClass(NullWritable.class);

        job.addCacheFile(new URI("file:///D:/MP/网约车/data.txt"));

        FileInputFormat.setInputPaths(job, "D:/MP/网约车/input");
        FileOutputFormat.setOutputPath(job, new Path("D:/MP/网约车/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
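
As an aside, the output-directory cleanup can also use Hadoop's FileSystem API instead of java.io.File, which keeps
working if the paths later move to HDFS; a sketch against the job's Configuration:

import org.apache.hadoop.fs.FileSystem;

// Inside driver(), after creating conf and before submitting the job:
FileSystem fs = FileSystem.get(conf);
Path out = new Path("D:/MP/网约车/output");
if (fs.exists(out)) {
    fs.delete(out, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, out);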

