我是靠谱客的博主 坚强大山,最近开发中收集的这篇文章主要介绍【MapReduce&FastJson】招聘数据清洗(JsonLine类型数据)【MapReduce】招聘数据清洗_JsonLine类型文件,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
文章目录
- 【MapReduce】招聘数据清洗_JsonLine类型文件
- 一、需求及分析
- 二、代码实现
- ♦ 自定义的Bean对象
- ♦ Map阶段
- ♦ Reduce阶段
- ♦ Driver阶段
【MapReduce】招聘数据清洗_JsonLine类型文件
数据样式:json类型数据
字段分析:
从左到右分别是 :id编号 公司名称 学历要求 工作类型 工作名称 薪资 发布时间 截止时间 城市编码 公司规模 福利 岗位职责 地区 工作经验
城市数据
字段:城市id和城市名
返回顶部
一、需求及分析
- 每一个值都不能为空,只要有一个为空就删除整条数据 ----
数据转为json对象,遍历每个json对象属性值
- 处理工资,让其变成最大值与最小值差值的一半 ----
字符串提取,转型
- 使用城市名替换城市id ----
mapJoin操作,缓存城市信息表
返回顶部
二、代码实现
♦ 自定义的Bean对象
为要实现json格式的转换,属性名要和json文件中的key保持一致,并且实现WritableComparable的序列化、反序列化。
package 招聘数据处理_Json;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class BeanTest implements WritableComparable<BeanTest> {
// 定义封装类型的属性
private String id;
private String company_name;
private String eduLevel_name;
private String emplType;
private String jobName;
private String salary;
private String createDate;
private String endDate;
private String city_code;
private String companySize;
private String welfare;
private String responsibility;
private String place;
private String workingExp;
private String city_name;
@Override
public int compareTo(BeanTest o) {
return 0;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(id);
dataOutput.writeUTF(company_name);
dataOutput.writeUTF(eduLevel_name);
dataOutput.writeUTF(emplType);
dataOutput.writeUTF(jobName);
dataOutput.writeUTF(salary);
dataOutput.writeUTF(createDate);
dataOutput.writeUTF(endDate);
dataOutput.writeUTF(city_code);
dataOutput.writeUTF(companySize);
dataOutput.writeUTF(welfare);
dataOutput.writeUTF(responsibility);
dataOutput.writeUTF(place);
dataOutput.writeUTF(workingExp);
dataOutput.writeUTF(city_name);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
id = dataInput.readUTF();
company_name = dataInput.readUTF();
eduLevel_name = dataInput.readUTF();
emplType = dataInput.readUTF();
jobName = dataInput.readUTF();
salary = dataInput.readUTF();
createDate = dataInput.readUTF();
endDate = dataInput.readUTF();
city_code = dataInput.readUTF();
companySize = dataInput.readUTF();
welfare = dataInput.readUTF();
responsibility = dataInput.readUTF();
place = dataInput.readUTF();
workingExp = dataInput.readUTF();
city_name = dataInput.readUTF();
}
// toString
@Override
public String toString() {
return id + 't' +
company_name + 't' +
eduLevel_name + 't' +
emplType + 't' +
jobName + 't' +
salary + 't' +
createDate + 't' +
endDate + 't' +
city_code + 't' +
companySize + 't' +
welfare + 't' +
responsibility + 't' +
place + 't' +
workingExp + 't' +
city_name;
}
// settergetter
省略setter、getter方法
}
返回顶部
♦ Map阶段
Map阶段中的setup方法将城市表缓存到内存里,然后实现替换的操作,先使用JSONObject来存储,判断是否全部为空,如果是的话,在进行下一步的处理。将salary分割,然后按要求进行处理
package 招聘数据处理_Json;
import CSDN综合练习.Bean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.*;
import java.net.URI;
import java.util.HashMap;
public class MapperTest extends Mapper<LongWritable, Text,BeanTest, NullWritable> {
private BeanTest k = new BeanTest();
private HashMap<String,String> hashMap = new HashMap();
private String city_code;
private String salary;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
try{
// 获取缓存文件
URI[] uris = context.getCacheFiles();
File file = new File(uris[0]);
// 读取缓存文件
BufferedReader reader = new BufferedReader(new FileReader(file));
String lines;
while ((lines=reader.readLine())!=null){
String[] fields = lines.split(",");
hashMap.put(fields[0],fields[1]); // 将内容存入集合
}
// 关闭资源
IOUtils.closeStream(reader);
}catch (Exception e){
e.printStackTrace();
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.去空
// 1.1 每获取一行数据,就将其换为JsonObject对象
JSONObject jsonObject = JSON.parseObject(value.toString());
// 1.2 创建数组,存储每行的数据
String[] str = new String[14];
str[0] = jsonObject.getString("id");
str[1] = jsonObject.getString("company_name");
str[2] = jsonObject.getString("eduLevel_name");
str[3] = jsonObject.getString("emplType");
str[4] = jsonObject.getString("jobName");
str[5] = jsonObject.getString("salary");
str[6] = jsonObject.getString("createDate");
str[7] = jsonObject.getString("endDate");
str[8] = jsonObject.getString("city_code");
str[9] = jsonObject.getString("companySize");
str[10] = jsonObject.getString("welfare");
str[11] = jsonObject.getString("responsibility");
str[12] = jsonObject.getString("place");
str[13] = jsonObject.getString("workingExp");
// 1.3 遍历每个属性值去空
for (String s:str){
if (s.equals("") || s==null){
return;
}
}
// 2.指定Json对象为BeanTest类型
k = JSON.parseObject(value.toString(),BeanTest.class);
// 3.使用城市名替换城市id
city_code = k.getCity_code();
k.setCity_name(hashMap.get(city_code));
// 4.处理工资,让其变成(最大-最小)/2 8K-10K
String[] salarys = k.getSalary().split("-");
salary = String.valueOf((Integer.parseInt(salarys[1].substring(0,salarys[1].length()-1)) - Integer.parseInt(salarys[0].substring(0,salarys[0].length()-1))));
k.setSalary(salary);
// 5.写出
context.write(k,NullWritable.get());
}
}
返回顶部
♦ Reduce阶段
只要循环输出所有的结果即可
package 招聘数据处理_Json;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ReduceTest extends Reducer<BeanTest, NullWritable, BeanTest, NullWritable> {
@Override
protected void reduce(BeanTest key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// 直接写出
for (NullWritable v:values){
context.write(key,NullWritable.get());
}
}
}
返回顶部
♦ Driver阶段
package 招聘数据处理_Json;
import com.alibaba.fastjson.parser.ParserConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
public class DriverTest {
public static void main(String[] args) {
Job job = null;
Configuration conf = new Configuration();
try {
// 1.获取job
job = Job.getInstance(conf);
// 2.关联配置
job.setMapperClass(MapperTest.class);
job.setReducerClass(ReduceTest.class);
job.setJarByClass(DriverTest.class);
job.setMapOutputKeyClass(BeanTest.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(BeanTest.class);
job.setOutputValueClass(NullWritable.class);
// 3.配置缓存文件路径
job.addCacheFile(new URI("file:///G:/Projects/IdeaProject-C/MapReduce/src/main/java/招聘数据处理_Json/data/com.txt"));
// 4.配置输入输出路径
FileInputFormat.setInputPaths(job,new Path("G:\Projects\IdeaProject-C\MapReduce\src\main\java\招聘数据处理_Json\data\exploy.json"));
FileOutputFormat.setOutputPath(job,new Path("G:\Projects\IdeaProject-C\MapReduce\src\main\java\招聘数据处理_Json\output\"));
// 5.配置转换asm
//ParserConfig.getGlobalInstance().setAsmEnable(false);
// 6.提交job
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
} catch (Exception e){
e.printStackTrace();
}
}
}
注意:可能会报错,参见本人博客:FastJson 解析报错serializer error/deserializer error~
返回顶部
最后
以上就是坚强大山为你收集整理的【MapReduce&FastJson】招聘数据清洗(JsonLine类型数据)【MapReduce】招聘数据清洗_JsonLine类型文件的全部内容,希望文章能够帮你解决【MapReduce&FastJson】招聘数据清洗(JsonLine类型数据)【MapReduce】招聘数据清洗_JsonLine类型文件所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复