文章目录
- HDFS客户端操作
- 1. 配置HADOOP_HOME环境变量
- 2. 创建一个Maven工程HdfsClient
- 2.1 添加依赖
- 2.2 添加日志log4j
- 3. 创建HdfsClient类
- 3.1 文件的上传,以及测试优先级
- 3.2 文件的下载copyToLocalFile
- 3.3 删除文件夹delete
- 3.4 文件的改名rename
- 3.5 查看文件详情listFiles
- 3.6 判断是否是文件
- 3.7 HDFS的I/O流操作
- 把本地文件banhua.txt上传到HDFS根目录
- 从HDFS上下载banhua.txt文件到本地E盘上
- 分块读取HDFS上的大文件--下载第一部分
- 4. 问题总结
- 问题一
- 问题二
- ☆
HDFS客户端操作
- 下面是win10系统下编译后的hadoop-2.7.2的jar包。
- 链接:https://pan.baidu.com/s/1O2GVUevAGzUX1-IPQZwRqQ
- 提取码:9mx7
- 下载后将其拷贝到非中文路径中,比如说:
E:hadoophadoop-2.7.2
。
1. 配置HADOOP_HOME环境变量
2. 创建一个Maven工程HdfsClient
2.1 添加依赖
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.aiguigu</groupId> <artifactId>HdfsClient</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}libtools.jar</systemPath> </dependency> </dependencies> </project>
2.2 添加日志log4j
需要在项目的
src/main/resources
目录下,新建一个文件,命名为log4j.properties
,在文件中填入
1
2
3
4
5
6
7
8
9log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
3. 创建HdfsClient类
注意引包不要引错了!
- 获取文件系统
- 执行操作
- 关闭资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.atguigu.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import org.junit.Test; // ... /** * @Date 2020/6/27 14:26 * @Version 10.21 * @Author DuanChaojie */ public class HdfsClient { /** * 测试使用客户端创建目录 * @throws Exception */ @Test public void testMkdirs() throws Exception { // 1.获取文件系统(注意包不要引错) Configuration conf = new Configuration(); // 配置在集群上运行 // 第一种配置 //conf.set("fs.defaultFS","hdfs://hadoop:9000"); // FileSystem fs = FileSystem.get(conf); // 第二种配置 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); fs.mkdirs(new Path("/test7")); fs.close(); } }
3.1 文件的上传,以及测试优先级
copyFromLocalFile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/** * 文件的上传,以及测试优先级 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testCopyFromLocalFile() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); // 这个优先于resources中的配置 conf.set("dfs.replication","1"); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.上传文件 fs.copyFromLocalFile(new Path("E:/file/ps.txt"),new Path("/banzhang3.txt")); fs.close(); System.out.println("执行结束..."); }
resources中的
hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <!-- 指定HDFS副本的数量(优先于,linux中的配置) --> <property> <name>dfs.replication</name> <value>3</value> </property> </configuration>
参数优先级排序:
客户端代码中设置的值 >ClassPath(resources)下的用户自定义配置文件 >然后是服务器的默认配置
3.2 文件的下载copyToLocalFile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/** * 下载文件 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testCopyToLocalFile() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.执行下载操作 fs.copyToLocalFile(true,new Path("/banzhang3.txt"),new Path("E:/file/banzhagn.txt"),true); // 3.关闭资源、 fs.close(); }
3.3 删除文件夹delete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/** * 删除文件夹 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testDelete() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.执行删除文件夹 fs.delete(new Path("/test"),true); // 3.关闭资源、 fs.close(); }
3.4 文件的改名rename
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/** * 文件名更改 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testRename() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.修改文件名 fs.rename(new Path("/banzhang2.txt"),new Path("/ps.txt")); // 3.关闭资源、 fs.close(); }
3.5 查看文件详情listFiles
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45/** * 查看文件详情 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testListFiles() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 获取文件详情 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); while (listFiles.hasNext()){ // 文件详情 LocatedFileStatus status = listFiles.next(); System.out.println("文件详情 = " + status); System.out.println("文件名称 = " + status.getPath().getName()); System.out.println("文件大小 = " + status.getLen()); System.out.println("文件权限 = " + status.getPermission()); System.out.println("文件分组 = " + status.getGroup()); // 获取储存的块信息 BlockLocation[] blockLocations = status.getBlockLocations(); for (BlockLocation blockLocation : blockLocations) { // 获取块储存的主机节点 String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.println("host = " + host); } } System.out.println("==================================="); } // 3.关闭资源、 fs.close(); }
3.6 判断是否是文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31/** * 判断是否是文件 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testListStatus() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.判断是文件还是文件夹 FileStatus[] listStatus = fs.listStatus(new Path("/")); System.out.println("listStatus = " + listStatus); // 可以利用递归来遍历目录 for (FileStatus fileStatus : listStatus) { if (fileStatus.isFile()){ System.out.println("文件名 = " + fileStatus.getPath().getName()); } if (fileStatus.isDirectory()) { System.out.println("目录名 = " + fileStatus.getPath().getName()); } } // 3.关闭资源、 fs.close(); }
3.7 HDFS的I/O流操作
上面我们学的API操作HDFS系统都是框架封装好的。那么如果我们想自己实现上述API的操作该怎么实现呢?
我们可以采用IO流的方式实现数据的上传和下载。
把本地文件banhua.txt上传到HDFS根目录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28/** * 把本地文件banhua.txt上传到HDFS根目录 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void putFileToHDFS() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2 创建输入流 FileInputStream fis = new FileInputStream(new File("E:/file/banhua.txt")); // 3.创建输出流 FSDataOutputStream fos = fs.create(new Path("/banhua.txt")); // 4.流拷贝 IOUtils.copyBytes(fis,fos,conf); // 3.关闭资源、 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
从HDFS上下载banhua.txt文件到本地E盘上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30/** * 从HDFS上下载banhua.txt文件到本地E盘上 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void getFileFromHDFS() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2 创建输入流 FSDataInputStream fis = fs.open(new Path("/banhua.txt")); // 3.创建输出流 FileOutputStream fos = new FileOutputStream("E:/file/banhua.txt"); // 4.流拷贝 IOUtils.copyBytes(fis,fos,conf); // 3.关闭资源、 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
分块读取HDFS上的大文件–下载第一部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33/** * 分块读取HDFS上的大文件,比如根目录下的/user/atguigu/input/hadoop-2.7.2.tar.gz * 下载第一部分 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void readFileSeek1() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2 创建输入流 FSDataInputStream fis = fs.open(new Path("/user/atguigu/input/hadoop-2.7.2.tar.gz")); // 3.创建输出流 FileOutputStream fos = new FileOutputStream("E:/file/hadoop-2.7.2.tar.gz.part1"); // 4.流拷贝 byte[] bytes = new byte[1024]; for (int i = 0; i < 1024 * 128; i++) { fis.read(bytes); fos.write(bytes); } // 3.关闭资源、 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
分块读取HDFS上的大文件–下载第二部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31/** * 分块读取HDFS上的大文件,比如根目录下的/user/atguigu/input/hadoop-2.7.2.tar.gz * 下载第二部分 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void readFileSeek2() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2 创建输入流 FSDataInputStream fis = fs.open(new Path("/user/atguigu/input/hadoop-2.7.2.tar.gz")); // 3.创建输出流 FileOutputStream fos = new FileOutputStream("E:/file/hadoop-2.7.2.tar.gz.part2"); // 4.定位输入流数据位置 fis.seek(1024*1024*128); // 5.流的对拷 IOUtils.copyBytes(fis,fos,conf); // 6.关闭资源、 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
在Window命令窗口中进入到目录E:/file/,然后执行如下命令,对数据进行合并
hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
合并完成后,将hadoop-2.7.2.tar.gz.part1重新命名为hadoop-2.7.2.tar.gz
,解压发现该tar包非常完整。
4. 问题总结
问题一
idea启动出现:WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform 警告
解决方案:在log4j.propertis中添加下面的内容。
复制代码1
2log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
问题二
出现: java.io.IOException: Could not locate executable
nullbinwinutils.exe
in the Hadoop binaries.解决方案:配置一下CLASSPATH,然后重启。
☆
最后
以上就是感性玫瑰最近收集整理的关于3. HDFS客户端操作的全部内容,更多相关3.内容请搜索靠谱客的其他文章。
发表评论 取消回复