SQOOP Source Code Analysis 1: Using ToolRunner to Submit Jobs from Windows to a Hadoop Cluster

This SQOOP source-code series walks through the Sqoop source code in a detailed but approachable way. It has three parts: first, using ToolRunner to submit a MapReduce job from a local Windows machine to the Hadoop cluster; second, how Sqoop goes from reading its configuration to submitting the job; third, how Sqoop's map tasks split table data and import it into the target table.

Overview: Sqoop submits work to the Hadoop cluster as generated MapReduce jobs. What does that submission process actually look like? Here we reproduce it with Hadoop's classic WordCount example; later articles in the series analyze each step in detail.

1. The WordCount implementation

package master.hadooptool;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // Mapper: split each input line into tokens and emit (word, 1)
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }

    // Reducer (also used as the combiner): sum the counts for each word
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }
}

Once WordCount is written, package it as wordcount.jar and keep it on the local machine.
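If the jar is built with Maven (the dependencies used here are listed in section 3), one simple way to get a file named exactly wordcount.jar is to pin the build's finalName. This is only a sketch; the copy destination E:/hadoop is an assumption that matches the path used later in WordCountTool.

<build>
    <!-- `mvn clean package` then writes target/wordcount.jar; copy it to E:/hadoop/wordcount.jar -->
    <finalName>wordcount</finalName>
</build>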

2. Implementing the runner Tool
Sqoop's entry class extends Configured and implements the Tool interface, so our own runner class does the same. Once the class is written this way, ToolRunner uploads the local jar to HDFS when it runs the job; we don't have to do it ourselves.

package master.hadooptool;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;

public class WordCountTool extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        PropertyConfigurator.configure("log4j.properties");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        // Load the Hadoop configuration files into a Configuration object
        String[] HADOOP_CONFS = { "core-site.xml", "hdfs-site.xml", "mapred-site.xml",
                "yarn-site.xml", "hive2-site.xml", "hbase-site.xml" };
        Configuration conf = new Configuration();
        for (String name : HADOOP_CONFS) {
            File file = new File(name);
            if (!file.exists()) {
                continue;
            }
            FileInputStream in = null;
            try {
                in = new FileInputStream(file);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            conf.addResource(in);
        }

        WordCountTool tool = new WordCountTool();
        tool.setConf(conf);

        // Let ToolRunner run the tool
        ToolRunner.run(tool.getConf(), tool, new String[] {});
    }

    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        conf.setBoolean("mapreduce.app-submission.cross-platform", true); // enable cross-platform submission
        conf.set("tmpjars", "file:/E:/hadoop/wordcount.jar"); // ship wordcount.jar; mind the path format
        // job.getConfiguration().set("mapred.jar", "E:/hadoop/wordcount.jar");
        // mapred.jar is the jar that contains the MapReduce classes, while tmpjars lists extra dependency
        // jars. Since there are no extra dependencies here, setting either one is enough; mind the path format.

        // Build the job
        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths; a plain-text input file must already exist on HDFS
        FileInputFormat.addInputPath(job, new Path("/wordcount/wordcount.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/result"));

        int n = job.waitForCompletion(true) ? 0 : -1;
        return n;
    }
}
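Instead of setting tmpjars by hand inside run(), the jar can also be passed through ToolRunner's generic -libjars option: GenericOptionsParser, which ToolRunner.run invokes before calling run(), translates -libjars into the same tmpjars property. The sketch below shows such an alternative driver; the class name WordCountLibJarsDriver and the jar path are assumptions for illustration only.

package master.hadooptool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical alternative driver: let ToolRunner's generic options carry the jar.
public class WordCountLibJarsDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        // Same approach as WordCountTool.main: read the cluster configs from the working directory
        conf.addResource(new Path("core-site.xml"));
        conf.addResource(new Path("mapred-site.xml"));
        conf.addResource(new Path("yarn-site.xml"));
        // -libjars is parsed by GenericOptionsParser inside ToolRunner.run and ends up in "tmpjars"
        int exitCode = ToolRunner.run(conf, new WordCountTool(),
                new String[] { "-libjars", "file:/E:/hadoop/wordcount.jar" });
        System.exit(exitCode);
    }
}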

3. Dependencies and configuration files
Maven dependencies declared in pom.xml:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.8.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>2.8.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoopmaster:9000</value>
    </property>
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        <description>The FileSystem for hdfs: uris.</description>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/soft/hadoop_data/tmp</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.groups</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.hadoop.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.hadoop.groups</name>
        <value>*</value>
    </property>
</configuration>

mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

yarn-site.xml

<configuration>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoopmaster</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
        <description>Whether virtual memory limits will be enforced for containers</description>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>4</value>
        <description>Ratio between virtual memory to physical memory when setting memory limits for containers</description>
    </property>
</configuration>

In the configuration files above, replace hadoopmaster with the hostname or IP address of your own Hadoop cluster.
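Before submitting the job, the input file referenced in run() has to exist on HDFS. Below is a minimal sketch that uploads it using the core-site.xml shown above; the class name PrepareInput and the local path E:/hadoop/wordcount.txt are assumptions for illustration.

package master.hadooptool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: copy the sample text file to /wordcount/wordcount.txt on HDFS.
public class PrepareInput {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        conf.addResource(new Path("core-site.xml"));   // picks up fs.defaultFS from the file above
        try (FileSystem fs = FileSystem.get(conf)) {
            fs.mkdirs(new Path("/wordcount"));
            fs.copyFromLocalFile(new Path("E:/hadoop/wordcount.txt"),
                                 new Path("/wordcount/wordcount.txt"));
            System.out.println("Uploaded to " + fs.getUri() + "/wordcount/wordcount.txt");
        }
    }
}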

4. Run result
[Screenshot of the run result]

5. Summary
Hadoop's ToolRunner lets us submit MapReduce jobs to a remote Hadoop cluster from a local machine. Once tmpjars and/or mapred.jar are set on the job configuration, the jar is uploaded to HDFS and the cluster runs the job.
In practice you will still run into plenty of problems; analyze them calmly, solve them one by one, and keep learning and improving.
Finally, if you have any questions, feel free to leave me a comment.
