概述
在spark上,用protobuf替代json格式作为数据序列化存储
谷歌的protobuf一般用来将复杂数据结构序列化为二进制数组,非常适合网络传输等领域,其效率和空间占用都优于json格式。
这一次,我在用spark做建模时,打算使用protobuf替换原json格式数据,以获得性能提升。在此记录下实现方式,以及如何避过我遇到的坑。
我的环境是spark1.5.0 + java7 + protobuf2.5。
首先,要编写.proto文件以描述数据结构。这里不详细解释,有兴趣的可参见别人写的:
http://www.cnblogs.com/dkblog/archive/2012/03/27/2419010.html
这里放一个proto文件的例子:
// protobufTest.proto
syntax = "proto2";
option java_package = "com.ismartv.recommendv2.test";
option java_outer_classname = "PersonEntity";//生成的数据访问类的类名
message Person {
required string sn = 1;// sn
required string name = 2;//必须字段,在后面的使用中必须为该段设置值
}
使用命令protoc –java_out=src pathToProto/protobufTest.proto 即可将proto文件所描述的数据类型生成为java类。
接下来只需要编写spark程序,先将Person数据类型由java对象转为protobuf二进制数组输出到hdfs,再由hdfs读取二进制数组数据转换为java对象。完成读写操作。
以protobuf为结构,写java对象到HDFS二进制文件代码:
// 先生成若干个Person对象
JavaRDD<Integer> numbersRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
JavaRDD<Person> persons = numbersRDD.map(new Function<Integer, PersonEntity.Person>() {
public Person call(Integer x) throws Exception {
// TODO Auto-generated method stub
PersonEntity.Person.Builder builder = PersonEntity.Person.newBuilder();
builder.setSn(x.toString());
builder.setName(x.toString() + "name");
PersonEntity.Person person = builder.build();
return person;
}
});
// 将JavaRDD<Person> 转换为JavaPairRDD<NullWritable, BytesWritable>
// 最后保存到HDFS
persons.mapToPair(new PairFunction<PersonEntity.Person, NullWritable, BytesWritable>() {
public Tuple2<NullWritable, BytesWritable> call(Person person) throws Exception {
// 这里new BytesWritable(person.toByteArray()) 是将java对象序列化为protobuf二进制数组
return new Tuple2<NullWritable, BytesWritable>(NullWritable.get(), new BytesWritable(person.toByteArray()));
}
}).saveAsNewAPIHadoopFile("hdfs://nameservice1/test/protobufTest", NullWritable.class, BytesWritable.class, SequenceFileOutputFormat.class);
以protobuf为结构,读HDFS二进制文件到java对象代码:
// 注意要用sequenceFile函数
JavaRDD<Person> readperson = sc.sequenceFile("hdfs://nameservice1/test/protobufTest/", NullWritable.class, BytesWritable.class)
.map(new Function<Tuple2<NullWritable, BytesWritable>, PersonEntity.Person>() {
public Person call(Tuple2<NullWritable, BytesWritable> tuple) throws Exception {
// 解析byte[]为java对象,注意,一定要用copyBytes()而不是getBytes()
PersonEntity.Person p3 = PersonEntity.Person.parseFrom(tuple._2.copyBytes());
return p3;
}
});
// 看一下结果
List<PersonEntity.Person> list = readperson.collect();
for(PersonEntity.Person person : list){
System.out.println(person.toString());
}
最后
以上就是稳重发带为你收集整理的spark程序读写protobuf格式数据(java语言)在spark上,用protobuf替代json格式作为数据序列化存储的全部内容,希望文章能够帮你解决spark程序读写protobuf格式数据(java语言)在spark上,用protobuf替代json格式作为数据序列化存储所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复