我是靠谱客的博主 舒适草莓,最近开发中收集的这篇文章主要介绍hadoop 练习(1),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1 在hdfs上进行文件复制

        Configuration conf=new Configuration();
 conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/"),conf);
        InputStream in=null;
        OutputStream out=fs.create(new Path("hdfs://localhost/user/zh2.txt"));
        try{
            in=fs.open(new Path("hdfs://localhost/11.txt"));

            IOUtils.copyBytes(in, out, 4096,true);  //从hdfs上读取文件内容,然后输出到另一个文件中,复制
        }finally{
            IOUtils.closeStream(in);
        }

2 对FSDataInputStream类中的seek和getPos的探索

//对FSDataInputStream类中的seek和getPos的探索
 fs.delete(new Path("hdfs://localhost/11.txt"),false);
 newFile=fs.create(new Path("hdfs://localhost/11.txt"));
 newFile.write("我".getBytes());
 newFile.write("a".getBytes());
 newFile.flush();
 newFile.close();
 in=fs.open(new Path("hdfs://localhost/11.txt"));
 System.out.println(in.getPos());   //0
 IOUtils.copyBytes(in, System.out, 4096, false);   //我
 System.out.println(in.getPos());    //4
 System.out.println("***********");
 in.seek(3);//a
 IOUtils.copyBytes(in, System.out, 4096,false);

可以看出getPos()获得当前文件offset距文件开头的长度(单位是字节),如果对文件进行读取,那么就有偏移量。seek()表示将文件offset定位在第几个字节后。

3 将文件从dfs上拷贝到本地

Configuration conf=new Configuration();
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER"); 
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true"); 
        conf.setInt("dfs.replication", 1);

        FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/11.txt"),conf);
        OutputStream   out=new FileOutputStream(new File("C:/Users/lei/Desktop/11.txt"));
        //将文件从dfs上拷贝到本地
        FSDataInputStream in=fs.open(new Path("hdfs://localhost/11.txt"));
        try{
             IOUtils.copyBytes(in, out,1024, false); 
        }finally{
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }

4.将文件从本地上拷贝到dfs上

    Configuration conf=new Configuration();
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER"); 
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true"); 
        conf.setInt("dfs.replication", 1);

        FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/11.txt"),conf);
        InputStream   in=new FileInputStream(new File("C:/Users/lei/Desktop/11.txt"));
        //将文件从本地拷贝到DFS上
        FSDataOutputStream out=fs.create(new Path("hdfs://localhost/12.txt"));
        try{
             IOUtils.copyBytes(in, out,1024, false); 
        }finally{
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }

4.新建一个文件夹


        FileSystem fs=FileSystem.get(URI.create("hdfs://localhost"),conf);



        try{

            System.out.println(fs.mkdirs(new Path("hdfs://localhost/Hadoop_test")));
        }finally{
        }

5 文件或者文件夹的相关属性

FileSystem fs=FileSystem.get(URI.create("hdfs://localhost"),conf);
Path dir=new Path("/");
FileStatus stat=fs.getFileStatus(dir);
System.out.println(stat.getPath().toUri().getPath());
System.out.println(stat.isDirectory());
System.out.println(stat.getLen());
System.out.println(stat.getPermission().toString());
//将根目录下的所有文件文件夹的属性进行遍历,不进行递归
Path dir=new Path("/");
            FileStatus stat=fs.getFileStatus(dir);
            FileStatus[] statList=fs.listStatus(new Path("/zhanglei"));
            for(FileStatus fileStatus:statList){
                System.out.println(fileStatus.toString());
            }

6 遍历文件系统,并对文件进行筛选,使用FileSystem中的listFiles方法得到一个遍历器,然后筛选出是文件的,并且符合正则表达式的文件

FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/user"),conf);
        try{
            RemoteIterator<LocatedFileStatus> fileIterator=fs.listFiles(new Path("/"), true);
            while(fileIterator.hasNext()){
                Path path=fileIterator.next().getPath();
                FileStatus stat=fs.getFileStatus(path);
                if(stat.isFile()&&stat.getPath().toUri().getPath().matches("/user/.*")){
                    System.out.println(stat.getPath().toUri().getPath());
                }
            }
        }finally{
        }

7 如果非递归遍历,可以用FileSystem中的listStatus方法

FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/user"),conf);
        try{
            FileStatus[]  fileS=fs.globStatus(new Path("/*/*"),new RegexExcludePathFilter(".*/zhanglei/.*"));
            for(FileStatus fileStatus:fileS){
                System.out.println(fileStatus.getPath().toString());
                System.out.println(fileStatus.getPath().toUri().getPath());
            }
        }finally{
        }

class RegexExcludePathFilter implements PathFilter {
    private final String regex;
    public RegexExcludePathFilter(String regex) {
        this.regex = regex;
    }
    public boolean accept(Path path) {
        return !path.toString().matches(regex);
    }
}

globStatus方法的第一个参数是glob character ,可以写的格式有
这里写图片描述
这里写图片描述
,同时globstatus提供一个过滤接口,可以实现这个接口,例如RegexExcludePathFilter,进行进一步过滤。
8.文件压缩

FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/user"),conf);
        InputStream input=null;
        OutputStream output=null;
        CompressionOutputStream out=null;
        //压缩文件
        try{
            fs.delete(new Path("/1901ompress"),false);
            input=fs.open(new Path("/1901"));
            output=fs.create(new Path("/1901compress"));
            CompressionCodec codec=(CompressionCodec) ReflectionUtils.newInstance(org.apache.hadoop.io.compress.GzipCodec.class, conf);
            out=codec.createOutputStream(output);
            IOUtils.copyBytes(input, out, 4096,false);
        }finally{
            IOUtils.closeStream(input);
            IOUtils.closeStream(out);
            IOUtils.closeStream(output);
        }
    }

9.文件解压

        Configuration conf=new Configuration();
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER"); 
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true"); 
        conf.setInt("dfs.replication", 1);

        FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/user"),conf);
        InputStream input=null;
        OutputStream output=null;
        CompressionOutputStream out=null;
        CompressionInputStream in=null;
        //解压文件
        try{
            fs.delete(new Path("/1901decompress"), false);
            input=fs.open(new Path("/1901compress"));
            output=fs.create(new Path("/1901decompress"));
            CompressionCodec codec=(CompressionCodec) ReflectionUtils.newInstance(org.apache.hadoop.io.compress.GzipCodec.class, conf);
            in=codec.createInputStream(input);
            IOUtils.copyBytes(in, output, 4096,false);
        }finally{
            IOUtils.closeStream(input);
            IOUtils.closeStream(out);
            IOUtils.closeStream(output);
            IOUtils.closeStream(in);
        }
}

这里写图片描述

10 CodecPool,如果需要多次的压缩和解压,那么可以使用CodecPool来重复使用compressors 和decompressors

FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/user"),conf);
        InputStream input=null;
        OutputStream output=null;
        CompressionOutputStream out=null;
        CompressionInputStream in=null;
        Compressor compressor=null
        //解压文件
        try{
            fs.delete(new Path("/1901compress"),false);
            input=fs.open(new Path("/1901"));
            output=fs.create(new Path("/1901compress"));
            CompressionCodec codec=(CompressionCodec) ReflectionUtils.newInstance(org.apache.hadoop.io.compress.Lz4Codec.class, conf);
            compressor=CodecPool.getCompressor(codec);
            out=codec.createOutputStream(output,compressor);
            IOUtils.copyBytes(input, out, 4096,false);
        }finally{
            IOUtils.closeStream(input);
            IOUtils.closeStream(out);
            IOUtils.closeStream(output);
            IOUtils.closeStream(in);
            CodecPool.returnCompressor(compressor);
        }

11.针对mapreduce的输出,如果需要压缩,可以由两种方法:
通过configuration 设定mapreduce.output.fileoutputformat.compress :true
和duce.output.fileoutputformat.compress.codec: GzipCodec.class
或者说使用

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

同样针对map的输出,如果需要压缩,使用在Configuration来设定

conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class,
CompressionCodec.class);

12 writable 接口

package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}

这里写图片描述
这些实现了writable的类,需要有两个功能write和readFields功能,也就是需要举个例子,IntWritable的write就是把一个整数写入输出流中,同时可以通过readField从输入流in中读取一个整数

