1. 根据配置文件得到三个待用目录
本地文件目录 (待上传目录)
hdfs远程文件备份目录(hdfs备用目录)
hdfs远程文件归档目录(hdfs正式路径)
hdfs的文件操作封装成工具类,随时供调用
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108public class HDFSUtil { protected static Configuration conf; protected FileSystem hdfs; private Comparator<? super LocatedFileStatus> fileCompare; public HDFSUtil() { try { if (conf == null) { conf = new Configuration(); } //获取配置的文件系统 hdfs = FileSystem.get(conf); } catch (IOException e) { throw new BaseException("HDFS读取配置文件获取连接操作异常!"); } //文件比较器定义,即修改时间不同的文件,即为“不同的”文件 fileCompare = new Comparator<LocatedFileStatus>() { @Override public int compare(LocatedFileStatus o1, LocatedFileStatus o2) { int rs = 0; long l1 = o1.getModificationTime(); long l2 = o2.getModificationTime(); if (l1 > l2) { rs = 1; } else if (l1 < l2) { rs = -1; } return rs; } }; } /** * 是否存在目录或文件 . * */ public boolean existFile(String hdfsPath) throws IOException { Path path = new Path(hdfsPath); return hdfs.exists(path); } /** * 删除目录或文件 * * @param dir * 目录名 */ public boolean deleteFileOrDir(String dir) { boolean flag = false; try { flag = hdfs.delete(new Path(dir), true); if (!flag) { throw new BaseException("删除目录或文件失败,请检查'" + dir + "'目录或文件是否存在!"); } } catch (IOException e) { throw new BaseException("HDFS删除目录或文件失败!"); } return flag; } /** * 将本地文件上传到HDFS服务器上 * * @param source * 源文件路径 * @param dest * 目标文件路径 */ public boolean uploadLocalFile2HDFS(String source, String dest) { boolean flag = false; Path src = new Path(source); try { Path dst = new Path(dest); if (hdfs.exists(dst)) { // 如果目标文件已存在,则先删除再上传 deleteFileOrDir(dest); } hdfs.copyFromLocalFile(src, dst); // 设置上传者本身有删除权限 hdfs.setPermission(dst, FsPermission.valueOf("-rwxrwxr-x")); flag = true; } catch (Exception e) { throw new BaseException(dest, e.getMessage()); } throw new BaseException("HDFS上传文件失败."); } return flag; } /** * 新建目录 * * @param dir * 目录名 */ public boolean makeDir(String dir) { boolean flag = false; try { Path path = new Path(dir); if (hdfs.exists(path)) { flag = true; // 如果路径存在,直接返回true } else { flag = hdfs.mkdirs(new Path(dir)); } } catch (IOException e) { throw new BaseException("创建目录失败!"); } return flag; } }
2.扫描本地文件目录中存在的文件名
复制代码
1
2
3
4
5
6//获取本地路径, pathName不可为null,否则会抛空指针异常 File file=new File(pathName); /** 将此路径下的所有文件和目录名(当前名称,不包含此目录或文件的父目录名称)以数组形式列出,如果pathName放入的不是目录,而是文件名,则此函数返回null,如果该目录下无文件则返回空。 **/ String filename[]=file.list();
3. 本地文件目录+文件名 即为完整的需要上传的文件整体目录信息。
4. 新建对列以存储文件目录信息,将文件信息放入队列,等待上传
复制代码
1
2
3
4
5
6
7
8
9
10private static ConcurrentLinkedQueue<String> upLoadHdfsqueue = new ConcurrentLinkedQueue<>(); public static void add(String pathName) { upLoadHdfsqueue.add(pathName); logger.info(ILoggerBusiness.LOGTYPE_BUSINESS, ILoggerBusiness.ERRORCODE_NULL, "hdfs队列增加一个元素:" + str); } public static int GetQueueSize() { return upLoadHdfsqueue.size(); }
最后
以上就是大力哑铃最近收集整理的关于上传本地文件到HDFS的全部内容,更多相关上传本地文件到HDFS内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复