概述
1. 根据配置文件得到三个待用目录
本地文件目录 (待上传目录)
hdfs远程文件备份目录(hdfs备用目录)
hdfs远程文件归档目录(hdfs正式路径)
hdfs的文件操作封装成工具类,随时供调用
public class HDFSUtil {
protected static Configuration conf;
protected FileSystem hdfs;
private Comparator<? super LocatedFileStatus> fileCompare;
public HDFSUtil() {
try {
if (conf == null) {
conf = new Configuration();
}
//获取配置的文件系统
hdfs = FileSystem.get(conf);
} catch (IOException e) {
throw new BaseException("HDFS读取配置文件获取连接操作异常!");
}
//文件比较器定义,即修改时间不同的文件,即为“不同的”文件
fileCompare = new Comparator<LocatedFileStatus>() {
@Override
public int compare(LocatedFileStatus o1, LocatedFileStatus o2) {
int rs = 0;
long l1 = o1.getModificationTime();
long l2 = o2.getModificationTime();
if (l1 > l2) {
rs = 1;
} else if (l1 < l2) {
rs = -1;
}
return rs;
}
};
}
/**
* 是否存在目录或文件 .
*
*/
public boolean existFile(String hdfsPath) throws IOException {
Path path = new Path(hdfsPath);
return hdfs.exists(path);
}
/**
* 删除目录或文件
*
* @param dir
*
目录名
*/
public boolean deleteFileOrDir(String dir) {
boolean flag = false;
try {
flag = hdfs.delete(new Path(dir), true);
if (!flag) {
throw new BaseException("删除目录或文件失败,请检查'" + dir + "'目录或文件是否存在!");
}
} catch (IOException e) {
throw new BaseException("HDFS删除目录或文件失败!");
}
return flag;
}
/**
* 将本地文件上传到HDFS服务器上
*
* @param source
*
源文件路径
* @param dest
*
目标文件路径
*/
public boolean uploadLocalFile2HDFS(String source, String dest) {
boolean flag = false;
Path src = new Path(source);
try {
Path dst = new Path(dest);
if (hdfs.exists(dst)) { // 如果目标文件已存在,则先删除再上传
deleteFileOrDir(dest);
}
hdfs.copyFromLocalFile(src, dst);
// 设置上传者本身有删除权限
hdfs.setPermission(dst, FsPermission.valueOf("-rwxrwxr-x"));
flag = true;
} catch (Exception e) {
throw new BaseException(dest, e.getMessage());
}
throw new BaseException("HDFS上传文件失败.");
}
return flag;
}
/**
* 新建目录
*
* @param dir
*
目录名
*/
public boolean makeDir(String dir) {
boolean flag = false;
try {
Path path = new Path(dir);
if (hdfs.exists(path)) {
flag = true; // 如果路径存在,直接返回true
} else {
flag = hdfs.mkdirs(new Path(dir));
}
} catch (IOException e) {
throw new BaseException("创建目录失败!");
}
return flag;
}
}
2.扫描本地文件目录中存在的文件名
//获取本地路径, pathName不可为null,否则会抛空指针异常
File file=new File(pathName);
/**
将此路径下的所有文件和目录名(当前名称,不包含此目录或文件的父目录名称)以数组形式列出,如果pathName放入的不是目录,而是文件名,则此函数返回null,如果该目录下无文件则返回空。
**/
String filename[]=file.list();
3. 本地文件目录+文件名 即为完整的需要上传的文件整体目录信息。
4. 新建对列以存储文件目录信息,将文件信息放入队列,等待上传
private static ConcurrentLinkedQueue<String> upLoadHdfsqueue = new ConcurrentLinkedQueue<>();
public static void add(String pathName) {
upLoadHdfsqueue.add(pathName);
logger.info(ILoggerBusiness.LOGTYPE_BUSINESS, ILoggerBusiness.ERRORCODE_NULL, "hdfs队列增加一个元素:" + str);
}
public static int GetQueueSize() {
return upLoadHdfsqueue.size();
}
最后
以上就是大力哑铃为你收集整理的上传本地文件到HDFS的全部内容,希望文章能够帮你解决上传本地文件到HDFS所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复