public static void main( String[] args )  throws Exception
    {
        IntWritable writable=new IntWritable();
        writable.set(163);
        byte[]  bytes=serialize(writable);
        IntWritable newWritable=new IntWritable();
        deserialize(newWritable, bytes);
        System.out.println(newWritable.get());
        System.out.println(bytes.length);
    }
    public static byte[] serialize(Writable writable) throws IOException{
        ByteArrayOutputStream  out=new ByteArrayOutputStream();
        DataOutputStream dataOut=new DataOutputStream(out);
        writable.write(dataOut);
        dataOut.close();
        return out.toByteArray();
    }
    public static byte[]  deserialize(Writable writable,byte[] bytes) throws IOException{
        ByteArrayInputStream in=new ByteArrayInputStream(bytes);
        DataInputStream dataIn=new DataInputStream(in);
        writable.readFields(dataIn);
        dataIn.close();
        return bytes;
    }

13 WritablComparable and comparators

package org.apache.hadoop.io;
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
package org.apache.hadoop.io;
import java.util.Comparator;
public interface RawComparator<T> extends Comparator<T> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
public interface Comparator<T>{
  int  compare(T o1,T o2);
  boolean equals(Object obj);
}
public interface  Comparable<T>{
int  compareTo(T o);

}

WritableComparator 是对RawComparator的一般化实现,提供的功能如下

public static void main(String[] args)  throws Exception{
        //功能1,提供一个comparator工厂
        RawComparator<IntWritable> comparator=WritableComparator.get(IntWritable.class);
        IntWritable w1=new IntWritable(163);
        IntWritable w2=new IntWritable(67);
        //功能2 提供比较策略
        System.out.println(comparator.compare(w1, w2));
        byte[] b1=serialize(w1);
        byte[] b2=serialize(w2);
        System.out.println(comparator.compare(b1,0,b1.length,b2,0,b2.length));
    }
    public static byte[] serialize(Writable writable) throws IOException{
        ByteArrayOutputStream  out=new ByteArrayOutputStream();
        DataOutputStream dataOut=new DataOutputStream(out);
        writable.write(dataOut);
        dataOut.close();
        return out.toByteArray();
    }

14 String和Text的区别
主要是针对String的length方法,charAt方法,indexOf方法,已经Text类与之对应的getlength方法,charAt方法,find方法.
String:length ,Text:getlength
String 是以char字符单元计算长度,char字符单元可以是Ascii码,或者是一个中文。而text是以char字符单元对应的二进制表示的字节数来计算长度的,由于text是UTF-8编码的,所以中文占3个字节。

Text text=new Text("含");
String s="含";
System.out.println(text.getLength());  //3
System.out.println(s.length());  //1

String:indexOf,Text:find
String的indexOf表示的是char字符单元在String字符串中的位置,位置的计算是以字符单元来进行的。Text的find表示的是char字符单元在text对应的二进制编码中的字节的位置,位置计算是以字节来进行的。

Text text=new Text("含a");
String s="含a";
System.out.println(text.find("a"));   //3
System.out.println(s.indexOf("a"));   //1

String:charAt,Text:charAt
String的charAt表示表示字符串中对应位置的字符是什么,位置计算是以字符单元进行的,Text的charAt表示对应位置的字符是什么,位置计算是以字节进行的,返回的是10进制表示的Unicode scalar value.常见的还是他对应的16进制表示.

Text text=new Text("含a");  
String s="含a";  
System.out.println(text.charAt(0));  //21547
System.out.println(Integer.toHexString(text.charAt(0))); //542b
System.out.println(text.charAt(1));  //-1
System.out.println(text.charAt(3));  //97
System.out.println(s.charAt(0));  //含
System.out.println(s.charAt(1));   //a

text的字符遍历

Text text=new Text("含有a");
ByteBuffer buf=ByteBuffer.wrap(text.getBytes(),0,text.getLength());
int cp;
while(buf.hasRemaining() && (cp=Text.bytesToCodePoint(buf))!=-1){
            System.out.println(Integer.toHexString(cp));
}

15.BytesWritable
针对byte类型的数组进行的封装。序列化格式为4个字节表示这个数组一共有多少个元素(字节),然后后面跟着相应的元素。

public static void main(String[] args)  throws Exception{
        BytesWritable b=new BytesWritable(new byte[]{3,5});
        System.out.println(b.getLength());  //2 返回数组的元素个数
        byte[] bytes=serialize(b);      
        System.out.println(bytes.length);    //6
        System.out.println(StringUtils.byteToHexString(bytes));//000000020305
    }
    public static byte[] serialize(Writable writable) throws IOException{
        ByteArrayOutputStream  out=new ByteArrayOutputStream();
        DataOutputStream dataOut=new DataOutputStream(out);
        writable.write(dataOut);
        dataOut.close();
        return out.toByteArray();
    }

