我是靠谱客的博主 感性玫瑰,这篇文章主要介绍3. HDFS客户端操作,现在分享给大家,希望可以做个参考。

文章目录

    • HDFS客户端操作
        • 1. 配置HADOOP_HOME环境变量
        • 2. 创建一个Maven工程HdfsClient
          • 2.1 添加依赖
          • 2.2 添加日志log4j
        • 3. 创建HdfsClient类
          • 3.1 文件的上传,以及测试优先级
          • 3.2 文件的下载copyToLocalFile
          • 3.3 删除文件夹delete
          • 3.4 文件的改名rename
          • 3.5 查看文件详情listFiles
          • 3.6 判断是否是文件
          • 3.7 HDFS的I/O流操作
            • 把本地文件banhua.txt上传到HDFS根目录
            • 从HDFS上下载banhua.txt文件到本地E盘上
            • 分块读取HDFS上的大文件--下载第一部分
        • 4. 问题总结
            • 问题一
            • 问题二

HDFS客户端操作

  1. 下面是win10系统下编译后的hadoop-2.7.2的jar包。
    1. 链接:https://pan.baidu.com/s/1O2GVUevAGzUX1-IPQZwRqQ
    2. 提取码:9mx7
  2. 下载后将其拷贝到非中文路径中,比如说:E:hadoophadoop-2.7.2

1. 配置HADOOP_HOME环境变量

在这里插入图片描述

2. 创建一个Maven工程HdfsClient

2.1 添加依赖

pom.xml

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.aiguigu</groupId> <artifactId>HdfsClient</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}libtools.jar</systemPath> </dependency> </dependencies> </project>
2.2 添加日志log4j

需要在项目的src/main/resources目录下,新建一个文件,命名为log4j.properties,在文件中填入

复制代码
1
2
3
4
5
6
7
8
9
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3. 创建HdfsClient类

注意引包不要引错了!

  1. 获取文件系统
  2. 执行操作
  3. 关闭资源
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.atguigu.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import org.junit.Test; // ... /** * @Date 2020/6/27 14:26 * @Version 10.21 * @Author DuanChaojie */ public class HdfsClient { /** * 测试使用客户端创建目录 * @throws Exception */ @Test public void testMkdirs() throws Exception { // 1.获取文件系统(注意包不要引错) Configuration conf = new Configuration(); // 配置在集群上运行 // 第一种配置 //conf.set("fs.defaultFS","hdfs://hadoop:9000"); // FileSystem fs = FileSystem.get(conf); // 第二种配置 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); fs.mkdirs(new Path("/test7")); fs.close(); } }
3.1 文件的上传,以及测试优先级

copyFromLocalFile

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** * 文件的上传,以及测试优先级 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testCopyFromLocalFile() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); // 这个优先于resources中的配置 conf.set("dfs.replication","1"); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.上传文件 fs.copyFromLocalFile(new Path("E:/file/ps.txt"),new Path("/banzhang3.txt")); fs.close(); System.out.println("执行结束..."); }

resources中的hdfs-site.xml

复制代码
1
2
3
4
5
6
7
8
9
10
11
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <!-- 指定HDFS副本的数量(优先于,linux中的配置) --> <property> <name>dfs.replication</name> <value>3</value> </property> </configuration>

参数优先级排序:客户端代码中设置的值 >ClassPath(resources)下的用户自定义配置文件 >然后是服务器的默认配置

