我是靠谱客的博主 坚强大山,最近开发中收集的这篇文章主要介绍【MapReduce&FastJson】招聘数据清洗(JsonLine类型数据)【MapReduce】招聘数据清洗_JsonLine类型文件,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述


文章目录

  • 【MapReduce】招聘数据清洗_JsonLine类型文件
    • 一、需求及分析
    • 二、代码实现
      • ♦ 自定义的Bean对象
      • ♦ Map阶段
      • ♦ Reduce阶段
      • ♦ Driver阶段


【MapReduce】招聘数据清洗_JsonLine类型文件

数据样式:json类型数据

字段分析:
从左到右分别是 :id编号 公司名称 学历要求 工作类型 工作名称 薪资 发布时间 截止时间 城市编码 公司规模 福利 岗位职责 地区 工作经验
在这里插入图片描述

城市数据
字段:城市id和城市名 在这里插入图片描述

返回顶部


一、需求及分析

  • 每一个值都不能为空,只要有一个为空就删除整条数据 ---- 数据转为json对象,遍历每个json对象属性值
  • 处理工资,让其变成最大值与最小值差值的一半 ---- 字符串提取,转型
  • 使用城市名替换城市id ---- mapJoin操作,缓存城市信息表

返回顶部


二、代码实现

♦ 自定义的Bean对象

为要实现json格式的转换,属性名要和json文件中的key保持一致,并且实现WritableComparable的序列化、反序列化。

package 招聘数据处理_Json;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class BeanTest implements WritableComparable<BeanTest> {

    // 定义封装类型的属性
    private String id;
    private String company_name;
    private String eduLevel_name;
    private String emplType;
    private String jobName;
    private String salary;
    private String createDate;
    private String endDate;
    private String city_code;
    private String companySize;
    private String welfare;
    private String responsibility;
    private String place;
    private String workingExp;
    private String city_name;


    @Override
    public int compareTo(BeanTest o) {
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(company_name);
        dataOutput.writeUTF(eduLevel_name);
        dataOutput.writeUTF(emplType);
        dataOutput.writeUTF(jobName);
        dataOutput.writeUTF(salary);
        dataOutput.writeUTF(createDate);
        dataOutput.writeUTF(endDate);
        dataOutput.writeUTF(city_code);
        dataOutput.writeUTF(companySize);
        dataOutput.writeUTF(welfare);
        dataOutput.writeUTF(responsibility);
        dataOutput.writeUTF(place);
        dataOutput.writeUTF(workingExp);
        dataOutput.writeUTF(city_name);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        id = dataInput.readUTF();
        company_name = dataInput.readUTF();
        eduLevel_name = dataInput.readUTF();
        emplType = dataInput.readUTF();
        jobName = dataInput.readUTF();
        salary = dataInput.readUTF();
        createDate = dataInput.readUTF();
        endDate = dataInput.readUTF();
        city_code = dataInput.readUTF();
        companySize = dataInput.readUTF();
        welfare = dataInput.readUTF();
        responsibility = dataInput.readUTF();
        place = dataInput.readUTF();
        workingExp = dataInput.readUTF();
        city_name = dataInput.readUTF();
    }

     // toString
    @Override
    public String toString() {
        return  id + 't' +
                company_name + 't' +
                eduLevel_name + 't' +
                emplType + 't' +
                jobName + 't' +
                salary + 't' +
                createDate + 't' +
                endDate + 't' +
                city_code + 't' +
                companySize + 't' +
                welfare + 't' +
                responsibility + 't' +
                place + 't' +
                workingExp + 't' +
                city_name;
    }
    // settergetter
    省略setter、getter方法
}

返回顶部


♦ Map阶段

Map阶段中的setup方法将城市表缓存到内存里,然后实现替换的操作,先使用JSONObject来存储,判断是否全部为空,如果是的话,在进行下一步的处理。将salary分割,然后按要求进行处理

package 招聘数据处理_Json;

import CSDN综合练习.Bean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.*;
import java.net.URI;
import java.util.HashMap;