16.Writable collection
ArrayWritable and TwoDArrayWritable:元素是Writable类型的数组,一个是一维的,一个是二维的. ArrayPrimitiveWritable :元素是Java 基本数据类型的数组. MapWritable :实现Map

MapWritable src=new MapWritable();
src.put(new IntWritable(1), new Text("cat"));
src.put(new VIntWritable(2), new LongWritable(163));
System.out.println(((Text)src.get(new IntWritable(1))).charAt(0)); //99
System.out.println(((LongWritable) src.get(new VIntWritable(2))).get()); //163

17.writableComparator 和WritableComparable
MapReduce的过程就是不断的传递键值对,在传递过程中,需要对键进行排序,hadoop要求键是可序列化的。因此在hadoop的各种writable子类中,都需要实现writableComarable接口,以便MapReduce程序可以进行键的比较。而writableComparable进行比较时需要进行反序列化,然后才开始比较。这会增加计算复杂度。因此hadoop提供了一个WritableComparator,他进行比较时,可以不用反序列化,因此可以人为的指定键比较时所用的比较器,而不用writableComparable中的CompareTo方法。WritableComparator是实现了RawComparator接口。如果我们构造的新数据类型使用了writable的子类,那么就可以直接继承WritableComparator,使用现成的一些代码。而不用直接实现RawComparator接口。
举例如下:

public class TextPair  implements WritableComparable<TextPair>{
    private Text first;
    private Text second;
   public int compareTo(TextPair tp){
       int cmp=first.compareTo(tp.first);
       if(cmp!=0)
           return cmp;
       return second.compareTo(tp.second);
   }

    public static void main(String[] args)  throws Exception{
        TextPair textPair1=new TextPair("cat","dog");
        TextPair textPair2=new TextPair("apple","pine");
        WritableComparator textPairComparator=WritableComparator.get(TextPair.class);
        System.out.println(textPairComparator.compare(textPair2, textPair1));
        TextPair.Comparator textPairComparator2=new TextPair.Comparator();
        System.out.println(textPairComparator2.compare(textPair2, textPair1));
    }
    public static byte[] serialize(Writable writable) throws IOException{


    public static class Comparator extends WritableComparator{
        private static final Text.Comparator TEXT_COMPARATOR =new Text.Comparator();
        public Comparator(){
            super(TextPair.class);
        }
        public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2){
            try{
                int firstL1=WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1, s1);
                int firstL2=WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2, s2);
                int cmp=TEXT_COMPARATOR.compare(b1, s1,firstL1,b2,s2,firstL2);
                if(cmp!=0){
                    return cmp;
                }
                return TEXT_COMPARATOR.compare(b1, s1+firstL1,l1-firstL1,b2,s2+firstL2,l2-firstL2);
            }catch(IOException e){
                throw new IllegalArgumentException(e);
            }
        }
        static{
            WritableComparator.define(TextPair.class, new Comparator());
        }
    }
}

为了简化起见,我们只给出了compareTo方法。如果我们需要重新定义一个数据类型用作键,那么它应该实现WritableComparable的compareTo方法。当然还可以自定义比较器,也就是内部静态类Comparator.static中的代码

static{WritableComparator.define(TextPair.class, new Comparator());}

是在WritableComparator中注册TextPair.class的比较器。然后我们就可以使用
WritableComparator textPairComparator=WritableComparator.get(TextPair.class);
得到一个比较器。当然也可以通过

TextPair.Comparator textPairComparator2=new TextPair.Comparator();

这种方法得到一个比较器。
其次在Comparator 类compare方法中
WritableUtils.decodeVIntSize(b1[s1])用来指出b1的字符串对应byte数组长度。原因是b1是Text类型,在序列化是首先会指定字符串的编码字节长度,然后指定字符串的编码字节。readVInt(b1, s1)用来指定b1是用几个字节存放字符串的长度的。
那么在MapReduce程序中可以通过如下的方法指定键比较器

job.setSortComparatorClass(TextPair.Comparator.class);

18 . 序列化文件
将一个writable类型的键值对写入一个序列化文件中

