我是靠谱客的博主 娇气画笔,最近开发中收集的这篇文章主要介绍Hadoop自带的一些程序示例,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、PiEstimator.java

位置:E:Hadoophadoop-0.20.1srcexamplesorgapachehadoopexamples

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.
See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.
The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.
You may obtain a copy of the License at
*
*
http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A Map-reduce program to estimate the value of Pi
* using quasi-Monte Carlo method.
*
* Mapper:
*
Generate points in a unit square
*
and then count points inside/outside of the inscribed circle of the square.
*
* Reducer:
*
Accumulate points inside/outside results from the mappers.
*
* Let numTotal = numInside + numOutside.
* The fraction numInside/numTotal is a rational approximation of
* the value (Area of the circle)/(Area of the square),
* where the area of the inscribed circle is Pi/4
* and the area of unit square is 1.
* Then, Pi is estimated value to be 4(numInside/numTotal).
*/
public class PiEstimator extends Configured implements Tool {
/** tmp directory for input/output */
static private final Path TMP_DIR = new Path(
PiEstimator.class.getSimpleName() + "_TMP_3_141592654");
/** 2-dimensional Halton sequence {H(i)},
* where H(i) is a 2-dimensional point and i >= 1 is the index.
* Halton sequence is used to generate sample points for Pi estimation.
*/
private static class HaltonSequence {
/** Bases */
static final int[] P = {2, 3};
/** Maximum number of digits allowed */
static final int[] K = {63, 40};
private long index;
private double[] x;
private double[][] q;
private int[][] d;
/** Initialize to H(startindex),
* so the sequence begins with H(startindex+1).
*/
HaltonSequence(long startindex) {
index = startindex;
x = new double[K.length];
q = new double[K.length][];
d = new int[K.length][];
for(int i = 0; i < K.length; i++) {
q[i] = new double[K[i]];
d[i] = new int[K[i]];
}
for(int i = 0; i < K.length; i++) {
long k = index;
x[i] = 0;
for(int j = 0; j < K[i]; j++) {
q[i][j] = (j == 0? 1.0: q[i][j-1])/P[i];
d[i][j] = (int)(k % P[i]);
k = (k - d[i][j])/P[i];
x[i] += d[i][j] * q[i][j];
}
}
}
/** Compute next point.
* Assume the current point is H(index).
* Compute H(index+1).
*
* @return a 2-dimensional point with coordinates in [0,1)^2
*/
double[] nextPoint() {
index++;
for(int i = 0; i < K.length; i++) {
for(int j = 0; j < K[i]; j++) {
d[i][j]++;
x[i] += q[i][j];
if (d[i][j] < P[i]) {
break;
}
d[i][j] = 0;
x[i] -= (j == 0? 1.0: q[i][j-1]);
}
}
return x;
}
}
/**
* Mapper class for Pi estimation.
* Generate points in a unit square
* and then count points inside/outside of the inscribed circle of the square.
*/
public static class PiMapper extends MapReduceBase
implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
/** Map method.
* @param offset samples starting from the (offset+1)th sample.
* @param size the number of samples for this map
* @param out output {ture->numInside, false->numOutside}
* @param reporter
*/
public void map(LongWritable offset,
LongWritable size,
OutputCollector<BooleanWritable, LongWritable> out,
Reporter reporter) throws IOException {
final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
long numInside = 0L;
long numOutside = 0L;
for(long i = 0; i < size.get(); ) {
//generate points in a unit square
final double[] point = haltonsequence.nextPoint();
//count points inside/outside of the inscribed circle of the square
final double x = point[0] - 0.5;
final double y = point[1] - 0.5;
if (x*x + y*y > 0.25) {
numOutside++;
} else {
numInside++;
}
//report status
i++;
if (i % 1000 == 0) {
reporter.setStatus("Generated " + i + " samples.");
}
}
//output map results
out.collect(new BooleanWritable(true), new LongWritable(numInside));
out.collect(new BooleanWritable(false), new LongWritable(numOutside));
}
}
/**
* Reducer class for Pi estimation.
* Accumulate points inside/outside results from the mappers.
*/
public static class PiReducer extends MapReduceBase
implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
private long numInside = 0;
private long numOutside = 0;
private JobConf conf; //configuration for accessing the file system
/** Store job configuration. */
@Override
public void configure(JobConf job) {
conf = job;
}
/**
* Accumulate number of points inside/outside results from the mappers.
* @param isInside Is the points inside?
* @param values An iterator to a list of point counts
* @param output dummy, not used here.
* @param reporter
*/
public void reduce(BooleanWritable isInside,
Iterator<LongWritable> values,
OutputCollector<WritableComparable<?>, Writable> output,
Reporter reporter) throws IOException {
if (isInside.get()) {
for(; values.hasNext(); numInside += values.next().get());
} else {
for(; values.hasNext(); numOutside += values.next().get());
}
}
/**
* Reduce task done, write output to a file.
*/
@Override
public void close() throws IOException {
//write output to a file
Path outDir = new Path(TMP_DIR, "out");
Path outFile = new Path(outDir, "reduce-out");
FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
outFile, LongWritable.class, LongWritable.class,
CompressionType.NONE);
writer.append(new LongWritable(numInside), new LongWritable(numOutside));
writer.close();
}
}
/**
* Run a map/reduce job for estimating Pi.
*
* @return the estimated value of Pi
*/
public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
) throws IOException {
//setup job conf
jobConf.setJobName(PiEstimator.class.getSimpleName());
jobConf.setInputFormat(SequenceFileInputFormat.class);
jobConf.setOutputKeyClass(BooleanWritable.class);
jobConf.setOutputValueClass(LongWritable.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setMapperClass(PiMapper.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setReducerClass(PiReducer.class);
jobConf.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
jobConf.setSpeculativeExecution(false);
//setup input/output directories
final Path inDir = new Path(TMP_DIR, "in");
final Path outDir = new Path(TMP_DIR, "out");
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outDir);
final FileSystem fs = FileSystem.get(jobConf);
if (fs.exists(TMP_DIR)) {
throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+ " already exists.
Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, jobConf, file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
JobClient.runJob(jobConf);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
//read outputs
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
try {
reader.next(numInside, numOutside);
} finally {
reader.close();
}
//compute estimated value
return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(BigDecimal.valueOf(numMaps))
.divide(BigDecimal.valueOf(numPoints));
} finally {
fs.delete(TMP_DIR, true);
}
}
/**
* Parse arguments and then runs a map/reduce job.
* Print output in standard out.
*
* @return a non-zero if there is an error.
Otherwise, return 0.
*/
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
final int nMaps = Integer.parseInt(args[0]);
final long nSamples = Long.parseLong(args[1]);
System.out.println("Number of Maps
= " + nMaps);
System.out.println("Samples per Map = " + nSamples);
final JobConf jobConf = new JobConf(getConf(), getClass());
System.out.println("Estimated value of Pi is "
+ estimate(nMaps, nSamples, jobConf));
return 0;
}
/**
* main method for running it as a stand alone command.
*/
public static void main(String[] argv) throws Exception {
System.exit(ToolRunner.run(null, new PiEstimator(), argv));
}
}