public class MapperTest extends Mapper<LongWritable, Text,BeanTest, NullWritable> {
    private BeanTest k = new BeanTest();
    private HashMap<String,String> hashMap = new HashMap();
    private String city_code;
    private String salary;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try{
            // 获取缓存文件
            URI[] uris = context.getCacheFiles();
            File file = new File(uris[0]);
            // 读取缓存文件
            BufferedReader reader = new BufferedReader(new FileReader(file));
            String lines;
            while ((lines=reader.readLine())!=null){
                String[] fields = lines.split(",");
                hashMap.put(fields[0],fields[1]); // 将内容存入集合
            }
            // 关闭资源
            IOUtils.closeStream(reader);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1.去空
        //   1.1 每获取一行数据,就将其换为JsonObject对象
        JSONObject jsonObject = JSON.parseObject(value.toString());
        //   1.2 创建数组,存储每行的数据
        String[] str = new String[14];
        str[0] = jsonObject.getString("id");
        str[1] = jsonObject.getString("company_name");
        str[2] = jsonObject.getString("eduLevel_name");
        str[3] = jsonObject.getString("emplType");
        str[4] = jsonObject.getString("jobName");
        str[5] = jsonObject.getString("salary");
        str[6] = jsonObject.getString("createDate");
        str[7] = jsonObject.getString("endDate");
        str[8] = jsonObject.getString("city_code");
        str[9] = jsonObject.getString("companySize");
        str[10] = jsonObject.getString("welfare");
        str[11] = jsonObject.getString("responsibility");
        str[12] = jsonObject.getString("place");
        str[13] = jsonObject.getString("workingExp");
        //   1.3 遍历每个属性值去空
        for (String s:str){
            if (s.equals("") || s==null){
                return;
            }
        }

        // 2.指定Json对象为BeanTest类型
        k = JSON.parseObject(value.toString(),BeanTest.class);

        // 3.使用城市名替换城市id
        city_code = k.getCity_code();
        k.setCity_name(hashMap.get(city_code));

        // 4.处理工资,让其变成(最大-最小)/2     8K-10K
        String[] salarys = k.getSalary().split("-");
        salary = String.valueOf((Integer.parseInt(salarys[1].substring(0,salarys[1].length()-1)) - Integer.parseInt(salarys[0].substring(0,salarys[0].length()-1))));
        k.setSalary(salary);

        // 5.写出
        context.write(k,NullWritable.get());
    }
}

返回顶部


♦ Reduce阶段

只要循环输出所有的结果即可

package 招聘数据处理_Json;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class ReduceTest extends Reducer<BeanTest, NullWritable, BeanTest, NullWritable> {
    @Override
    protected void reduce(BeanTest key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 直接写出
        for (NullWritable v:values){
            context.write(key,NullWritable.get());
        }
    }
}

返回顶部


♦ Driver阶段

package 招聘数据处理_Json;

import com.alibaba.fastjson.parser.ParserConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;

public class DriverTest {
    public static void main(String[] args) {
        Job job = null;
        Configuration conf = new Configuration();
        try {
            // 1.获取job
            job = Job.getInstance(conf);
            // 2.关联配置
            job.setMapperClass(MapperTest.class);
            job.setReducerClass(ReduceTest.class);
            job.setJarByClass(DriverTest.class);

            job.setMapOutputKeyClass(BeanTest.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(BeanTest.class);
            job.setOutputValueClass(NullWritable.class);

            // 3.配置缓存文件路径
            job.addCacheFile(new URI("file:///G:/Projects/IdeaProject-C/MapReduce/src/main/java/招聘数据处理_Json/data/com.txt"));
            // 4.配置输入输出路径
            FileInputFormat.setInputPaths(job,new Path("G:\Projects\IdeaProject-C\MapReduce\src\main\java\招聘数据处理_Json\data\exploy.json"));
            FileOutputFormat.setOutputPath(job,new Path("G:\Projects\IdeaProject-C\MapReduce\src\main\java\招聘数据处理_Json\output\"));
            // 5.配置转换asm
            //ParserConfig.getGlobalInstance().setAsmEnable(false);
            // 6.提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result?0:1);
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

注意:可能会报错,参见本人博客:FastJson 解析报错serializer error/deserializer error~

返回顶部


最后

以上就是坚强大山为你收集整理的【MapReduce&FastJson】招聘数据清洗(JsonLine类型数据)【MapReduce】招聘数据清洗_JsonLine类型文件的全部内容,希望文章能够帮你解决【MapReduce&FastJson】招聘数据清洗(JsonLine类型数据)【MapReduce】招聘数据清洗_JsonLine类型文件所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部