Overview
Straight to the code. Techniques covered:
- Parsing JSON into a relational-table structure
- MapReduce output to multiple files
- Suppressing MapReduce's default output artifacts
configuration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
controls whether the _SUCCESS marker file is generated.
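A minimal sketch of where that property goes (assumption: it must be set on the Configuration before the Job is created; the driver in section (2) below does not actually set it):
Configuration configuration = new Configuration();
// Suppress the empty _SUCCESS marker that FileOutputCommitter writes on success
configuration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
Job job = Job.getInstance(configuration, "JsonParser");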
(1)FileOutputFormat
package com.leboop.www.json;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
* Created by leboop on 2020/7/18.
*/
class FileOutputFormat extends TextOutputFormat<NullWritable, Text> {
@Override
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
// Name the output file after the named output only, dropping the task suffix (e.g. -m-00000)
FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
return new Path(committer.getWorkPath(), getOutputName(context));
}
}
The custom FileOutputFormat removes the task suffix from generated file names. For example, to produce a file named file1: without this class, the output would come out as file1-m-00000 (this is a map-only job; with reducers it would be file1-r-00000).
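For comparison, the inherited behavior (a sketch of Hadoop's stock FileOutputFormat.getDefaultWorkFile; treat the exact body as an approximation) inserts the unique task suffix via getUniqueFile:
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
// getUniqueFile appends the task suffix, e.g. "file1" -> "file1-m-00000"
return new Path(committer.getWorkPath(), getUniqueFile(context, getOutputName(context), extension));
}
The override above simply omits the getUniqueFile call.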
(2)JsonParser
package com.leboop.www.json;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.util.List;
import java.util.Map;
/**
* Created by leboop on 2020/7/18.
*/
public class JsonParser {
public static void parse(Path path) throws Exception {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://192.168.128.11:9000");
System.setProperty("HADOOP_USER_NAME", "root");
Job job = Job.getInstance(configuration, "JsonParser");
job.setJarByClass(Main.class);
job.setMapperClass(JsonMapper.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(FileOutputFormat.class);
job.setNumReduceTasks(0); // map-only job: mappers write the output directly
FileInputFormat.setInputPaths(job, path);
// Register one named output per target file ("table") defined in the meta file
Map<String, List<String>> saveFileMap = Utils.readSaveFile();
for (Map.Entry<String, List<String>> entry : saveFileMap.entrySet()) {
System.out.println(entry.getKey());
MultipleOutputs.addNamedOutput(job, entry.getKey(), FileOutputFormat.class, NullWritable.class, Text.class);
}
FileOutputFormat.setOutputPath(job, new Path("/output/json"));
LazyOutputFormat.setOutputFormatClass(job, FileOutputFormat.class);
job.waitForCompletion(true);
}
}
LazyOutputFormat.setOutputFormatClass(job, FileOutputFormat.class); removes the superfluous empty default output file (part-m-00000 in this map-only job; part-r-00000 with reducers): LazyOutputFormat only creates the underlying record writer when the first record is actually written.
setOutputPath here sets the base directory shared by all output files. For example, if the generated files should land at
/output/json/d1/file1
/output/json/d2/file2
/output/json/d3/file1
then the base directory is /output/json.
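In other words, the baseOutputPath passed to MultipleOutputs.write() is resolved relative to this base directory. A sketch (the directory name d1 and the variable row are hypothetical):
// write(namedOutput, key, value, baseOutputPath)
multipleOutputs.write("file1", NullWritable.get(), new Text(row), "d1/file1");
// -> /output/json/d1/file1 (the custom FileOutputFormat drops the task suffix)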
(3)JsonMapper
package com.leboop.www.json;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.util.*;
/**
* Created by leboop on 2020/7/18.
*/
public class JsonMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> multipleOutputs = null;
/**
* Maps a field name to its values (a field may have more than one value)
*/
private Map<String, List<String>> map = new HashMap<String, List<String>>();
/**
* Maps an output file name to its field list; each file corresponds to one relational table
*/
private Map<String, List<String>> saveFileMap = new HashMap<String, List<String>>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
multipleOutputs = new MultipleOutputs<>(context);
saveFileMap = Utils.readSaveFile(); // load the file-to-fields mapping once per task
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
map.clear(); // reset per record; otherwise field values accumulate across input lines
String jsonStr = value.toString();
// Parse the JSON string into a Gson JsonElement
com.google.gson.JsonParser jsonParser = new com.google.gson.JsonParser();
JsonElement e = jsonParser.parse(jsonStr);
System.out.println("JSON string read:");
System.out.println(e);
// Recursively flatten the JSON tree into the field map
StringBuilder keySb = new StringBuilder();
jsonTree(e, keySb);
System.out.println("Parsed JSON data:");
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
System.out.println(entry.getKey() + " = " + entry.getValue());
}
for (Map.Entry<String, List<String>> entry : saveFileMap.entrySet()) { // iterate over output files
int maxSize = 0;
List<List<String>> tmp = new ArrayList<List<String>>();
for (String fieldName : entry.getValue()) { // iterate over the file's fields
List<String> fieldValueList = map.get(fieldName); // the field may be absent from this record
if (fieldValueList != null) {
if (fieldValueList.size() > maxSize) {
maxSize = fieldValueList.size();
}
tmp.add(fieldValueList);
} else {
tmp.add(Arrays.asList(""));
}
}
// Pad values into a rectangular array
String[][] arr = fill(tmp, maxSize);
// Convert the array into a writable string
String resultStr = arrToStr(arr);
// Write to the target file
System.out.println("Saving file " + entry.getKey() + ":");
System.out.println(resultStr);
System.out.println("++++++++++++++++++++++++++++++++++++++++++");
multipleOutputs.write(entry.getKey(), null, new Text(resultStr), String.valueOf(Time.now()) + "/" + entry.getKey());
}
}
/**
* Converts the 2-D array into a string that can be written to a file
*
* @param arr the padded value array
* @return pipe-delimited rows, one line per row
*/
private String arrToStr(String[][] arr) {
StringBuilder resultSb = new StringBuilder();
for (int r = 0; r < arr.length; r++) {
for (int c = 0; c < arr[r].length; c++) {
resultSb.append(arr[r][c]).append("|");
}
resultSb.append("\n");
}
return resultSb.toString();
}
/**
* Pads the field value lists into a rectangular array
*
* @param tmp one value list per field (column)
* @param maxSize the largest value-list size (row count)
* @return the padded maxSize x tmp.size() array
*/
private String[][] fill(List<List<String>> tmp, int maxSize) {
String[][] arr = new String[maxSize][tmp.size()];
if (tmp.size() > 0) { // pad the data
for (int c = 0; c < tmp.size(); c++) {
List<String> curr = tmp.get(c);
if (maxSize % curr.size() != 0) { // malformed data: sizes must divide evenly
break;
} else {
int rep = maxSize / curr.size(); // replication factor per value
for (int k = 0; k < curr.size(); k++) {
for (int r = 0; r < rep; r++) { // row index = k * rep + r
arr[k * rep + r][c] = curr.get(k);
}
}
}
}
}
return arr;
}
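// A worked example of fill(), to make the replication rule concrete (hypothetical values):
// tmp = [ ["x"], ["1", "2"] ], maxSize = 2
// column 0: rep = 2 / 1 = 2 -> arr[0][0] = "x", arr[1][0] = "x"
// column 1: rep = 2 / 2 = 1 -> arr[0][1] = "1", arr[1][1] = "2"
// result rows: x|1| and x|2|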
/**
* Recursively flattens a JSON tree into the field map
*
* @param e the current JSON element
* @param key the key path accumulated so far
*/
private void jsonTree(JsonElement e, StringBuilder key) {
// Primitive or null value: store it under the accumulated key path
if (e.isJsonNull() || e.isJsonPrimitive()) {
String keyStr = key.toString();
if (map.containsKey(keyStr)) {
List<String> list = map.get(keyStr);
list.add(e.toString());
map.put(keyStr, list);
} else {
List<String> list = new ArrayList<String>();
list.add(e.toString());
map.put(keyStr, list);
}
key.setLength(0);
return;
}
// Array of objects (e.g. OA in data.json; an object array need not be OA, but OA is always an object array)
if (e.isJsonArray()) {
JsonArray jsonArr = e.getAsJsonArray();
if (null != jsonArr) {
for (JsonElement je : jsonArr) {
if (je.isJsonNull() || je.isJsonPrimitive()) { // array element is itself a primitive
String keyStr = key.toString();
if (map.containsKey(keyStr)) {
List<String> list = map.get(keyStr);
// Join primitive array elements with commas into a single cell
list.set(0, map.get(keyStr).get(0) + "," + je.toString());
map.put(keyStr, list);
} else {
List<String> list = new ArrayList<String>();
list.add(je.toString());
map.put(keyStr, list);
}
} else { // non-primitive element: recurse
jsonTree(je, key);
}
}
}
}
// JSON object
if (e.isJsonObject()) {
// Iterate over all key/value pairs of the object
Set<Map.Entry<String, JsonElement>> es = e.getAsJsonObject().entrySet();
String pKey = key.toString(); // remember the parent key path
for (Map.Entry<String, JsonElement> en : es) {
if (pKey.length() > 0) { // join the child key with the "-->" separator
key = new StringBuilder(pKey + "-->" + en.getKey());
} else {
key = new StringBuilder(en.getKey());
}
JsonElement element = en.getValue();
jsonTree(element, key); // recurse
}
}
}
}
multipleOutputs.write(entry.getKey(), null, new Text(resultStr), String.valueOf(Time.now()) + "/" + entry.getKey());
writes to multiple files under multiple directories. Note the last argument, String.valueOf(Time.now()) + "/" + entry.getKey():
the part left of the "/" is the directory, and the part to its right becomes the final file name (here kept identical to the first argument, entry.getKey()). It can also be written as:
multipleOutputs.write(NullWritable.get(), new Text(resultStr), String.valueOf(Time.now()) + "/" + entry.getKey());
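To make the key construction concrete: jsonTree joins nested object keys with "-->" and stores primitive values via JsonElement.toString(), so JSON strings keep their quotes. For the name object in data.json below:
// input fragment: "name": {"c": "RickandMorty", "d": "dd"}
// map contents after jsonTree:
//   name-->c = ["\"RickandMorty\""]
//   name-->d = ["\"dd\""]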
(4)Utils
package com.leboop.www.json;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by leboop on 2020/7/18.
*/
public class Utils {
/**
* Reads, from the meta file, which fields are saved into which output file (each file can be thought of as a database table)
*/
public static Map<String, List<String>> readSaveFile() {
Map<String, List<String>> saveFileMap = new HashMap<String, List<String>>();
try {
BufferedReader metaBR = new BufferedReader(new FileReader(
new File("G:\\idea_workspace\\MapReduce\\data\\meta")));
String line;
while ((line = metaBR.readLine()) != null) {
String k = line.split(":")[0];
String[] vs = line.split(":")[1].split("\\|");
List<String> list = new ArrayList<String>();
for (String v : vs) {
list.add(v);
}
saveFileMap.put(k, list);
}
System.out.println("保存文件名:");
for (Map.Entry<String, List<String>> entry : saveFileMap.entrySet()) {
System.out.println(entry.getKey() + " = " + entry.getValue());
}
System.out.println("+++++++++++++++++++++++++++++++");
} catch (Exception e1) {
e1.printStackTrace(); // do not swallow the exception silently
}
return saveFileMap;
}
}
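For example, the meta line file2:id|OA-->rd|OA-->rn from section (7) below parses to the key file2 with the field list [id, OA-->rd, OA-->rn].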
(5)Main
package com.leboop.www.json;
import org.apache.hadoop.fs.Path;
/**
* Created by leboop on 2020/7/18.
*/
public class Main {
public static void main(String[] args) throws Exception {
Path path = new Path("/json/data.json");
JsonParser.parse(path);
}
}
(6)data.json
{"OA":[{"rd":1,"rn":"s"},{"rd":2,"rn":"d"}],"OOA":[{"a":1,"b":[{"c":1,"d":[{"e":1},{"e":2}]},{"c":2,"d":[{"e":2},{"e":2}]}]},{"a":2,"b":[{"c":1,"d":[{"e":1},{"e":1}]},{"c":2,"d":[{"e":2},{"e":2}]}]}],"name":{"c":"RickandMorty","d":"dd"},"hobby":["t","m",{"s":true}],"id":"kladu8yak8asdha8","boolean":true,"number":3,"k":null,"ARRAY":[{"FIRST":"Rick","SEC":"tt"},{"LAST":"Morty"}]}
(7)meta
file1:name-->c|name-->d|hobby|hobby-->s|id|boolean|number|k|ARRAY-->FIRST|ARRAY-->SEC|ARRAY-->LAST
file2:id|OA-->rd|OA-->rn
file3:id|OOA-->a
file4:OOA-->a|OOA-->b-->c|OOA-->b-->d-->e
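For intuition, here is a hand-derived sketch (not produced by actually running the job) of what file2 should contain for data.json above: id has a single value while OA-->rd and OA-->rn each have two, so fill() replicates the id across both rows, and string values keep their JSON quotes because jsonTree stores JsonElement.toString():
"kladu8yak8asdha8"|1|"s"|
"kladu8yak8asdha8"|2|"d"|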
Final notes
The above covers parsing JSON data (including multi-level nested arrays) with Gson + MapReduce and saving the results to multiple files.