二、MultiFileWordCount.java

位置:E:Hadoophadoop-0.20.1srcexamplesorgapachehadoopexamples

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.
See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.
The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.
You may obtain a copy of the License at
*
*
http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MultiFileInputFormat;
import org.apache.hadoop.mapred.MultiFileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* MultiFileWordCount is an example to demonstrate the usage of
* MultiFileInputFormat. This examples counts the occurrences of
* words in the text files under the given input directory.
*/
public class MultiFileWordCount extends Configured implements Tool {
/**
* This record keeps <filename,offset> pairs.
*/
public static class WordOffset implements WritableComparable {
private long offset;
private String fileName;
public void readFields(DataInput in) throws IOException {
this.offset = in.readLong();
this.fileName = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeLong(offset);
Text.writeString(out, fileName);
}
public int compareTo(Object o) {
WordOffset that = (WordOffset)o;
int f = this.fileName.compareTo(that.fileName);
if(f == 0) {
return (int)Math.signum((double)(this.offset - that.offset));
}
return f;
}
@Override
public boolean equals(Object obj) {
if(obj instanceof WordOffset)
return this.compareTo(obj) == 0;
return false;
}
@Override
public int hashCode() {
assert false : "hashCode not designed";
return 42; //an arbitrary constant
}
}
/**
* To use {@link MultiFileInputFormat}, one should extend it, to return a
* (custom) {@link RecordReader}. MultiFileInputFormat uses
* {@link MultiFileSplit}s.
*/
public static class MyInputFormat
extends MultiFileInputFormat<WordOffset, Text>
{
@Override
public RecordReader<WordOffset,Text> getRecordReader(InputSplit split
, JobConf job, Reporter reporter) throws IOException {
return new MultiFileLineRecordReader(job, (MultiFileSplit)split);
}
}
/**
* RecordReader is responsible from extracting records from the InputSplit.
* This record reader accepts a {@link MultiFileSplit}, which encapsulates several
* files, and no file is divided.
*/
public static class MultiFileLineRecordReader
implements RecordReader<WordOffset, Text> {
private MultiFileSplit split;
private long offset; //total offset read so far;
private long totLength;
private FileSystem fs;
private int count = 0;
private Path[] paths;
private FSDataInputStream currentStream;
private BufferedReader currentReader;
public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)
throws IOException {
this.split = split;
fs = FileSystem.get(conf);
this.paths = split.getPaths();
this.totLength = split.getLength();
this.offset = 0;
//open the first file
Path file = paths[count];
currentStream = fs.open(file);
currentReader = new BufferedReader(new InputStreamReader(currentStream));
}
public void close() throws IOException { }
public long getPos() throws IOException {
long currentOffset = currentStream == null ? 0 : currentStream.getPos();
return offset + currentOffset;
}
public float getProgress() throws IOException {
return ((float)getPos()) / totLength;
}
public boolean next(WordOffset key, Text value) throws IOException {
if(count >= split.getNumPaths())
return false;
/* Read from file, fill in key and value, if we reach the end of file,
* then open the next file and continue from there until all files are
* consumed.
*/
String line;
do {
line = currentReader.readLine();
if(line == null) {
//close the file
currentReader.close();
offset += split.getLength(count);
if(++count >= split.getNumPaths()) //if we are done
return false;
//open a new file
Path file = paths[count];
currentStream = fs.open(file);
currentReader=new BufferedReader(new InputStreamReader(currentStream));
key.fileName = file.getName();
}
} while(line == null);
//update the key and value
key.offset = currentStream.getPos();
value.set(line);
return true;
}
public WordOffset createKey() {
WordOffset wo = new WordOffset();
wo.fileName = paths[0].toString(); //set as the first file
return wo;
}
public Text createValue() {
return new Text();
}
}
/**
* This Mapper is similar to the one in {@link WordCount.MapClass}.
*/
public static class MapClass extends MapReduceBase
implements Mapper<WordOffset, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(WordOffset key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
}
private void printUsage() {
System.out.println("Usage : multifilewc <input_dir> <output>" );
}
public int run(String[] args) throws Exception {
if(args.length < 2) {
printUsage();
return 1;
}
JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
job.setJobName("MultiFileWordCount");
//set the InputFormat of the job to our InputFormat
job.setInputFormat(MyInputFormat.class);
// the keys are words (strings)
job.setOutputKeyClass(Text.class);
// the values are counts (ints)
job.setOutputValueClass(IntWritable.class);
//use the defined mapper
job.setMapperClass(MapClass.class);
//use the WordCount Reducer
job.setCombinerClass(LongSumReducer.class);
job.setReducerClass(LongSumReducer.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new MultiFileWordCount(), args);
System.exit(ret);
}
}


