Splitting a Data File with MapReduce

Overview

 

There is a formatted data file whose columns are separated by a tab character (\t); the second column is the product name. The requirement is to split the data file into multiple files by product name. How can this be implemented with MapReduce programs?

Original file:

[root@localhost opt]# cat aprData
1       a1      a111
2       a2      a211
3       a1      a112
4       a1      a112
5       a1      a112
6       a1      a112
7       a2      a112
8       a2      a112
9       a2      a112
10      a3      a113

 

Approach:

1. Use one MapReduce job to extract all product names:

1.1 The map output <k2, v2> is <product name, null>.

1.2 The reduce output <k3, v3> is <product name, null>.

For the sample file, the map phase emits (a1, null) five times, (a2, null) four times and (a3, null) once; the reduce phase writes each key once, leaving the distinct names a1, a2 and a3.

Implementation: the AprProduces class

[root@localhost opt]# hadoop jar apr-produces.jar /aprData /aprProduce-output

Warning: $HADOOP_HOME is deprecated.

 

16/05/01 15:00:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

16/05/01 15:00:12 INFO input.FileInputFormat: Total input paths to process : 1

16/05/01 15:00:12 INFO util.NativeCodeLoader: Loaded the native-hadoop library

16/05/01 15:00:12 WARN snappy.LoadSnappy: Snappy native library not loaded

16/05/01 15:00:13 INFO mapred.JobClient: Running job: job_201605010048_0020

16/05/01 15:00:14 INFO mapred.JobClient:  map 0% reduce 0%

16/05/01 15:00:33 INFO mapred.JobClient:  map 100% reduce 0%

16/05/01 15:00:45 INFO mapred.JobClient:  map 100% reduce 100%

16/05/01 15:00:50 INFO mapred.JobClient: Job complete: job_201605010048_0020

16/05/01 15:00:50 INFO mapred.JobClient: Counters: 29

16/05/01 15:00:50 INFO mapred.JobClient:   Map-Reduce Framework

16/05/01 15:00:50 INFO mapred.JobClient:     Spilled Records=20

16/05/01 15:00:50 INFO mapred.JobClient:     Map output materialized bytes=56

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce input records=10

16/05/01 15:00:50 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3868389376

16/05/01 15:00:50 INFO mapred.JobClient:     Map input records=10

16/05/01 15:00:50 INFO mapred.JobClient:     SPLIT_RAW_BYTES=89

16/05/01 15:00:50 INFO mapred.JobClient:     Map output bytes=30

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce shuffle bytes=56

16/05/01 15:00:50 INFO mapred.JobClient:     Physical memory (bytes) snapshot=240697344

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce input groups=3

16/05/01 15:00:50 INFO mapred.JobClient:     Combine output records=0

16/05/01 15:00:50 INFO mapred.JobClient:     Reduce output records=3

16/05/01 15:00:50 INFO mapred.JobClient:     Map output records=10

16/05/01 15:00:50 INFO mapred.JobClient:     Combine input records=0

16/05/01 15:00:50 INFO mapred.JobClient:     CPU time spent (ms)=1490

16/05/01 15:00:50 INFO mapred.JobClient:     Total committed heap usage (bytes)=177016832

16/05/01 15:00:50 INFO mapred.JobClient:   File Input Format Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Bytes Read=101

16/05/01 15:00:50 INFO mapred.JobClient:   FileSystemCounters

16/05/01 15:00:50 INFO mapred.JobClient:     HDFS_BYTES_READ=190

16/05/01 15:00:50 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=43049

16/05/01 15:00:50 INFO mapred.JobClient:     FILE_BYTES_READ=56

16/05/01 15:00:50 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9

16/05/01 15:00:50 INFO mapred.JobClient:   Job Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Launched map tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:     Launched reduce tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=11002

16/05/01 15:00:50 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

16/05/01 15:00:50 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=13561

16/05/01 15:00:50 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0

16/05/01 15:00:50 INFO mapred.JobClient:     Data-local map tasks=1

16/05/01 15:00:50 INFO mapred.JobClient:   File Output Format Counters

16/05/01 15:00:50 INFO mapred.JobClient:     Bytes Written=9

[root@localhost opt]# hadoop fs -cat /aprProduce-output/part-r-00000
Warning: $HADOOP_HOME is deprecated.

a1
a2
a3

 

   

2. Use a second MapReduce job to split the file:

2.1 The map output <k2, v2> is <product name, line>.

2.2 The reduce output <k3, v3> is <line, null>.

2.3 A custom Partitioner reads the output file of the first job and builds a map of <product name, index> (for the sample data: {a1=1, a2=2, a3=3}); getPartition() looks up the product name and returns its index, and names that are not found go to partition 0. (The code reads this list from /aprProduce; see the copy sketch below.)

2.4 Set taskNum (the number of reduce tasks) so that it matches the number of partitions.

2.5 Use the MultipleOutputs class to name the output files after the product, producing files such as xxx-r-00001.

Implementation: the AprClassify class
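
Note: HdfsUtils.getMap() (shown in the code section) reads the product-name list from /aprProduce, while the first job wrote it to /aprProduce-output/part-r-00000. The post does not show how that file gets into place; presumably it is copied beforehand, for example (paths assumed from the code below):

[root@localhost opt]# hadoop fs -cp /aprProduce-output/part-r-00000 /aprProduce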

 

[root@localhost opt]# hadoop jar apr-classify.jar /aprData /apr-output

Warning: $HADOOP_HOME is deprecated.

 

16/05/01 14:09:11 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

16/05/01 14:09:11 INFO input.FileInputFormat: Total input paths to process : 1

16/05/01 14:09:11 INFO util.NativeCodeLoader: Loaded the native-hadoop library

16/05/01 14:09:11 WARN snappy.LoadSnappy: Snappy native library not loaded

16/05/01 14:09:11 INFO mapred.JobClient: Running job: job_201605010048_0017

16/05/01 14:09:13 INFO mapred.JobClient:  map 0% reduce 0%

16/05/01 14:09:29 INFO mapred.JobClient:  map 100% reduce 0%

16/05/01 14:09:41 INFO mapred.JobClient:  map 100% reduce 33%

16/05/01 14:09:44 INFO mapred.JobClient:  map 100% reduce 66%

16/05/01 14:09:56 INFO mapred.JobClient:  map 100% reduce 100%

16/05/01 14:10:01 INFO mapred.JobClient: Job complete: job_201605010048_0017

16/05/01 14:10:01 INFO mapred.JobClient: Counters: 29

16/05/01 14:10:01 INFO mapred.JobClient:   Map-Reduce Framework

16/05/01 14:10:01 INFO mapred.JobClient:     Spilled Records=20

16/05/01 14:10:01 INFO mapred.JobClient:     Map output materialized bytes=169

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce input records=10

16/05/01 14:10:01 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=7754653696

16/05/01 14:10:01 INFO mapred.JobClient:     Map input records=10

16/05/01 14:10:01 INFO mapred.JobClient:     SPLIT_RAW_BYTES=89

16/05/01 14:10:01 INFO mapred.JobClient:     Map output bytes=131

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce shuffle bytes=169

16/05/01 14:10:01 INFO mapred.JobClient:     Physical memory (bytes) snapshot=387825664

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce input groups=3

16/05/01 14:10:01 INFO mapred.JobClient:     Combine output records=0

16/05/01 14:10:01 INFO mapred.JobClient:     Reduce output records=0

16/05/01 14:10:01 INFO mapred.JobClient:     Map output records=10

16/05/01 14:10:01 INFO mapred.JobClient:     Combine input records=0

16/05/01 14:10:01 INFO mapred.JobClient:     CPU time spent (ms)=3950

16/05/01 14:10:01 INFO mapred.JobClient:     Total committed heap usage (bytes)=209522688

16/05/01 14:10:01 INFO mapred.JobClient:   File Input Format Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Bytes Read=101

16/05/01 14:10:01 INFO mapred.JobClient:   FileSystemCounters

16/05/01 14:10:01 INFO mapred.JobClient:     HDFS_BYTES_READ=199

16/05/01 14:10:01 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=86609

16/05/01 14:10:01 INFO mapred.JobClient:     FILE_BYTES_READ=169

16/05/01 14:10:01 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=104

16/05/01 14:10:01 INFO mapred.JobClient:   Job Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Launched map tasks=1

16/05/01 14:10:01 INFO mapred.JobClient:     Launched reduce tasks=3

16/05/01 14:10:01 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=35295

16/05/01 14:10:01 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

16/05/01 14:10:01 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=13681

16/05/01 14:10:01 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0

16/05/01 14:10:01 INFO mapred.JobClient:     Data-local map tasks=1

16/05/01 14:10:01 INFO mapred.JobClient:   File Output Format Counters

16/05/01 14:10:01 INFO mapred.JobClient:     Bytes Written=0

[root@localhost opt]# hadoop fs -ls /apr-output/
Warning: $HADOOP_HOME is deprecated.

Found 8 items
-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/_SUCCESS
drwxr-xr-x   - root supergroup          0 2016-05-01 14:09 /apr-output/_logs
-rw-r--r--   1 root supergroup         51 2016-05-01 14:09 /apr-output/a1-r-00000
-rw-r--r--   1 root supergroup         41 2016-05-01 14:09 /apr-output/a2-r-00001
-rw-r--r--   1 root supergroup         12 2016-05-01 14:09 /apr-output/a3-r-00002
-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00000
-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00001
-rw-r--r--   1 root supergroup          0 2016-05-01 14:09 /apr-output/part-r-00002

(The default part-r-* files are empty because the reducer writes only through MultipleOutputs.)

[root@localhost opt]# hadoop fs -cat /apr-output/a1-r-00000
Warning: $HADOOP_HOME is deprecated.

1       a1      a111
3       a1      a112
4       a1      a112
5       a1      a112
6       a1      a112

[root@localhost opt]# hadoop fs -cat /apr-output/a2-r-00000
Warning: $HADOOP_HOME is deprecated.

cat: File does not exist: /apr-output/a2-r-00000

[root@localhost opt]# hadoop fs -cat /apr-output/a2-r-00001
Warning: $HADOOP_HOME is deprecated.

2       a2      a211
7       a2      a112
8       a2      a112
9       a2      a112

[root@localhost opt]# hadoop fs -cat /apr-output/a3-r-00002
Warning: $HADOOP_HOME is deprecated.

10      a3      a113

 

 

 

3. Use the HDFS API to batch-copy the per-product data files, renaming them and moving them into a target directory.

Implementation: the RenameApr class
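
The command used to launch RenameApr is not shown in the post; since it is a plain main() that only calls HdfsUtils.copyProduces(), it could presumably be run like the other jobs, for example (the jar name apr-rename.jar is an assumption):

[root@localhost opt]# hadoop jar apr-rename.jar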

 

[root@localhost opt]# hadoop fs -ls /aprProduces
Warning: $HADOOP_HOME is deprecated.

Found 3 items
-rw-r--r--   3 yehao supergroup         51 2016-05-01 14:37 /aprProduces/a1
-rw-r--r--   3 yehao supergroup         41 2016-05-01 14:37 /aprProduces/a2
-rw-r--r--   3 yehao supergroup         12 2016-05-01 14:37 /aprProduces/a3

[root@localhost opt]# hadoop fs -cat /aprProduces/a1
Warning: $HADOOP_HOME is deprecated.

1       a1      a111
3       a1      a112
4       a1      a112
5       a1      a112
6       a1      a112

[root@localhost opt]# hadoop fs -cat /aprProduces/a2
Warning: $HADOOP_HOME is deprecated.

2       a2      a211
7       a2      a112
8       a2      a112
9       a2      a112

[root@localhost opt]# hadoop fs -cat /aprProduces/a3
Warning: $HADOOP_HOME is deprecated.

10      a3      a113

 

Code:

1.com.huawei.AprClassify

 

package com.huawei;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/**
 * Splits the data file by product name (second job).
 * args[0] the original data file
 * args[1] the output directory
 */
public class AprClassify {

    // one reduce task per known product name, plus partition 0 for unknown names
    private static int taskNum = HdfsUtils.getMapSize();

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), AprClassify.class.getSimpleName());
        job.setJarByClass(AprClassify.class);
        job.setMapperClass(AprClassifyMap.class);
        job.setReducerClass(AprClassifyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setPartitionerClass(AprClassifyPartitioner.class);
        job.setNumReduceTasks(taskNum + 1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

class AprClassifyReducer extends Reducer<Text, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> outputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outputs = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void reduce(Text k2, Iterable<Text> v2s,
            Reducer<Text, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // collect all lines of one product and write them to a file named after the product
        StringBuilder st = new StringBuilder();
        for (Text text : v2s) {
            st.append(text.toString()).append("\n");
        }
        Text k3 = new Text(st.toString());
        outputs.write(k3, NullWritable.get(), k2.toString());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        outputs.close();
    }
}

class AprClassifyMap extends Mapper<LongWritable, Text, Text, Text> {

    Text k2 = new Text();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // the second column is the product name; emit <product name, whole line>
        String line = value.toString();
        String[] splited = line.split("\t");
        k2.set(splited[1]);
        context.write(k2, value);
    }
}

class AprClassifyPartitioner extends Partitioner<Text, Text> {

    // <product name, partition index>, built from the first job's output
    private static Map<String, Integer> map = HdfsUtils.getMap();

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        // unknown product names fall back to partition 0
        if (map.get(key.toString()) == null) {
            return 0;
        }
        return map.get(key.toString());
    }
}

 

 

2.com.huawei.HdfsUtils

package com.huawei;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsUtils {

    private static FileSystem fileSystem;
    private static Map<String, Integer> map;

    private static FileSystem getFileSystem() throws URISyntaxException, IOException {
        if (fileSystem == null) {
            Configuration conf = new Configuration();
            URI uri = new URI("hdfs://192.168.1.190:9000");
            fileSystem = FileSystem.get(uri, conf);
        }
        return fileSystem;
    }

    public static int getMapSize() {
        return getMap().size();
    }

    /**
     * Reads the product-name list produced by the first job (stored here as /aprProduce)
     * and builds a map of product name to partition index, with indexes starting at 1
     * so that partition 0 stays reserved for unknown names.
     */
    public static Map<String, Integer> getMap() {
        if (map == null) {
            map = new HashMap<String, Integer>();
            FSDataInputStream in;
            BufferedReader reader = null;
            try {
                fileSystem = getFileSystem();
                in = fileSystem.open(new Path("hdfs://192.168.1.190:9000/aprProduce"));
                reader = new BufferedReader(new InputStreamReader(in));
                String line = null;
                int i = 1;
                while ((line = reader.readLine()) != null) {
                    map.put(line, i++);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (reader != null) reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return map;
    }

    /**
     * Copies each per-product output file of the classify job (e.g. a1-r-00000) into
     * outPutDir under the product name, skipping the _SUCCESS marker, the _logs
     * directory and the empty part-r-* files.
     */
    public static void copyProduces(String inputPath, String outPutDir) throws Exception {
        FileStatus[] listStatus = getFileSystem().listStatus(new Path(inputPath));
        for (FileStatus fileStatus : listStatus) {
            String name = fileStatus.getPath().getName();
            if (!fileStatus.isDir() && !StringUtils.equals(name, "_SUCCESS")
                    && !StringUtils.startsWith(name, "part-r-")) {
                FSDataInputStream openStream = fileSystem.open(fileStatus.getPath());
                // pass true so copyBytes closes both streams and the HDFS file is finalized
                IOUtils.copyBytes(openStream,
                        fileSystem.create(new Path("/" + outPutDir + "/" + name.split("-")[0])), 1024, true);
            }
        }
    }
}

 

3.com.huawei.AprProduces

package com.huawei;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Scans the data file and extracts all product names (first job).
 * args[0] the original data file
 * args[1] the output path: all product names
 */
public class AprProduces {

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), AprProduces.class.getSimpleName());
        job.setJarByClass(AprProduces.class);
        job.setMapperClass(AprProducesMap.class);
        job.setReducerClass(AprProducesReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

class AprProducesMap extends Mapper<LongWritable, Text, Text, NullWritable> {

    Text k2 = new Text();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] splited = line.split("\t");
        // the product-name column index differs between the four input files and needs to be adjusted accordingly
        k2.set(splited[1]);
        context.write(k2, NullWritable.get());
    }
}

class AprProducesReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text k2, Iterable<NullWritable> v2s,
            Reducer<Text, NullWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // the framework has already grouped by key, so writing each key once yields the distinct product names
        context.write(k2, NullWritable.get());
    }
}

 

4.com.huawei.RenameApr

package com.huawei;

public class RenameApr {

    public static void main(String[] args) throws Exception {
        // copy and rename the per-product output files of the classify job
        // (the target directory matches the /aprProduces listing shown above)
        HdfsUtils.copyProduces("/apr-output/", "aprProduces");
    }
}

 
