This post, shared by the 靠谱客 blogger 俊逸百褶裙, introduces how to work with MySQL from Spark.

Overview

Spark reads from and writes to MySQL over plain JDBC, which makes both operations very easy to use.

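In a real project, however, if you access MySQL frequently, it is best to add a connection pool; otherwise the cost of opening a new connection for every operation will make things slow.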

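This post is only an introduction to connecting Spark to MySQL. The code does not include a connection pool; that will be added in a later update.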
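
To give a rough idea of what a connection pool does, here is a minimal sketch that uses only the JDK. The class `SimplePool` and its methods are hypothetical names introduced for illustration and are not part of the original post. It is generic so that it can be tried without a database; with JDBC, the pooled type would be `java.sql.Connection`. Note that Spark's built-in JDBC data source opens its own connections, so a pool like this is mainly relevant to hand-written JDBC code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical helper, not from the original post: a fixed-size pool that
// creates every resource up front and then reuses them.
final class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int size, Supplier<T> factory) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            // The expensive creation step happens only here, once per slot.
            idle.add(factory.get());
        }
    }

    // Returns an idle resource, or null if all of them are currently borrowed.
    T borrow() {
        return idle.poll();
    }

    // Hands a borrowed resource back so that later callers can reuse it.
    void giveBack(T resource) {
        idle.offer(resource);
    }

    // Number of resources currently available for borrowing.
    int idleCount() {
        return idle.size();
    }
}
```

In this sketch the factory would wrap something like `DriverManager.getConnection(url, user, password)`. A production system would more likely rely on an established pool such as HikariCP, which also validates and replaces broken connections.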

The MysqlAdapter class is as follows:

package com.test.mysql;

import org.apache.commons.lang.StringUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/** Thin wrapper around Spark's JDBC data source for reading from and writing to MySQL. */
public class MysqlAdapter {
    private JdbcProperties jdbcProperties;
    protected SparkSession spark;

    // This constructor leaves spark and jdbcProperties unset.
    public MysqlAdapter() {
    }

    // evm is passed to master(), so it must be a Spark master URL such as "local[*]".
    public MysqlAdapter(String evm, JdbcProperties jdbcProperties) {
        this.jdbcProperties = jdbcProperties;
        spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .master(evm)
                .config("hive.exec.dynamic.partition", "true")
                .config("hive.exec.max.dynamic.partitions.pernode", 10000)
                .config("hive.exec.max.dynamic.partitions", 100000)
                .config("hive.exec.dynamic.partition.mode", "nonstrict")
                .getOrCreate();
    }

    // Credentials plus the Spark JDBC options for read fetch size and write batch size.
    private Properties getConnectionProperties() {
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", jdbcProperties.getUser());
        connectionProperties.put("password", jdbcProperties.getPassword());
        connectionProperties.put("fetchsize", "5000");
        connectionProperties.put("batchsize", "5000");
        return connectionProperties;
    }

    // Runs the query in MySQL by passing it to Spark as an aliased subquery, "(sql) t".
    public Dataset<Row> readFromDb(String sql) {
        if (StringUtils.isBlank(sql)) {
            throw new IllegalArgumentException("sql must not be blank");
        }
        return spark.read().jdbc(jdbcProperties.getUrl(), "(" + sql + ") t", getConnectionProperties());
    }

    // Writes the dataset to the given table; a null saveMode falls back to Append.
    public void writeDataset2Db(Dataset<Row> rowDataset, String tableName, SaveMode saveMode) {
        if (rowDataset == null || StringUtils.isBlank(tableName)) {
            throw new IllegalArgumentException("rowDataset must not be null and tableName must not be blank");
        }
        if (saveMode == null) {
            saveMode = SaveMode.Append;
        }
        // Ten partitions means up to ten concurrent JDBC connections during the write.
        rowDataset.repartition(10).write().mode(saveMode).jdbc(jdbcProperties.getUrl(),
                tableName, getConnectionProperties());
    }

    // Convenience overload that appends to the table.
    public void writeDataset2Db(Dataset<Row> rowDataset, String tableName) {
        writeDataset2Db(rowDataset, tableName, SaveMode.Append);
    }
}
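
The read method works because the table argument of Spark's JDBC reader accepts anything that is valid in a SQL FROM clause, including a subquery in parentheses, and MySQL requires such a derived table to have an alias. The sketch below isolates that wrapping step. `SubqueryTable` and `wrap` are hypothetical names introduced for illustration; stripping trailing semicolons is an extra safeguard assumed here, since `(SELECT ...;) t` would not be valid SQL.

```java
// Hypothetical helper, not from the original post.
final class SubqueryTable {
    private SubqueryTable() {
    }

    // Turns a SELECT statement into "(<sql>) <alias>" for use as a JDBC table expression.
    static String wrap(String sql, String alias) {
        String trimmed = sql == null ? "" : sql.trim();
        // Drop trailing semicolons, which are not allowed inside a derived table.
        while (trimmed.endsWith(";")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1).trim();
        }
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException("sql must not be blank");
        }
        return "(" + trimmed + ") " + alias;
    }
}
```

For example, `SubqueryTable.wrap("SELECT id FROM users WHERE age > 18;", "t")` produces `(SELECT id FROM users WHERE age > 18) t`; the table and column names here are made up.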

The JdbcProperties class is as follows:

package com.test.mysql;

/** Holds the MySQL connection settings and the JDBC URL built from them. */
public class JdbcProperties {
    private String user;
    private String password;
    private String url;
    private String host;
    private String port;
    private String dataBase;

    public JdbcProperties() {
    }

    public JdbcProperties(String host, String port, String dataBase, String user, String password) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
        this.dataBase = dataBase;
        // zeroDateTimeBehavior=convertToNull returns NULL for zero dates such as 0000-00-00
        // (Connector/J 8.x spells this value CONVERT_TO_NULL).
        // rewriteBatchedStatements=true lets the driver rewrite batched INSERTs into multi-row statements.
        this.url = String.format("jdbc:mysql://%s:%s/%s?"
                        + "zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true",
                host, port, dataBase);
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getHost() {
        return host;
    }

    // Note: changing the host does not rebuild url; call setUrl as well if needed.
    public void setHost(String host) {
        this.host = host;
    }
}
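
The constructor hard-codes the two driver parameters in the URL. If the set of parameters needs to vary, the URL can be assembled from a map instead. The following is a minimal sketch under that assumption; `MysqlUrlBuilder` and `build` are hypothetical names, and the code assumes that parameter names and values need no URL escaping.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not from the original post.
final class MysqlUrlBuilder {
    private MysqlUrlBuilder() {
    }

    // Builds jdbc:mysql://host:port/dataBase, followed by ?k1=v1&k2=v2 when params is non-empty.
    static String build(String host, String port, String dataBase, Map<String, String> params) {
        String base = String.format("jdbc:mysql://%s:%s/%s", host, port, dataBase);
        if (params.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            query.add(entry.getKey() + "=" + entry.getValue());
        }
        return base + "?" + query;
    }
}
```

Passing a `LinkedHashMap` keeps the parameters in insertion order, so the same two entries used in the constructor yield the same URL as the `String.format` call there.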

 