public class SequencefileWriteDemo {
    private static final String[] DATA={
            "one,two,buckle my shoe",
            "three,four shut the door",
            "five ,six,pick up sticks",
            "seven,eight,lay them Straight"
    };
    public static void main(String[] args) throws IOException{

        Configuration conf=new Configuration();
        FileSystem fs=FileSystem.get(URI.create("hdfs://localhost/"),conf);
        Path path=new Path("/user/number.seq");
        IntWritable key =new IntWritable();
        Text value=new Text();
        SequenceFile.Writer writer=null;
        try{
            writer=SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass() );
            for(int i=0;i<100;i++){
                key.set(100-i);
                value.set(DATA[i%DATA.length]);
                System.out.printf("[%s]t%st%sn", writer.getLength(), key, value);
                writer.append(key, value);
            }
        }finally{
            IOUtils.closeStream(writer);
        }
    }
}

输出结果:

[128]   100 one,two,buckle my shoe
[171]   99  three,four shut the door
[216]   98  five ,six,pick up sticks
[259]   97  seven,eight,lay them Straight
[307]   96  one,two,buckle my shoe
[350]   95  three,four shut the door
[395]   94  five ,six,pick up sticks
[438]   93  seven,eight,lay them Straight

方括号表示的是相对于文件头的字节偏移量。首先文件头会包换一些这样的信息:
这里写图片描述
这里写图片描述

从序列化文件中读取键值对数据

Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(URI.create("hdfs://localhost"),conf);
Path  path=new Path("hdfs://localhost/user/number.seq");
SequenceFile.Reader reader=null;
try{
    reader =new SequenceFile.Reader(fs, path,conf);
    Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value=(Writable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
    long position =reader.getPosition();
    while(reader.next(key,value)){
    System.out.printf("[%s%s]t%st%sn", position, syncSeen, key, value);
    position=reader.getPosition();
    }
    }finally{
          IOUtils.closeStream(reader);
      }
    }

19 . hadoop的Configuration相关(1)
在启动new Configuration()时,默认加载两个配置文件core-default.xml, core-site.xml,这可可以通过

Configuration conf=new Configuration();
System.out.println(conf.toString());

查看。
如果希望添加自己的相关property可以直接通过

conf.set("apple", "1");

来设定,如果希望添加的property比较多,可以放在一个properties.xml文件中。然后通过

conf.addResource(URI.create(properties.xml详细的路径信息).toURI().toURL())

可以使用java的System.setProperty来设定一些property值。举个例子

conf.set("apples", "${apple}");
System.setProperty("apple", "2");
System.out.println(conf.get("apples"));  //2

首先现在conf中定义一个property,然后使用System.setProperty来设定apple的值,最终通过
System.out.println(conf.get(“apples”))输出值。这也可以在properties.xml中定义apples,但是不直接给出其值而是用${apple}代替。

<?xml version="1.0"?>
<configuration>
<property>
<name>apples</name>
<value>${apple}</value>
</property>
</configuration>

20.hadoop input Format
hadoop为了处理不同的数据源(从文本数据,二进制数据,到数据库数据等等)给出了InputFormat类
1.处理split问. InputFormat需要处理Input splits 问题,也就是针对数据源进行分片处理,以便不同的map处理一个数据片。这个在hadoop 中的抽象就是InputSplit类。

public abstract class InputSplit {
public abstract long getLength() throws IOException, InterruptedException;
public abstract String[] getLocations() throws IOException,
InterruptedException;
}
public abstract class InputFormat<K, V> {
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException;
public abstract RecordReader<K, V>
createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException;
}

首先客户端运行作业,通过方法getSplits得到针对这个作业的splits(注意这是在客户端完成的),注意返回的是InputSplit列表,也就是InputSplit类的一个对象对应一个split,其中包括了这个split所在的节点(通过getLocations方法得到,由于一个切片可能包含多个数据块 ,所以可能一个split在多个节点上,因此getLocations返回一个字符串数组)。InputSplit类中的getLength方法显然返回的是这个split的大小(字节多少)。
然后将这些InputSplits发送到application master上,application master
根据每哥split中存储的位置信息,将相应的任务分发到相应的节点上。
当一个节点接到一个任务之后,会使用InputFormat类中的createRecordReader方法将split转换成RecordReader,类似一个迭代器,然后传给Mapper类中的run函数。

