概述
package bi.tag
import java.util.Properties
import bi.utils.{ConfigUtils, KoboldAppUtil}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory
/*
* Created by ** on 2019-08-07
*/
/**
 * Example Spark SQL job: runs an aggregation query, saves a projection of the
 * result to the Hive table `tmp.sparksql_hive_test`, and appends the same rows
 * to the MySQL table `sparksql_hive_test` over JDBC.
 *
 * Source table names in the query are redacted (`****`) in this listing and
 * must be replaced with real table names before the job can run.
 */
object TestSparkSqlSource {
  // Silence everything below WARN for loggers under the "org" namespace.
  Logger.getLogger("org").setLevel(Level.WARN)

  val logger: org.slf4j.Logger =
    LoggerFactory.getLogger(TestSparkSqlSource.getClass.getSimpleName)

  /**
   * Entry point: creates the session, runs [[testHive]], and always closes the
   * session afterwards, even when the job fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = KoboldAppUtil.createSparkContext(TestSparkSqlSource.getClass.getSimpleName)
    try {
      println("---数据处理开始---")
      testHive(spark)
      println("---数据处理结束---")
    } finally {
      // Release the session even if testHive throws.
      spark.close()
    }
  }

  /**
   * Runs the report query, registers the temp views `t_cal_tmp1` and
   * `t_cal_tmp2`, then writes the projected result to Hive (overwrite) and to
   * MySQL (append).
   *
   * @param spark the session used to run the SQL and the writes
   */
  def testHive(spark: SparkSession): Unit = {
    // NOTE(review): the join to `d` has no ON clause; rows are matched to `d`
    // only by the LIKE predicate in the outer WHERE, which also drops rows with
    // no matching `d`, so that LEFT JOIN effectively acts as an inner join.
    // Confirm this is intended.
    //
    // The latest FillinDate per employee is computed with max(): collect_list
    // gives no ordering guarantee, so picking its last element is not reliable.
    spark.sql(
      """
        |SELECT
        |    brand,
        |    d.Name RegionName,
        |    c.Name BranchName,
        |    b.WorkNo,
        |    t1.EmployeeName,
        |    b.EntryDate,
        |    e.fillindate lastOfferDate,
        |    f.count historyOfferCount
        |FROM
        |    (
        |        SELECT
        |            employeeno,
        |            employeename
        |        FROM
        |            ******
        |        WHERE
        |            FillinDate>='2019-07-15'
        |        AND FillinDate<='2019-07-21'
        |        AND PhaseId=10
        |        AND Role='顾问'
        |        AND IsApprove=1
        |        AND IsCancel IS NULL
        |        GROUP BY
        |            employeeno,
        |            employeename )t1
        |LEFT JOIN
        |    *****s b
        |ON
        |    t1.EmployeeNo=b.Number
        |LEFT JOIN
        |    **** c
        |ON
        |    b.BranchNo=c.Number
        |LEFT JOIN
        |    ****** d
        |LEFT JOIN
        |    (
        |        SELECT
        |            EmployeeNo,
        |            max(FillinDate) FillinDate
        |        FROM
        |            ******
        |        WHERE
        |            FillinDate>='2019-07-15'
        |        AND FillinDate<='2019-07-21'
        |        AND PhaseId=10
        |        AND IsApprove=1
        |        AND IsCancel IS NULL
        |        GROUP BY
        |            EmployeeNo) e
        |ON
        |    t1.EmployeeNo=e.EmployeeNo
        |LEFT JOIN
        |    (
        |        SELECT
        |            EmployeeNo,
        |            COUNT(*) COUNT
        |        FROM
        |            *****
        |        WHERE
        |            PhaseId=10
        |        AND IsApprove=1
        |        AND IsCancel IS NULL
        |        GROUP BY
        |            EmployeeNo) f
        |ON
        |    t1.EmployeeNo=f.EmployeeNo
        |WHERE
        |    d.ManageBranchNos LIKE concat(concat("%{",b.branchno),"}%")
        |ORDER BY
        |    BranchName
      """.stripMargin
    ).createOrReplaceTempView("t_cal_tmp1") // replace, so re-running in one session does not fail

    val prehandleDF = spark.sql(
      """
        |select brand,regionname,branchname,workno,employeename,entrydate,historyoffercount from t_cal_tmp1
      """.stripMargin
    )

    // Register the projected result as a temp view.
    prehandleDF.createOrReplaceTempView("t_cal_tmp2")

    // Write to Hive.
    // SaveMode.Overwrite replaces the existing table contents;
    // SaveMode.Append adds rows to them.
    prehandleDF.write.mode(SaveMode.Overwrite).saveAsTable("tmp.sparksql_hive_test")

    // Push to MySQL; the connection settings are read from ConfigUtils.
    val prop = new Properties()
    prop.setProperty("user", ConfigUtils.mysqlUser)
    prop.setProperty("password", ConfigUtils.mysqlPw)
    prop.setProperty("driver", ConfigUtils.mysqlDriver)
    prehandleDF.write.mode(SaveMode.Append).jdbc(ConfigUtils.mysqlUrl, "sparksql_hive_test", prop)
  }
}
注意:如果目标表在 MySQL 中不存在,而 SparkSQL 所配置的 MySQL 账号又没有建表权限,任务会因为无法建表而报错。因此要么给该账号授予建表权限,要么先手动建好表再运行任务。
同时要注意 MySQL 表的字段名必须与 SparkSQL 结果的列名一致(下面的例子中,表里的字段是 brand1,而 DataFrame 的列是 brand),否则会报如下错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Column "brand" not found in schema Some(StructType(StructField(id,LongType,false), StructField(brand1,StringType,true), StructField(regionname,StringType,true), StructField(branchname,StringType,true), StructField(workno,StringType,true), StructField(employeename,StringType,true), StructField(entrydate,StringType,true), StructField(historyoffercount,StringType,true)));
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4$$anonfun$6.apply(JdbcUtils.scala:139)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4$$anonfun$6.apply(JdbcUtils.scala:139)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4.apply(JdbcUtils.scala:138)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4.apply(JdbcUtils.scala:137)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getInsertStatement(JdbcUtils.scala:137)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:813)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:83)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:499)
at bi.tag.TestHive$.testHive(TestHive.scala:132)
at bi.tag.TestHive$.main(TestHive.scala:24)
at bi.tag.TestHive.main(TestHive.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:892)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
最后
以上就是玩命小伙为你收集整理的spark:sparksql:读取文件/读取hive表/写出到hive/写出到mysql的全部内容,希望文章能够帮你解决spark:sparksql:读取文件/读取hive表/写出到hive/写出到mysql所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复