概述
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.*;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class TaskResultSaveAndUpdate {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("hive");
JavaSparkContext sc = new JavaSparkContext(conf);
SparkSession jsc = SparkSession.builder().appName("Test").master("local").enableHiveSupport().getOrCreate();
HiveContext hiveCtx = new HiveContext(jsc);
SQLContext sqlCtx = new SQLContext(jsc);
<逻辑略>
JavaRDD<Row> map = stringTuple2JavaPairRDD.map(f -> {
<逻辑略>
return RowFactory.create(user, A1, A2, A3, A4, A5, A6);
});
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField( "user", DataTypes.StringType, true ));
structFields.add(DataTypes.createStructField( "A1", DataTypes.StringType, true ));
structFields.add(DataTypes.createStructField( "A2", DataTypes.StringType, true ));
structFields.add(DataTypes.createStructField( "A3", DataTypes.StringType, true ));
structFields.add(DataTypes.createStructField( "A4", DataTypes.StringType, true ));
structFields.add(DataTypes.createStructField( "A5", DataTypes.StringType, true ));
structFields.add(DataTypes.createStructField( "A6", DataTypes.StringType, true ));
StructType structType = DataTypes.createStructType(structFields);
Dataset<Row> brandDF = sqlCtx.createDataFrame(map,structType);
brandDF.registerTempTable("tableTmp");
hiveCtx.sql("use test");
hiveCtx.sql("CREATE TABLE IF NOT EXISTS tableHiveTmp (user string,A1 string,A2 string,A3 string,A4 string,A5 string, A6 string)");
hiveCtx.sql("insert into tableHiveTmp select * from tableTmp");
}
}
最后
以上就是秀丽高山为你收集整理的SparkRDD 转存 HIVE的全部内容,希望文章能够帮你解决SparkRDD 转存 HIVE所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复