我是靠谱客的博主 秀丽高山,最近开发中收集的这篇文章主要介绍SparkRDD 转存 HIVE,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.*;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

public class TaskResultSaveAndUpdate {
    public static void main(String[] args) {
        
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("hive");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SparkSession jsc = SparkSession.builder().appName("Test").master("local").enableHiveSupport().getOrCreate();
        HiveContext hiveCtx = new HiveContext(jsc);
        SQLContext sqlCtx = new SQLContext(jsc);
       
			<逻辑略>
        JavaRDD<Row> map = stringTuple2JavaPairRDD.map(f -> {
			<逻辑略>
            return RowFactory.create(user, A1, A2, A3, A4, A5, A6);
        });
        
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField( "user", DataTypes.StringType, true ));
        structFields.add(DataTypes.createStructField( "A1", DataTypes.StringType, true ));
        structFields.add(DataTypes.createStructField( "A2", DataTypes.StringType, true ));
        structFields.add(DataTypes.createStructField( "A3", DataTypes.StringType, true ));
        structFields.add(DataTypes.createStructField( "A4", DataTypes.StringType, true ));
        structFields.add(DataTypes.createStructField( "A5", DataTypes.StringType, true ));
        structFields.add(DataTypes.createStructField( "A6", DataTypes.StringType, true ));
        StructType structType = DataTypes.createStructType(structFields);
        Dataset<Row> brandDF = sqlCtx.createDataFrame(map,structType);
        brandDF.registerTempTable("tableTmp");

        hiveCtx.sql("use test");
        hiveCtx.sql("CREATE TABLE IF NOT EXISTS tableHiveTmp (user string,A1 string,A2 string,A3 string,A4 string,A5 string, A6 string)");
        hiveCtx.sql("insert into tableHiveTmp select * from tableTmp");

    }
}

最后

以上就是秀丽高山为你收集整理的SparkRDD 转存 HIVE的全部内容,希望文章能够帮你解决SparkRDD 转存 HIVE所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(59)

评论列表共有 0 条评论

立即
投稿
返回
顶部