Overview
This Sqoop source code series introduces the Sqoop source code in a detailed yet accessible way. It is split into three parts: first, using ToolRunner to submit a MapReduce job from a local Windows machine to the Hadoop cluster; second, an analysis of how Sqoop goes from reading its configuration files to submitting a job; and third, how Sqoop's map tasks split table data and import it into the target table.
Introduction: Sqoop submits jobs to the Hadoop cluster through the MapReduce code it generates. What does that process actually look like? In this article we simulate it with the WordCount example that ships with Hadoop; later articles will analyze the process in detail.
1. WordCount Implementation
package master.hadooptool;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }
}
Once WordCount is written, package it into wordcount.jar and put it on the local disk (it is referenced later as E:/hadoop/wordcount.jar).
2. The Tool Runner Implementation
SqoopTool extends Configured and implements the Tool interface, so our runner class needs to do the same. Once the class is implemented, ToolRunner uploads the local jar to HDFS at submission time; we do not have to do it ourselves.
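For reference, the contract ToolRunner relies on is small: Hadoop's org.apache.hadoop.util.Tool interface declares a single run method (sketched below from the Hadoop API), while Configured supplies the getConf()/setConf() pair required by Configurable.

public interface Tool extends Configurable {
    // Run the tool with the given command-line arguments and return an exit code (0 for success).
    int run(String[] args) throws Exception;
}

Our WordCountTool implements run() and looks like this: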
package master.hadooptool;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;

public class WordCountTool extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Load log4j.properties from the working directory and submit as the "hadoop" user.
        PropertyConfigurator.configure("log4j.properties");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        // Load the Hadoop configuration files into the Configuration.
        String[] HADOOP_CONFS = { "core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml",
                "hive2-site.xml", "hbase-site.xml" };
        Configuration conf = new Configuration();
        for (String name : HADOOP_CONFS) {
            File file = new File(name);
            if (!file.exists()) {
                continue;
            }
            FileInputStream in = null;
            try {
                in = new FileInputStream(file);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
                continue;
            }
            conf.addResource(in);
        }

        WordCountTool tool = new WordCountTool();
        tool.setConf(conf);
        // Hand the tool to ToolRunner, which invokes run() with this configuration.
        ToolRunner.run(tool.getConf(), tool, new String[] {});
    }

    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        conf.setBoolean("mapreduce.app-submission.cross-platform", true); // enable cross-platform submission
        conf.set("tmpjars", "file:/E:/hadoop/wordcount.jar"); // ship wordcount.jar; note the path format
        // job.getConfiguration().set("mapred.jar", "E:/hadoop/wordcount.jar");
        // mapred.jar points to the jar that contains the MapReduce classes, while tmpjars lists
        // dependency jars. We have no extra dependencies, so either one is enough; mind the path format.

        // Build the Job.
        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths; a plain-text input file must already exist in HDFS.
        FileInputFormat.addInputPath(job, new Path("/wordcount/wordcount.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/result"));

        return job.waitForCompletion(true) ? 0 : -1;
    }
}
3. Dependencies and Configuration Files
Maven dependencies in pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.8.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>2.8.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
core-site.xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoopmaster:9000</value>
    </property>
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        <description>The FileSystem for hdfs: uris.</description>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/soft/hadoop_data/tmp</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.groups</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.hadoop.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.hadoop.groups</name>
        <value>*</value>
    </property>
</configuration>
mapred-site.xml
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
yarn-site.xml
<configuration>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoopmaster</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
        <description>Whether virtual memory limits will be enforced for containers</description>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>4</value>
        <description>Ratio between virtual memory to physical memory when setting memory limits for containers</description>
    </property>
</configuration>
In the configuration files above, replace hadoopmaster with the IP address (or hostname) of your own Hadoop cluster.
4. Run Results
5. Summary
Hadoop's ToolRunner lets us submit a MapReduce job to a remote Hadoop cluster. With tmpjars or mapred.jar configured on the job, the client uploads the jar to HDFS and the cluster then runs the job.
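As a recap, here is a minimal sketch of the client-side settings this article relies on; the hostname hadoopmaster and the jar path are the values assumed in this setup, not universal defaults:

// A minimal sketch of the client-side configuration for remote submission from Windows,
// reusing the hostname and jar path assumed earlier in this article.
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoopmaster:9000");              // NameNode address
conf.set("mapreduce.framework.name", "yarn");                      // submit through YARN
conf.set("yarn.resourcemanager.hostname", "hadoopmaster");         // ResourceManager address
conf.setBoolean("mapreduce.app-submission.cross-platform", true);  // Windows client, Linux cluster
conf.set("tmpjars", "file:/E:/hadoop/wordcount.jar");              // or set "mapred.jar" instead
ToolRunner.run(conf, new WordCountTool(), new String[] {});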
In practice you will still run into plenty of problems; analyze them calmly, solve them, and keep learning and improving.
Finally, if you have any questions, feel free to leave me a comment.