概述
目录
一、文件上传需求
二、具体代码实现
1.1 hdfsETL.json文件
1.2 定义UploadHDFSProject
1.3 定义UploadHdfsETLJson类
1.4 定义RegexLocalPathFilter类
1.5 定义DataCollect主类
一、文件上传需求
按照配置文件的内容解析要上传文件的路径,HDFS的目标路径,文件名格式匹配,按照不同的文件名上传到不同的HDFS目录,HDFS按照业务分目录存储文件
二、具体代码实现
1.1 hdfsETL.json文件
{
"hdfsProjectList":[
{
"projectName":"data_collection",
"localSrcPath":"E:/hadoop/software/apache-flume-1.9.0-bin/400Data_tmp/",
"hdfsTargetPath":"hdfs:///hadoop/org/400backup/hour/%Y/%m/%d",
"fileRegex":"^.*summary_sixty.*txt$"
},
{
"projectName":"ETL_collection",
"localSrcPath":"E:/hadoop/software/apache-flume-1.9.0-bin/400Data_tmp/",
"hdfsTargetPath":"hdfs:///hadoop/org/400backup/day/%Y/%m/%d",
"fileRegex":"^.*summary_day.*txt$"
}
]
}
1.2 定义UploadHDFSProject
将JSON数组中的元数据信息定义为类
package com.kangna.bean;
/*******************************************************************
*@auther kangll
*@date 2021/6/26 14:50
*@desc 定义上传文件信息,因为会存在多个路不同项目本地文件的上传
******************************************************************/
public class UploadHDFSProject {
/* 上传文件的业务名 */
private String projectName;
/* 本地文件路径 */
private String localSrcPath;
/* HDFS路径 */
private String hdfsTargetPath;
/* 文件名 */
private String fileRegex;
public String getProjectName() {
return projectName;
}
public void setProjectName(String projectName) {
this.projectName = projectName;
}
public String getLocalSrcPath() {
return localSrcPath;
}
public void setLocalSrcPath(String localSrcPath) {
this.localSrcPath = localSrcPath;
}
public String getHdfsTargetPath() {
return hdfsTargetPath;
}
public void setHdfsTargetPath(String hdfsTargetPath) {
this.hdfsTargetPath = hdfsTargetPath;
}
public String getFileRegex() {
return fileRegex;
}
public void setFileRegex(String fileRegex) {
this.fileRegex = fileRegex;
}
@Override
public String toString() {
return "UploadHDFSProject{" +
"projectName='" + projectName + ''' +
", localSrcPath='" + localSrcPath + ''' +
", hdfsTargetPath='" + hdfsTargetPath + ''' +
", fileRegex='" + fileRegex + ''' +
'}';
}
}
1.3 定义UploadHdfsETLJson类
将hdfsETL.json配置文件解析出来的JSON数组定义为List
package com.kangna.bean;
import java.util.List;
/*********************************************************************
*@auther kangll
*@date 2021/6/26 17:06
*@desc 解析的 hdfsETL.json 封装到 List中
********************************************************************/
public class UploadHdfsETLJson {
/* 解析的 hdfsETL.json 封装到 List中 */
private List<UploadHDFSProject> hdfsProjectList;
public List<UploadHDFSProject> getHdfsProjectList() {
return hdfsProjectList;
}
public void setHdfsProjectList(List<UploadHDFSProject> hdfsProjectList) {
this.hdfsProjectList = hdfsProjectList;
}
@Override
public String toString() {
return "UploadHdfsETLJson{" +
"hdfsProjectList=" + hdfsProjectList +
'}';
}
}
1.4 定义RegexLocalPathFilter类
定义RegexLocalPathFilter类实现FileFilter接口实现文件过滤
package com.kangna.hdfs;
import java.io.File;
import java.io.FileFilter;
/*********************************************************************
*@auther kangll
*@date 2021/6/26 17:59
*@desc 本地文件过滤
********************************************************************/
public class RegexLocalPathFilter implements FileFilter {
private final String regex;
public RegexLocalPathFilter(String regex) {
this.regex = regex;
}
/**
* 正则过滤要上传到HDFS的本地文件
* @param pathname
* @return
*/
public boolean accept(File pathname) {
String name = pathname.getName();
return !name.endsWith(".tmp") && name.toLowerCase().matches(regex);
}
}
1.5 定义DataCollect主类
package com.kangna.hdfs;
import com.alibaba.fastjson.JSON;
import com.kangna.bean.UploadHDFSProject;
import com.kangna.bean.UploadHdfsETLJson;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/*********************************************************************
*@auther kangll
*@date 2021/6/26 17:15
*@desc 本地数据上传到 HDFS
********************************************************************/
public class DataCollectionFailoverUpload {
// 配置信息
static String CONFIG_FILE = "hdfsETL.JSON";
// hdfsETL.JSON 中的HDFS日期格式化
static String TAR_REGEX = "%(\w)";
static final Pattern tagPattern = Pattern.compile(TAR_REGEX);
/**
* 解析JSON配置文件
*
* @return
*/
public static UploadHdfsETLJson parseJSON() {
UploadHdfsETLJson hdfsETLJson = null;
try {
InputStream fileInputStream = ClassLoader.getSystemResourceAsStream(CONFIG_FILE);
String text = IOUtils.toString(fileInputStream, "utf8");
hdfsETLJson = JSON.parseObject(text, UploadHdfsETLJson.class);
} catch (Exception e) {
e.printStackTrace();
}
return hdfsETLJson;
}
/**
* 配置文件中的日期格式化
*
* @param hdfsTragetPathStr HDFS路径
* @return
*/
public static String formatDate(String hdfsTragetPathStr) {
Matcher matcher = tagPattern.matcher(hdfsTragetPathStr);
Date date = new Date();
while (matcher.find()) {
char c = matcher.group(1).charAt(0);
String formatString = "";
switch (c) {
case 'd':
formatString = "dd";
break;
case 'H':
formatString = "HH";
break;
case 'm':
formatString = "MM";
break;
case 'Y':
formatString = "yyyy";
break;
case 'M':
formatString = "mm";
break;
case 'S':
formatString = "ss";
break;
default:
break;
}
SimpleDateFormat dateFormat = new SimpleDateFormat(formatString);
String format = dateFormat.format(date);
// DHFS 路径中 “%” 替换
hdfsTragetPathStr = hdfsTragetPathStr.replaceAll("%" + c, format);
}
return hdfsTragetPathStr;
}
/**
* 本地文件过滤
*
* @param localSrcPath
* @param regex
* @return
*/
public static List<Path> localListStatus(String localSrcPath, RegexLocalPathFilter regex) {
List<Path> files = new ArrayList<>();
File file = new File(localSrcPath);
if (!file.exists()) {
System.out.println("源文件夹中没有新增文件!");
System.exit(0);
}
File[] listFiles = file.listFiles(regex);
for (int i = 0; i < listFiles.length - 1; i++) {
files.add(new Path(listFiles[i].getPath()));
}
return files;
}
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// 获取 hdfsETL.json配置
UploadHdfsETLJson uploadHdfsETLJson = parseJSON();
List<UploadHDFSProject> hdfsProjectList = uploadHdfsETLJson.getHdfsProjectList();
for (UploadHDFSProject hp : hdfsProjectList) {
hp.setHdfsTargetPath(formatDate(hp.getHdfsTargetPath()));
// 获取指定目录下的指定文件
RegexLocalPathFilter regexLocalPathFilter = new RegexLocalPathFilter(hp.getFileRegex().toLowerCase());
// 获取文件
final List<Path> paths = localListStatus(hp.getLocalSrcPath(), regexLocalPathFilter);
// 日期文件夹不存在则创建
if (!fs.exists(new Path(hp.getHdfsTargetPath()))) {
fs.mkdirs(new Path(hp.getHdfsTargetPath()));
}
for (Path localFile : paths) {
fs.copyFromLocalFile(localFile, new Path(hp.getHdfsTargetPath()));
}
}
}
}
测试结果
最后
以上就是动人蜻蜓为你收集整理的本地文件上传到HDFS的全部内容,希望文章能够帮你解决本地文件上传到HDFS所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复