public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}

注意mapper类中的run方法是使用context.nextkeyvalue方法,但是context对象中是包含RecordReader的,其实是间接的调用recordReader的nextKeyValue方法。
2.FileInputFormat input paths,
FileInputFormat 有两个功能,首先提供作业所需的文件,以及实现了对这些文件的分片(split)。如何将分片转化成RecordReader 由他的子类实现。也就是说FileInputFormat是抽象类。在指定作业的inputFormat的时候是直接使用FileInputformat.class,只能使用它的子类,

接下来一一列举 。首先就是FileInputFormat 的添加输入文件路径。有如下4中方法:

public static void addInputPath(Job job, Path path)
public static void addInputPaths(Job job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Job job, String commaSeparatedPaths)

但是如果有多个小文件(文件大小小于128M),就可以使用CombineFileInputFormat,其好处就是可以将小文件集中到一个split中,这样更有利于hadoop的处理。因为hadoop处理100个10M左右的小文件,和hadoop处理10个100兆的,默认生成的split是不一样的,前者是生成100个split,后者是生成10个split。同时前者需要在硬盘上进行多次的查询,这样传输时间和读取时间就会变小,这样hadoop性能更低。
这样的话,建议将多个小文件合并在一起,一种方法就是将多个小文件合并到一个sequence文件中。

3.当然,我们也可以自己实现自己的inputFormat.最关键的地方就是要实现createRecordReader()方法。这里借用hadoop权威指南中的一个例子来说明

