概述
Spark 读写 MySQL 直接使用最基础的 JDBC 方式即可实现,操作起来十分方便。
但在实际项目中,如果需要频繁地操作 MySQL,建议加上连接池,否则速度会很慢。
这里只是 Spark 连接 MySQL 的入门示例,没有加入连接池相关的代码,后续会更新。
MysqlAdapter类如下:
package com.test.mysql;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
public class MysqlAdapter {
private JdbcProperties jdbcProperties;
protected SparkSession spark;
public MysqlAdapter() {
}
public MysqlAdapter(String evm, JdbcProperties jdbcProperties) {
super(evm);
this.jdbcProperties = jdbcProperties;
spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.master(evm)
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.max.dynamic.partitions.pernode", 10000)
.config("hive.exec.max.dynamic.partitions", 100000)
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.getOrCreate();
}
private Properties getConnectionProperties() {
Properties connectionProperties = new Properties();
connectionProperties.put("user",jdbcProperties.getUser());
connectionProperties.put("password", jdbcProperties.getPassword());
connectionProperties.put("fetchsize", "5000");
connectionProperties.put("batchsize", "5000");
return connectionProperties;
}
public Dataset<Row> readFromDb(String sql) {
if (StringUtils.isBlank(sql)) {
throw new NullPointerException();
}
return spark.read().jdbc(jdbcProperties.getUrl(), "(" + sql + ") t", getConnectionProperties());
}
public void writeDataset2Db(Dataset<Row> rowDataset, String tableName, SaveMode saveMode) {
if (rowDataset == null || StringUtils.isBlank(tableName)) {
throw new NullPointerException();
}
if (saveMode == null) {
saveMode = SaveMode.Append;
}
rowDataset.repartition(10).write().mode(saveMode).jdbc(jdbcProperties.getUrl(),
tableName, getConnectionProperties());
}
public void writeDataset2Db(Dataset<Row> rowDataset, String tableName) {
writeDataset2Db(rowDataset, tableName, SaveMode.Append);
}
}
JdbcProperties类代码如下:
package com.test.mysql;
/**
 * Connection settings for a MySQL database accessed over JDBC.
 *
 * <p>The five-argument constructor derives the JDBC URL from host, port and database name.
 * Changing any of those three through its setter re-derives the URL, provided all three are
 * non-null. Note that this overwrites a URL previously supplied via {@link #setUrl}.
 */
public class JdbcProperties {

    /**
     * Connector/J options appended to every derived URL. "Zero" dates are read back as NULL, and
     * batched inserts may be rewritten into multi-row statements.
     */
    private static final String URL_OPTIONS =
            "zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true";

    private String user;
    /** Stored in plain text; avoid logging instances of this class. */
    private String password;
    private String url;
    private String host;
    private String port;
    private String dataBase;

    /** Creates empty settings; populate them with the setters. */
    public JdbcProperties() {
    }

    /**
     * Creates settings and derives the JDBC URL
     * {@code jdbc:mysql://host:port/dataBase?<options>}.
     */
    public JdbcProperties(String host, String port, String dataBase, String user, String password) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
        this.dataBase = dataBase;
        this.url = buildUrl(host, port, dataBase);
    }

    /** Formats the MySQL JDBC URL for the given components. */
    private static String buildUrl(String host, String port, String dataBase) {
        return String.format("jdbc:mysql://%s:%s/%s?%s", host, port, dataBase, URL_OPTIONS);
    }

    /** Re-derives {@link #url} once host, port and database are all known. */
    private void refreshUrl() {
        if (host != null && port != null && dataBase != null) {
            url = buildUrl(host, port, dataBase);
        }
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getUrl() {
        return url;
    }

    /** Sets an explicit JDBC URL; a later host/port/database setter call may replace it. */
    public void setUrl(String url) {
        this.url = url;
    }

    public String getHost() {
        return host;
    }

    /** Sets the host and, when port and database are also set, re-derives the URL. */
    public void setHost(String host) {
        this.host = host;
        refreshUrl();
    }

    public String getPort() {
        return port;
    }

    /** Sets the port and, when host and database are also set, re-derives the URL. */
    public void setPort(String port) {
        this.port = port;
        refreshUrl();
    }

    public String getDataBase() {
        return dataBase;
    }

    /** Sets the database name and, when host and port are also set, re-derives the URL. */
    public void setDataBase(String dataBase) {
        this.dataBase = dataBase;
        refreshUrl();
    }
}
最后
以上就是俊逸百褶裙为你收集整理的spark操作MySQL的全部内容,希望文章能够帮你解决spark操作MySQL所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复