概述
1.指定多个输入
在单个操作中处理一批文件,这是很常见的需求。比如说处理日志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件。在一个表达式中使用通配符在匹配多个文件时比较方便的,无需列举每个文件和目录来指定输入。hadoop为执行通配提供了两个FileSystem方法:
public FileStatus[] globStatus(Path pathPattern) throw IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOException
PS:
globStatus()方法返回与路径想匹配的所有文件的FileStatus对象数组,并按路径排序。hadoop所支持的通配符与Unix bash相同。
第二个方法传了一个PathFilter对象作为参数,PathFilter可以进一步对匹配进行限制。PathFilter是一个接口,里面只有一个方法accept(Path path)。
PathFilter实例
RegexExcludePathFilter.java
class RegexExcludePathFilter implements PathFilter{
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
}
PS:该类实现了PathFilter接口,重写了accept方法
使用这个过滤器:
//通配符的使用
public static void list() throws IOException{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
//PathFilter是过滤布符合置顶表达式的路径,下列就是把以txt结尾的过滤掉
FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
//FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
如果没有过滤器,
FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
则输出结果如下:
hdfs://master:9000/user/hadoop/test/a.txt
hdfs://master:9000/user/hadoop/test/b.txt
hdfs://master:9000/user/hadoop/test/c.aaa
hdfs://master:9000/user/hadoop/test/c.txt
hdfs://master:9000/user/hadoop/test/cc.aaa
如果使用了过滤器
FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
则输出结果如下:
hdfs://master:9000/user/hadoop/test/c.aaa
hdfs://master:9000/user/hadoop/test/cc.aaa
由此可见,PathFilter就是在匹配前面条件之后再加以限制,将匹配PathFilter的路径去除掉。
其实由accept方法里面的
return !path.toString().matches(regex);
可以看出来,就是将匹配的全部去除掉,如果改为
return path.toString().matches(regex);
就是将匹配regex的Path输出,将不匹配的去除。
PathFilter实例2
public int run(String[] args) throws Exception {
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance(conf);
//通过过滤器过滤掉不要的文件
FileStatus[] status = fs.globStatus(new Path(args[0]),new RegexExcludePathFilter(".*txt"));
Path[] listedPaths = FileUtil.stat2Paths(status);
job.setJarByClass(this.getClass());
job.setJobName("SumStepByTool");
job.setInputFormatClass(TextInputFormat.class); //这个是默认的输入格式
job.setMapperClass(SumStepByToolMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBeanMy.class);
job.setReducerClass(SumStepByToolReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBeanMy.class);
//job.setNumReduceTasks(3);
//对不同的输入文件使用不同的Mapper进行处理
// MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, SumStepByToolMapper.class);
// MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SumStepByToolWithCommaMapper.class);
FileInputFormat.setInputPaths(job, listedPaths);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0:-1;
}
最后
以上就是清秀唇膏为你收集整理的hadoop 之 PathFilter -- 输入文件过滤器的全部内容,希望文章能够帮你解决hadoop 之 PathFilter -- 输入文件过滤器所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复