package com.zhanglei.MapReduce4;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader<NullWritable,BytesWritable>{
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value=new BytesWritable();
    private boolean processed =false;

    @Override
    public void initialize(InputSplit split,TaskAttemptContext context)  throws IOException,InterruptedException{
        this.fileSplit=(FileSplit)split;
        this.conf=context.getConfiguration();
    }
    @Override
    public boolean nextKeyValue()  throws IOException,InterruptedException{
        if(!processed){

            byte[] contents=new byte[(int) fileSplit.getLength()];
            Path file=fileSplit.getPath();
            FileSystem fs=file.getFileSystem(conf);
            FSDataInputStream in =null;
            try{
                in=fs.open(file);

                IOUtils.readFully(in, contents,0,contents.length);
                value.set(contents,0,contents.length);
            }finally{
                IOUtils.closeStream(in);
            }
            processed =true;
            return true;
        }
        return false;
    }

    @Override  
    public NullWritable getCurrentKey() throws IOException ,InterruptedException{
        return NullWritable.get();
    }


    @Override
    public  BytesWritable getCurrentValue() throws IOException,InterruptedException{
        return value;
    }
    @Override 
    public float getProgress() throws IOException{
        return  processed?1.0f:0.0f;
    }
    @Override  
    public void close() throws IOException{

    }
}
package com.zhanglei.MapReduce4;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable>{
    @Override
    protected boolean isSplitable(JobContext context,Path file){
        return false;
    }
    @Override
    public RecordReader<NullWritable,BytesWritable>  createRecordReader(
               InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException
    {
        WholeFileRecordReader reader=new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

wholeFileInputFormat这个类继承FileInputFormat <NullWritable,BytesWritable> <script type="math/tex" id="MathJax-Element-1"> </script>,这里说明Mapper类中的map方法接受的键类型为NullWritable ,值类型为BytesWritable。 其中的isSplitable()方法返回false表示文件不可被切分,这个inputFormat的目的是将一个文件内容当做一个值。CreateRecordReader方法的目的在于根据InputSplit生成相应的迭代器RecordReader <NullWritable,BytesWritable> <script type="math/tex" id="MathJax-Element-2"> </script>,而InputSplit是由父类FileInputFormat <NullWritable,BytesWritable> <script type="math/tex" id="MathJax-Element-3"> </script>实现。这里可以不用关心。
WholeFileRecordReader 继承RecordReader <NullWritable,BytesWritable> <script type="math/tex" id="MathJax-Element-4"> </script>,而RecordReader中有6个抽象方法,必须实现,分别是
close() ,getCurrentKey(),getCurrentvalue(),getProgress(),initialize(),nextKeyValue()。

  1. 数据从原始的文件开始,到结束,都需要经过哪些步骤。
    作业的提交是在客户端完成的。在作业提交之前,客户端需要做如下几件事情:
    1. 向资源管理器请求作业ID
    2. 检查作业输入和输出,确保输出目录是不存在的。否则报错。 针对作业的输入,需要针对输入文件进行分片。
    3. 将运行作业需要的配置文件,jar文件,以及分片信息保存在HDFS中的以作业ID命名的文佳夹中,以便各个节点复制
    4. 资源管理器接受到客户端提交的任务。启动资源调度器,选择一个节点管理器节点,启动一个容器,然后再容器中启动application master程序。
    5. Application master启动之后,需要针对每个分片生成map任务,以及生成reduce任务。如果作业大小比较小的话,map任务和reduce任务将和application master在同一个JVM中运行。否则application master想资源管理器请求容器,资源管理器选择一个节点管理器,启动一个容器,然后在这个容器中运行map或者reduce任务。

细化:
1. 分片
输入文件的分片是有InputFormat类来控制的。InputFormat除去实现分片之外,还确定了map任务输入的键和值类型,因为它是一个泛化类型。首先InputFormat类中的getSplit根据JodContext来生成一个Inpursplit列表,InputSplit类中包含两个信息,分片大小已经分片的位置。当需要启动一个Map任务的时候,调用InputFormat类中的createRecordReader方法,根据inputSplit信息生成一个RecordReader迭代器。然后将这些信息封装在Mapper.context中。当运行mapper类对象的run方法时,使用recordReader迭代访问分片的每条记录。
Shuffle和排序
这里是如何将map的输出结果传递给reduce任务。针对一个map任务,输出结果首先会存储在内存中,如果超过一定阈值,需要把这些输出结果写到硬盘上,在写在硬盘之前需要对内存中的输出结果根据最终要传的reduce进行分区,然后再每个分区中进行排序。这样每个map任务都会生成若干的spill文件(由于大小超过而被溢出到硬盘中),每个spill文件都是已经分区并排好序的。那么在这些spill文件写到硬盘之前,如果有combine任务,就在这时进行,然后将combine任务的输出结果写到硬盘上。这样每个map任务都会生成若干个spill文件,然后将这些spill文件合并成一个文件,这个文件已经分好区,并在区内排好序。接下来,每个reduce复制每个map任务的输出结果中相应区中文件,注意每个reduce对应着不同的区。所以数据也是不一样的。这便是reduce的复制阶段然后进入融合阶段,由于一个reduce任务获得了所有的map上的确定分区的数据,需要将这些文件进行融合(保持顺序),如何并不一定如何成一个文件,可能融合成多个文件之后,就进入reduce阶段。

22 通常情况下,由reduce函数处理的partition内部是根据键排好序的,但是不同的partition的键的顺序并没有排好。我们通常需要在job对象中设定partition类,也就是job.setPartitionerClass(TotalOrderPartitioner.class),从而确定需要获得的reduce任务的数量。hadoop默认使用hashpartition,但是这里可以使用不同的partition类,如果,我们需要对不同的partition的键也排好序,那么在进行partition的时候,就需要对key值分布进行分析,从而选出若干的截点,进行partition类的构造。TotalOrderPartition类便是是想了全局排序。使用全局排序,需要给出partitionFile,而partitionfile的生成可以通过InputSampler的writePartitionFile生成,也就是

        InputSampler.Sampler<LongWritable, Text> sampler=new InputSampler.RandomSampler(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);

InputSampler.writePartitionFile将生成一个名为_partition.lst文件。然后调用

String partitionFile=TotalOrderPartitioner.getPartitionFile(conf);

获得该文件的位置信息,将相应的文件添加到共享文件系统中。

    InputSampler.Sampler<LongWritable, Text> sampler=new InputSampler.RandomSampler(0.1, 10000, 10);
    InputSampler.writePartitionFile(job, sampler);
  1. 通过secureCRT从本地上传文件到hadoop集群中。
    首先执行rz命令,CRT会弹出一个对话框,然后再对话框中选择需要上传的文件。完成第一步,将制定文件上传到服务器,然后 执行 hadoop fs -copyFromLocal 命令,将文件从服务器上传至hdfs.注意对于hdfs来说服务器才是本地文件系统。

最后

以上就是舒适草莓为你收集整理的hadoop 练习(1)的全部内容,希望文章能够帮你解决hadoop 练习(1)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部