概述
文章目录
- HDFS客户端操作
- 1. 配置HADOOP_HOME环境变量
- 2. 创建一个Maven工程HdfsClient
- 2.1 添加依赖
- 2.2 添加日志log4j
- 3. 创建HdfsClient类
- 3.1 文件的上传,以及测试优先级
- 3.2 文件的下载copyToLocalFile
- 3.3 删除文件夹delete
- 3.4 文件的改名rename
- 3.5 查看文件详情listFiles
- 3.6 判断是否是文件
- 3.7 HDFS的I/O流操作
- 把本地文件banhua.txt上传到HDFS根目录
- 从HDFS上下载banhua.txt文件到本地E盘上
- 分块读取HDFS上的大文件--下载第一部分
- 4. 问题总结
- 问题一
- 问题二
- ☆
HDFS客户端操作
- 下面是win10系统下编译后的hadoop-2.7.2的jar包。
- 链接:https://pan.baidu.com/s/1O2GVUevAGzUX1-IPQZwRqQ
- 提取码:9mx7
- 下载后将其拷贝到非中文路径中,比如说:
E:hadoophadoop-2.7.2
。
1. 配置HADOOP_HOME环境变量
2. 创建一个Maven工程HdfsClient
2.1 添加依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aiguigu</groupId>
<artifactId>HdfsClient</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}libtools.jar</systemPath>
</dependency>
</dependencies>
</project>
2.2 添加日志log4j
需要在项目的
src/main/resources
目录下,新建一个文件,命名为log4j.properties
,在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
3. 创建HdfsClient类
注意引包不要引错了!
- 获取文件系统
- 执行操作
- 关闭资源
package com.atguigu.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
// ...
/**
* @Date 2020/6/27 14:26
* @Version 10.21
* @Author DuanChaojie
*/
public class HdfsClient {
/**
* 测试使用客户端创建目录
* @throws Exception
*/
@Test
public void testMkdirs() throws Exception {
// 1.获取文件系统(注意包不要引错)
Configuration conf = new Configuration();
// 配置在集群上运行
// 第一种配置
//conf.set("fs.defaultFS","hdfs://hadoop:9000");
// FileSystem fs = FileSystem.get(conf);
// 第二种配置
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
fs.mkdirs(new Path("/test7"));
fs.close();
}
}
3.1 文件的上传,以及测试优先级
copyFromLocalFile
/**
* 文件的上传,以及测试优先级
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void testCopyFromLocalFile() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
// 这个优先于resources中的配置
conf.set("dfs.replication","1");
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2.上传文件
fs.copyFromLocalFile(new Path("E:/file/ps.txt"),new Path("/banzhang3.txt"));
fs.close();
System.out.println("执行结束...");
}
resources中的
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定HDFS副本的数量(优先于,linux中的配置) -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
</configuration>
参数优先级排序:
客户端代码中设置的值 >ClassPath(resources)下的用户自定义配置文件 >然后是服务器的默认配置
3.2 文件的下载copyToLocalFile
/**
* 下载文件
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void testCopyToLocalFile() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2.执行下载操作
fs.copyToLocalFile(true,new Path("/banzhang3.txt"),new Path("E:/file/banzhagn.txt"),true);
// 3.关闭资源、
fs.close();
}
3.3 删除文件夹delete
/**
* 删除文件夹
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void testDelete() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2.执行删除文件夹
fs.delete(new Path("/test"),true);
// 3.关闭资源、
fs.close();
}
3.4 文件的改名rename
/**
* 文件名更改
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void testRename() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2.修改文件名
fs.rename(new Path("/banzhang2.txt"),new Path("/ps.txt"));
// 3.关闭资源、
fs.close();
}
3.5 查看文件详情listFiles
/**
* 查看文件详情
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void testListFiles() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 获取文件详情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()){
// 文件详情
LocatedFileStatus status = listFiles.next();
System.out.println("文件详情 = " + status);
System.out.println("文件名称 = " + status.getPath().getName());
System.out.println("文件大小 = " + status.getLen());
System.out.println("文件权限 = " + status.getPermission());
System.out.println("文件分组 = " + status.getGroup());
// 获取储存的块信息
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 获取块储存的主机节点
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println("host = " + host);
}
}
System.out.println("===================================");
}
// 3.关闭资源、
fs.close();
}
3.6 判断是否是文件
/**
* 判断是否是文件
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void testListStatus() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2.判断是文件还是文件夹
FileStatus[] listStatus = fs.listStatus(new Path("/"));
System.out.println("listStatus = " + listStatus);
// 可以利用递归来遍历目录
for (FileStatus fileStatus : listStatus) {
if (fileStatus.isFile()){
System.out.println("文件名 = " + fileStatus.getPath().getName());
}
if (fileStatus.isDirectory()) {
System.out.println("目录名 = " + fileStatus.getPath().getName());
}
}
// 3.关闭资源、
fs.close();
}
3.7 HDFS的I/O流操作
上面我们学的API操作HDFS系统都是框架封装好的。那么如果我们想自己实现上述API的操作该怎么实现呢?
我们可以采用IO流的方式实现数据的上传和下载。
把本地文件banhua.txt上传到HDFS根目录
/**
* 把本地文件banhua.txt上传到HDFS根目录
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void putFileToHDFS() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2 创建输入流
FileInputStream fis = new FileInputStream(new File("E:/file/banhua.txt"));
// 3.创建输出流
FSDataOutputStream fos = fs.create(new Path("/banhua.txt"));
// 4.流拷贝
IOUtils.copyBytes(fis,fos,conf);
// 3.关闭资源、
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}
从HDFS上下载banhua.txt文件到本地E盘上
/**
* 从HDFS上下载banhua.txt文件到本地E盘上
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void getFileFromHDFS() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2 创建输入流
FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
// 3.创建输出流
FileOutputStream fos = new FileOutputStream("E:/file/banhua.txt");
// 4.流拷贝
IOUtils.copyBytes(fis,fos,conf);
// 3.关闭资源、
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}
分块读取HDFS上的大文件–下载第一部分
/**
* 分块读取HDFS上的大文件,比如根目录下的/user/atguigu/input/hadoop-2.7.2.tar.gz
* 下载第一部分
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void readFileSeek1() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2 创建输入流
FSDataInputStream fis = fs.open(new Path("/user/atguigu/input/hadoop-2.7.2.tar.gz"));
// 3.创建输出流
FileOutputStream fos = new FileOutputStream("E:/file/hadoop-2.7.2.tar.gz.part1");
// 4.流拷贝
byte[] bytes = new byte[1024];
for (int i = 0; i < 1024 * 128; i++) {
fis.read(bytes);
fos.write(bytes);
}
// 3.关闭资源、
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}
分块读取HDFS上的大文件–下载第二部分
/**
* 分块读取HDFS上的大文件,比如根目录下的/user/atguigu/input/hadoop-2.7.2.tar.gz
* 下载第二部分
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void readFileSeek2() throws IOException, URISyntaxException, InterruptedException {
// 1.获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "atguigu");
// 2 创建输入流
FSDataInputStream fis = fs.open(new Path("/user/atguigu/input/hadoop-2.7.2.tar.gz"));
// 3.创建输出流
FileOutputStream fos = new FileOutputStream("E:/file/hadoop-2.7.2.tar.gz.part2");
// 4.定位输入流数据位置
fis.seek(1024*1024*128);
// 5.流的对拷
IOUtils.copyBytes(fis,fos,conf);
// 6.关闭资源、
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}
在Window命令窗口中进入到目录E:/file/,然后执行如下命令,对数据进行合并
hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
合并完成后,将hadoop-2.7.2.tar.gz.part1重新命名为hadoop-2.7.2.tar.gz
,解压发现该tar包非常完整。
4. 问题总结
问题一
idea启动出现:WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform 警告
解决方案:在log4j.propertis中添加下面的内容。
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
问题二
出现: java.io.IOException: Could not locate executable
nullbinwinutils.exe
in the Hadoop binaries.解决方案:配置一下CLASSPATH,然后重启。
☆
最后
以上就是感性玫瑰为你收集整理的3. HDFS客户端操作的全部内容,希望文章能够帮你解决3. HDFS客户端操作所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复