三、Python版WordCount.py

位置:E:Hadoophadoop-0.20.1srcexamplespython

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.
See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.
The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
You may obtain a copy of the License at
#
#
http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class WordCountMap(Mapper, MapReduceBase):
one = IntWritable(1)
def map(self, key, value, output, reporter):
for w in value.toString().split():
output.collect(Text(w), self.one)
class Summer(Reducer, MapReduceBase):
def reduce(self, key, values, output, reporter):
sum = 0
while values.hasNext():
sum += values.next().get()
output.collect(key, IntWritable(sum))
def printUsage(code):
print "wordcount [-m <maps>] [-r <reduces>] <input> <output>"
sys.exit(code)
def main(args):
conf = JobConf(WordCountMap);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text);
conf.setOutputValueClass(IntWritable);
conf.setMapperClass(WordCountMap);
conf.setCombinerClass(Summer);
conf.setReducerClass(Summer);
try:
flags, other_args = getopt.getopt(args[1:], "m:r:")
except getopt.GetoptError:
printUsage(1)
if len(other_args) != 2:
printUsage(1)
for f,v in flags:
if f == "-m":
conf.setNumMapTasks(int(v))
elif f == "-r":
conf.setNumReduceTasks(int(v))
conf.setInputPath(Path(other_args[0]))
conf.setOutputPath(Path(other_args[1]))
JobClient.runJob(conf);
if __name__ == "__main__":
main(sys.argv)


最后

以上就是娇气画笔为你收集整理的Hadoop自带的一些程序示例的全部内容,希望文章能够帮你解决Hadoop自带的一些程序示例所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(36)

评论列表共有 0 条评论

立即
投稿
返回
顶部