3.2 文件的下载copyToLocalFile
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** * 下载文件 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testCopyToLocalFile() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.执行下载操作 fs.copyToLocalFile(true,new Path("/banzhang3.txt"),new Path("E:/file/banzhagn.txt"),true); // 3.关闭资源、 fs.close(); }
3.3 删除文件夹delete
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** * 删除文件夹 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testDelete() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.执行删除文件夹 fs.delete(new Path("/test"),true); // 3.关闭资源、 fs.close(); }
3.4 文件的改名rename
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** * 文件名更改 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testRename() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.修改文件名 fs.rename(new Path("/banzhang2.txt"),new Path("/ps.txt")); // 3.关闭资源、 fs.close(); }
3.5 查看文件详情listFiles
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/** * 查看文件详情 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testListFiles() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 获取文件详情 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); while (listFiles.hasNext()){ // 文件详情 LocatedFileStatus status = listFiles.next(); System.out.println("文件详情 = " + status); System.out.println("文件名称 = " + status.getPath().getName()); System.out.println("文件大小 = " + status.getLen()); System.out.println("文件权限 = " + status.getPermission()); System.out.println("文件分组 = " + status.getGroup()); // 获取储存的块信息 BlockLocation[] blockLocations = status.getBlockLocations(); for (BlockLocation blockLocation : blockLocations) { // 获取块储存的主机节点 String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.println("host = " + host); } } System.out.println("==================================="); } // 3.关闭资源、 fs.close(); }
3.6 判断是否是文件
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** * 判断是否是文件 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void testListStatus() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2.判断是文件还是文件夹 FileStatus[] listStatus = fs.listStatus(new Path("/")); System.out.println("listStatus = " + listStatus); // 可以利用递归来遍历目录 for (FileStatus fileStatus : listStatus) { if (fileStatus.isFile()){ System.out.println("文件名 = " + fileStatus.getPath().getName()); } if (fileStatus.isDirectory()) { System.out.println("目录名 = " + fileStatus.getPath().getName()); } } // 3.关闭资源、 fs.close(); }
3.7 HDFS的I/O流操作

上面我们学的API操作HDFS系统都是框架封装好的。那么如果我们想自己实现上述API的操作该怎么实现呢?

我们可以采用IO流的方式实现数据的上传和下载。

把本地文件banhua.txt上传到HDFS根目录
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/** * 把本地文件banhua.txt上传到HDFS根目录 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void putFileToHDFS() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2 创建输入流 FileInputStream fis = new FileInputStream(new File("E:/file/banhua.txt")); // 3.创建输出流 FSDataOutputStream fos = fs.create(new Path("/banhua.txt")); // 4.流拷贝 IOUtils.copyBytes(fis,fos,conf); // 3.关闭资源、 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
从HDFS上下载banhua.txt文件到本地E盘上
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/** * 从HDFS上下载banhua.txt文件到本地E盘上 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void getFileFromHDFS() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2 创建输入流 FSDataInputStream fis = fs.open(new Path("/banhua.txt")); // 3.创建输出流 FileOutputStream fos = new FileOutputStream("E:/file/banhua.txt"); // 4.流拷贝 IOUtils.copyBytes(fis,fos,conf); // 3.关闭资源、 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
分块读取HDFS上的大文件–下载第一部分
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/** * 分块读取HDFS上的大文件,比如根目录下的/user/atguigu/input/hadoop-2.7.2.tar.gz * 下载第一部分 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void readFileSeek1() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2 创建输入流 FSDataInputStream fis = fs.open(new Path("/user/atguigu/input/hadoop-2.7.2.tar.gz")); // 3.创建输出流 FileOutputStream fos = new FileOutputStream("E:/file/hadoop-2.7.2.tar.gz.part1"); // 4.流拷贝 byte[] bytes = new byte[1024]; for (int i = 0; i < 1024 * 128; i++) { fis.read(bytes); fos.write(bytes); } // 3.关闭资源、 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }

分块读取HDFS上的大文件–下载第二部分

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** * 分块读取HDFS上的大文件,比如根目录下的/user/atguigu/input/hadoop-2.7.2.tar.gz * 下载第二部分 * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ @Test public void readFileSeek2() throws IOException, URISyntaxException, InterruptedException { // 1.获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu"); // 2 创建输入流 FSDataInputStream fis = fs.open(new Path("/user/atguigu/input/hadoop-2.7.2.tar.gz")); // 3.创建输出流 FileOutputStream fos = new FileOutputStream("E:/file/hadoop-2.7.2.tar.gz.part2"); // 4.定位输入流数据位置 fis.seek(1024*1024*128); // 5.流的对拷 IOUtils.copyBytes(fis,fos,conf); // 6.关闭资源、 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }

在Window命令窗口中进入到目录E:/file/,然后执行如下命令,对数据进行合并 hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1合并完成后,将hadoop-2.7.2.tar.gz.part1重新命名为hadoop-2.7.2.tar.gz,解压发现该tar包非常完整。

4. 问题总结

问题一

idea启动出现:WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform 警告

解决方案:在log4j.propertis中添加下面的内容。

复制代码
1
2
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
问题二

出现: java.io.IOException: Could not locate executable nullbinwinutils.exe in the Hadoop binaries.

解决方案:配置一下CLASSPATH,然后重启。

在这里插入图片描述

最后

以上就是感性玫瑰最近收集整理的关于3. HDFS客户端操作的全部内容,更多相关3.内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(74)

评论列表共有 0 条评论

立即
投稿
返回
顶部