目录
一、文件上传需求
二、具体代码实现
1.1 hdfsETL.json文件
1.2 定义UploadHDFSProject
1.3 定义UploadHdfsETLJson类
1.4 定义RegexLocalPathFilter类
1.5 定义DataCollect主类
一、文件上传需求
按照配置文件的内容解析要上传文件的路径,HDFS的目标路径,文件名格式匹配,按照不同的文件名上传到不同的HDFS目录,HDFS按照业务分目录存储文件
二、具体代码实现
1.1 hdfsETL.json文件
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16{ "hdfsProjectList":[ { "projectName":"data_collection", "localSrcPath":"E:/hadoop/software/apache-flume-1.9.0-bin/400Data_tmp/", "hdfsTargetPath":"hdfs:///hadoop/org/400backup/hour/%Y/%m/%d", "fileRegex":"^.*summary_sixty.*txt$" }, { "projectName":"ETL_collection", "localSrcPath":"E:/hadoop/software/apache-flume-1.9.0-bin/400Data_tmp/", "hdfsTargetPath":"hdfs:///hadoop/org/400backup/day/%Y/%m/%d", "fileRegex":"^.*summary_day.*txt$" } ] }
1.2 定义UploadHDFSProject
将JSON数组中的元数据信息定义为类
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64package com.kangna.bean; /******************************************************************* *@auther kangll *@date 2021/6/26 14:50 *@desc 定义上传文件信息,因为会存在多个路不同项目本地文件的上传 ******************************************************************/ public class UploadHDFSProject { /* 上传文件的业务名 */ private String projectName; /* 本地文件路径 */ private String localSrcPath; /* HDFS路径 */ private String hdfsTargetPath; /* 文件名 */ private String fileRegex; public String getProjectName() { return projectName; } public void setProjectName(String projectName) { this.projectName = projectName; } public String getLocalSrcPath() { return localSrcPath; } public void setLocalSrcPath(String localSrcPath) { this.localSrcPath = localSrcPath; } public String getHdfsTargetPath() { return hdfsTargetPath; } public void setHdfsTargetPath(String hdfsTargetPath) { this.hdfsTargetPath = hdfsTargetPath; } public String getFileRegex() { return fileRegex; } public void setFileRegex(String fileRegex) { this.fileRegex = fileRegex; } @Override public String toString() { return "UploadHDFSProject{" + "projectName='" + projectName + ''' + ", localSrcPath='" + localSrcPath + ''' + ", hdfsTargetPath='" + hdfsTargetPath + ''' + ", fileRegex='" + fileRegex + ''' + '}'; } }
1.3 定义UploadHdfsETLJson类
将hdfsETL.json配置文件解析出来的JSON数组定义为List
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29package com.kangna.bean; import java.util.List; /********************************************************************* *@auther kangll *@date 2021/6/26 17:06 *@desc 解析的 hdfsETL.json 封装到 List中 ********************************************************************/ public class UploadHdfsETLJson { /* 解析的 hdfsETL.json 封装到 List中 */ private List<UploadHDFSProject> hdfsProjectList; public List<UploadHDFSProject> getHdfsProjectList() { return hdfsProjectList; } public void setHdfsProjectList(List<UploadHDFSProject> hdfsProjectList) { this.hdfsProjectList = hdfsProjectList; } @Override public String toString() { return "UploadHdfsETLJson{" + "hdfsProjectList=" + hdfsProjectList + '}'; } }
1.4 定义RegexLocalPathFilter类
定义RegexLocalPathFilter类实现FileFilter接口实现文件过滤
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28package com.kangna.hdfs; import java.io.File; import java.io.FileFilter; /********************************************************************* *@auther kangll *@date 2021/6/26 17:59 *@desc 本地文件过滤 ********************************************************************/ public class RegexLocalPathFilter implements FileFilter { private final String regex; public RegexLocalPathFilter(String regex) { this.regex = regex; } /** * 正则过滤要上传到HDFS的本地文件 * @param pathname * @return */ public boolean accept(File pathname) { String name = pathname.getName(); return !name.endsWith(".tmp") && name.toLowerCase().matches(regex); } }
1.5 定义DataCollect主类
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139 package com.kangna.hdfs; import com.alibaba.fastjson.JSON; import com.kangna.bean.UploadHDFSProject; import com.kangna.bean.UploadHdfsETLJson; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; /********************************************************************* *@auther kangll *@date 2021/6/26 17:15 *@desc 本地数据上传到 HDFS ********************************************************************/ public class DataCollectionFailoverUpload { // 配置信息 static String CONFIG_FILE = "hdfsETL.JSON"; // hdfsETL.JSON 中的HDFS日期格式化 static String TAR_REGEX = "%(\w)"; static final Pattern tagPattern = Pattern.compile(TAR_REGEX); /** * 解析JSON配置文件 * * @return */ public static UploadHdfsETLJson parseJSON() { UploadHdfsETLJson hdfsETLJson = null; try { InputStream fileInputStream = ClassLoader.getSystemResourceAsStream(CONFIG_FILE); String text = IOUtils.toString(fileInputStream, "utf8"); hdfsETLJson = JSON.parseObject(text, UploadHdfsETLJson.class); } catch (Exception e) { e.printStackTrace(); } return hdfsETLJson; } /** * 配置文件中的日期格式化 * * @param hdfsTragetPathStr HDFS路径 * @return */ public static String formatDate(String hdfsTragetPathStr) { Matcher matcher = tagPattern.matcher(hdfsTragetPathStr); Date date = new Date(); while (matcher.find()) { char c = matcher.group(1).charAt(0); String formatString = ""; switch (c) { case 'd': formatString = "dd"; break; case 'H': formatString = "HH"; break; case 'm': formatString = "MM"; break; case 'Y': formatString = "yyyy"; break; case 'M': formatString = "mm"; break; case 'S': formatString = "ss"; break; default: break; } SimpleDateFormat dateFormat = new SimpleDateFormat(formatString); String format = dateFormat.format(date); // DHFS 路径中 “%” 替换 hdfsTragetPathStr = hdfsTragetPathStr.replaceAll("%" + c, format); } return hdfsTragetPathStr; } /** * 本地文件过滤 * * @param localSrcPath * @param regex * @return */ public static List<Path> localListStatus(String localSrcPath, RegexLocalPathFilter regex) { List<Path> files = new ArrayList<>(); File file = new File(localSrcPath); if (!file.exists()) { System.out.println("源文件夹中没有新增文件!"); System.exit(0); } File[] listFiles = file.listFiles(regex); for (int i = 0; i < listFiles.length - 1; i++) { files.add(new Path(listFiles[i].getPath())); } return files; } public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); // 获取 hdfsETL.json配置 UploadHdfsETLJson uploadHdfsETLJson = parseJSON(); List<UploadHDFSProject> hdfsProjectList = uploadHdfsETLJson.getHdfsProjectList(); for (UploadHDFSProject hp : hdfsProjectList) { hp.setHdfsTargetPath(formatDate(hp.getHdfsTargetPath())); // 获取指定目录下的指定文件 RegexLocalPathFilter regexLocalPathFilter = new RegexLocalPathFilter(hp.getFileRegex().toLowerCase()); // 获取文件 final List<Path> paths = localListStatus(hp.getLocalSrcPath(), regexLocalPathFilter); // 日期文件夹不存在则创建 if (!fs.exists(new Path(hp.getHdfsTargetPath()))) { fs.mkdirs(new Path(hp.getHdfsTargetPath())); } for (Path localFile : paths) { fs.copyFromLocalFile(localFile, new Path(hp.getHdfsTargetPath())); } } } }
测试结果
最后
以上就是动人蜻蜓最近收集整理的关于本地文件上传到HDFS的全部内容,更多相关本地文件上传到HDFS内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复