Overview
Table of Contents
- Part 1: Testing MyBatis connections to Hive and MySQL
- 1. Import dependencies
- 2. Create the Druid connection pool
- 3. Create the SqlSession utility
- 4. Create the entity class
- 5. Create the DAOs
- 6. mybatis-config.xml
- 7. mapper/hive-events.xml
- 8. mapper/mysql-events.xml
- 9. Test class
- Part 2: Using MyBatis in place of Sqoop to import data from Hive into MySQL
- 1-4. The first four steps are the same as in Part 1
- 5. Create the DAOs
- 5.1. Create MySQLEventsDao
- 5.2. Create HiveEventsDao
- 6. Step 6 is the same as in Part 1
- 7. mapper/hive-events.xml
- 8. mapper/mysql-events.xml
- 9. Create the Hive-to-MySQL import method
- 10. Test class
Part 1: Testing MyBatis connections to Hive and MySQL
- Use a Druid connection pool; import the druid, mybatis, hive-jdbc, and mysql-connector-java dependencies.
- Create the Druid connection pool: write a custom Druid data source factory that implements MyBatis's DataSourceFactory interface.
- Create a utility class that reads the configuration file and returns a SqlSession.
- Create the entity class.
- Create separate DAO interfaces for Hive and MySQL.
- Create the MyBatis configuration file with the Hive and MySQL environments side by side, each under a different environment id; both dataSources use the Druid pool.
- Create a mapper file for each environment.
- Open a SqlSession, bind the mapper, and call its methods to run queries.
1. Import dependencies
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>3.4.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
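Note that hive-jdbc pulls in a large transitive tree (Hadoop, Thrift, logging bindings), which can cause duplicate SLF4J bindings or servlet-api clashes in the same project. If you hit such conflicts, Maven exclusions of the following shape can help; the two artifacts excluded here are only examples of commonly conflicting ones, so check mvn dependency:tree for what your build actually pulls in:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
    <exclusions>
        <!-- Example exclusions only; verify against your own dependency tree -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>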
2. Create the Druid connection pool
package cn.kgc.druidtest;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.ibatis.datasource.DataSourceFactory;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Properties;

public class DruidDataSourceFactory implements DataSourceFactory {

    private Properties prop;

    @Override
    public void setProperties(Properties properties) {
        this.prop = properties;
    }

    @Override
    public DataSource getDataSource() {
        DruidDataSource druid = new DruidDataSource();
        druid.setDriverClassName(this.prop.getProperty("driver"));
        druid.setUrl(this.prop.getProperty("url"));
        druid.setUsername(this.prop.getProperty("username"));
        druid.setPassword(this.prop.getProperty("password"));
        // // Maximum number of active connections in the pool; 0 means unlimited
        // druid.setMaxActive(Integer.parseInt(this.prop.getProperty("maxactive")));
        // // Number of connections created when the pool starts up
        // druid.setInitialSize(Integer.parseInt(this.prop.getProperty("initialsize")));
        try {
            druid.init();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return druid;
    }
}
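The commented-out lines above read pool-sizing keys from the same Properties object that MyBatis hands to setProperties, so to enable them you would uncomment the code and add matching property entries under the dataSource element of mybatis-config.xml (step 6). For example (the values here are illustrative, not from the original configuration):

<dataSource type="DRUID">
    <!-- driver/url/username/password properties as in mybatis-config.xml (step 6) -->
    <property name="maxactive" value="20"/>
    <property name="initialsize" value="5"/>
</dataSource>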
3. Create the SqlSession utility
package cn.kgc.druidtest;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

import java.io.IOException;
import java.io.InputStream;

public class DatabaseUtils {

    private static final String configPath = "mybatis-config.xml";

    // Uses the connection pool for bulk inserts.
    // Multi-datasource switching: the target environment ("hive" or "mysql") is passed in as a parameter.
    public static SqlSession getSession(String db) {
        SqlSession session = null;
        try {
            InputStream is = Resources.getResourceAsStream(configPath);
            // "mysql" selects environment "xym" (MySQL); anything else selects "xym2" (Hive)
            SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(is, db.equals("mysql") ? "xym" : "xym2");
            session = factory.openSession();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return session;
    }

    public static void closeSession(SqlSession session) {
        session.close();
    }
}
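One design caveat: getSession builds a new SqlSessionFactory on every call, which re-parses the XML and initializes a fresh Druid pool each time. A common refinement, shown below as a sketch (not part of the original code), is to cache one factory per environment inside DatabaseUtils; it requires java.util.Map and java.util.concurrent.ConcurrentHashMap imports:

// Sketch: cache one SqlSessionFactory per environment instead of rebuilding it on every call
private static final Map<String, SqlSessionFactory> FACTORIES = new ConcurrentHashMap<>();

public static SqlSession getSession(String db) {
    String env = db.equals("mysql") ? "xym" : "xym2";
    SqlSessionFactory factory = FACTORIES.computeIfAbsent(env, id -> {
        try (InputStream is = Resources.getResourceAsStream(configPath)) {
            return new SqlSessionFactoryBuilder().build(is, id);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });
    return factory.openSession();
}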
4. Create the entity class
package cn.kgc.entry;

public class Events {
    private String eventid;
    private String userid;
    private String starttime;
    private String city;
    private String state;
    private String zip;
    private String country;
    private String lat;
    private String lng;
    private String features;

    public Events(String eventid, String userid, String starttime, String city, String state, String zip, String country, String lat, String lng, String features) {
        this.eventid = eventid;
        this.userid = userid;
        this.starttime = starttime;
        this.city = city;
        this.state = state;
        this.zip = zip;
        this.country = country;
        this.lat = lat;
        this.lng = lng;
        this.features = features;
    }

    public Events() {
    }

    public String getEventid() { return eventid; }
    public void setEventid(String eventid) { this.eventid = eventid; }
    public String getUserid() { return userid; }
    public void setUserid(String userid) { this.userid = userid; }
    public String getStarttime() { return starttime; }
    public void setStarttime(String starttime) { this.starttime = starttime; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public String getZip() { return zip; }
    public void setZip(String zip) { this.zip = zip; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getLat() { return lat; }
    public void setLat(String lat) { this.lat = lat; }
    public String getLng() { return lng; }
    public void setLng(String lng) { this.lng = lng; }
    public String getFeatures() { return features; }
    public void setFeatures(String features) { this.features = features; }

    // toString added so that printing query results in the test classes is readable
    @Override
    public String toString() {
        return "Events{eventid='" + eventid + "', userid='" + userid + "', starttime='" + starttime
                + "', city='" + city + "', state='" + state + "', zip='" + zip + "', country='" + country
                + "', lat='" + lat + "', lng='" + lng + "', features='" + features + "'}";
    }
}
5. Create the DAOs
// HiveEventsDao.java
package cn.kgc.dao;

import cn.kgc.entry.Events;

import java.util.List;

public interface HiveEventsDao {
    List<Events> findAll();
}

// MySQLEventsDao.java
package cn.kgc.dao;

import cn.kgc.entry.Events;

import java.util.List;

public interface MySQLEventsDao {
    List<Events> findAll();
}
6. mybatis-config.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC
        "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <typeAliases>
        <typeAlias type="cn.kgc.druidtest.DruidDataSourceFactory" alias="DRUID"/>
        <typeAlias type="cn.kgc.entry.Events" alias="event"/>
    </typeAliases>
    <environments default="xym">
        <!-- MySQL environment -->
        <environment id="xym">
            <transactionManager type="JDBC"/>
            <dataSource type="DRUID">
                <property name="driver" value="com.mysql.jdbc.Driver"/>
                <property name="url" value="jdbc:mysql://single:3306/ms_dm_intes"/>
                <property name="username" value="root"/>
                <property name="password" value="root"/>
            </dataSource>
        </environment>
        <!-- Hive environment -->
        <environment id="xym2">
            <transactionManager type="JDBC"/>
            <dataSource type="DRUID">
                <property name="driver" value="org.apache.hive.jdbc.HiveDriver"/>
                <property name="url" value="jdbc:hive2://single:10000/dwd_intes"/>
                <property name="username" value="root"/>
                <property name="password" value="root"/>
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource="mapper/mysql-events.xml"/>
        <mapper resource="mapper/hive-events.xml"/>
    </mappers>
</configuration>
7. mapper/hive-events.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC
        "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.kgc.dao.HiveEventsDao">
    <!-- Query Hive; each row is mapped onto the Events entity (alias "event").
         Columns are listed explicitly rather than select * — see the caveat in Part 2. -->
    <select id="findAll" resultType="event">
        select eventid,userid,starttime,city,state,zip,country,lat,lng,features from dwd_intes.dwd_events limit 3
    </select>
</mapper>
8. mapper/mysql-events.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC
        "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.kgc.dao.MySQLEventsDao">
    <!-- Query MySQL; each row is mapped onto the Events entity -->
    <select id="findAll" resultType="event">
        select * from dim_events limit 3
    </select>
</mapper>
9. Test class
package cn.kgc;

import cn.kgc.dao.HiveEventsDao;
import cn.kgc.dao.MySQLEventsDao;
import cn.kgc.druidtest.DatabaseUtils;
import org.apache.ibatis.session.SqlSession;

public class App {
    public static void main(String[] args) {
        // Query MySQL
        SqlSession session_mysql = DatabaseUtils.getSession("mysql");
        MySQLEventsDao edao_mysql = session_mysql.getMapper(MySQLEventsDao.class);
        System.out.println(edao_mysql.findAll());
        DatabaseUtils.closeSession(session_mysql);

        // Query Hive
        SqlSession session_hive = DatabaseUtils.getSession("hive");
        HiveEventsDao edao_hive = session_hive.getMapper(HiveEventsDao.class);
        System.out.println(edao_hive.findAll());
        DatabaseUtils.closeSession(session_hive);
    }
}
Part 2: Using MyBatis in place of Sqoop to import data from Hive into MySQL
1-4. The first four steps are the same as in Part 1
5. Create the DAOs
5.1. Create MySQLEventsDao
To replicate the part of Sqoop's data migration that inserts data into MySQL, we use batch inserts with manual commits. Add a batch-insert method, void batchInsert(List<Events> eves);, to the MySQLEventsDao interface:
package cn.kgc.dao;

import cn.kgc.entry.Events;

import java.util.List;

public interface MySQLEventsDao {
    List<Events> findAll();
    void batchInsert(List<Events> eves);
}
5.2. Create HiveEventsDao
Reading all of the Hive data into memory and then writing it out in batches does not work at scale: an ordinary machine runs out of memory when the table is large, and an INSERT INTO ... VALUES statement also fails once the concatenated values exceed roughly 4 MB (MySQL's default max_allowed_packet). We therefore read Hive page by page, passing a page number to the method so that each call fetches a manageable slice of the data:
package cn.kgc.dao;

import cn.kgc.entry.Events;

import java.util.List;

public interface HiveEventsDao {
    List<Events> findAll(int page);
}
However, Hive does not support LIMIT/OFFSET-style pagination, so we precompute page numbers with the row_number window function: dividing each row number by the page size and taking the floor yields the page index. With a page size of 50,000, for example, row 123,456 gets floor(123456/50000) = 2 and lands on page 2:
create table dwd_intes.tmp_dwd_events as
select b.*, floor(rn/50000) flag
from (
    select *, row_number() over() rn
    from dwd_intes.dwd_events
) b;
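An optional sanity check is to look at the row count per page. Note that because rn starts at 1, page 0 holds 49,999 rows while full middle pages hold 50,000:

select flag, count(*) cnt
from dwd_intes.tmp_dwd_events
group by flag
order by flag;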
One more caveat: when reading Hive through MyBatis, do not use select *. Hive's JDBC driver returns the columns of a select * with table-name prefixes (e.g. tmp_dwd_events.eventid), so MyBatis cannot match them to the entity's properties. Always list the columns explicitly, as in the mapper below.
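If select * is unavoidable, one workaround is to turn off unique column names for the session so the driver returns bare column labels. This is an assumption to verify against your Hive version; the property exists in Hive 0.13+ and defaults to true:

-- Run in the Hive session before querying (verify support in your Hive version)
set hive.resultset.use.unique.column.names=false;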
6. Step 6 is the same as in Part 1
7. mapper/hive-events.xml
The Hive query now accepts the page number as a parameter:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC
        "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.kgc.dao.HiveEventsDao">
    <!-- Read one page of Hive rows, mapped onto the Events entity -->
    <select id="findAll" resultType="event" parameterType="int">
        select eventid,userid,starttime,city,state,zip,country,lat,lng,features
        from dwd_intes.tmp_dwd_events
        where flag=#{flag}
    </select>
</mapper>
8. mapper/mysql-events.xml
Create the insert statement with foreach: the incoming collection is bound as list, each element as eve, with a comma separator, so the statement splices all pending rows into one multi-row insert:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC
        "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.kgc.dao.MySQLEventsDao">
    <!-- Query MySQL; each row is mapped onto the Events entity -->
    <select id="findAll" resultType="event">
        select * from dim_events_bak
    </select>
    <insert id="batchInsert" parameterType="java.util.List">
        insert into dim_events_bak1 values
        <foreach collection="list" item="eve" separator=",">
            (#{eve.eventid},#{eve.userid},#{eve.starttime},#{eve.city},#{eve.state},#{eve.zip},#{eve.country},#{eve.lat},#{eve.lng},#{eve.features})
        </foreach>
    </insert>
</mapper>
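For a three-element list, the foreach expands the statement into a single multi-row insert of this shape (JDBC placeholders shown instead of values):

insert into dim_events_bak1 values
(?,?,?,?,?,?,?,?,?,?),
(?,?,?,?,?,?,?,?,?,?),
(?,?,?,?,?,?,?,?,?,?)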
9. Create the Hive-to-MySQL import method
Read Hive page by page; for each page (50,000 rows), commit to MySQL in chunks of 5,000 rows, i.e. 10 commits per page. This keeps each statement comfortably under the packet limit while still inserting data reasonably quickly.
package cn.kgc.service;

import cn.kgc.dao.HiveEventsDao;
import cn.kgc.dao.MySQLEventsDao;
import cn.kgc.druidtest.DatabaseUtils;
import cn.kgc.entry.Events;
import org.apache.ibatis.session.SqlSession;

import java.util.ArrayList;
import java.util.List;

public class HiveToMySqlService {
    static HiveEventsDao hiveEdao;

    static {
        // Look up the Hive data through HiveEventsDao
        SqlSession hiveSession = DatabaseUtils.getSession("hive");
        hiveEdao = hiveSession.getMapper(HiveEventsDao.class);
    }

    public void fillMySql() {
        // Prepare the MySQL connection
        SqlSession session = DatabaseUtils.getSession("mysql");
        MySQLEventsDao medao = session.getMapper(MySQLEventsDao.class);
        // 63 pages is hard-coded for this dataset: ceil(total row count / 50000)
        for (int flag = 0; flag < 63; flag++) {
            System.out.println(flag);
            List<Events> eves = hiveEdao.findAll(flag);
            List<Events> tmpdata = new ArrayList<>();
            for (int i = 1; i <= eves.size(); i++) {
                tmpdata.add(eves.get(i - 1));
                // Flush every 5,000 rows, plus once more for the remainder at the end of the page
                if (i % 5000 == 0 || i == eves.size()) {
                    medao.batchInsert(tmpdata);
                    session.commit();
                    tmpdata.clear();
                }
            }
        }
        DatabaseUtils.closeSession(session);
    }
}
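An alternative to concatenating multi-row inserts with foreach is MyBatis's BATCH executor, which accumulates repeated single-row statements and ships them to the driver in one round trip. The sketch below assumes a hypothetical single-row statement insertOne(Events eve) added to MySQLEventsDao and its mapper; it is not part of the code above:

package cn.kgc.service;

import cn.kgc.dao.MySQLEventsDao;
import cn.kgc.entry.Events;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

import java.io.InputStream;
import java.util.List;

public class BatchExecutorSketch {
    // Writes one page of rows using the BATCH executor instead of one giant multi-row insert
    public static void writePage(List<Events> page) throws Exception {
        InputStream is = Resources.getResourceAsStream("mybatis-config.xml");
        SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(is, "xym"); // MySQL environment
        try (SqlSession session = factory.openSession(ExecutorType.BATCH)) {
            MySQLEventsDao medao = session.getMapper(MySQLEventsDao.class);
            for (int i = 0; i < page.size(); i++) {
                medao.insertOne(page.get(i)); // hypothetical single-row insert statement
                if ((i + 1) % 5000 == 0) {
                    session.flushStatements(); // send the accumulated batch to MySQL
                    session.commit();
                }
            }
            session.flushStatements(); // flush and commit the remainder
            session.commit();
        }
    }
}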
10. Test class
package cn.kgc;

import cn.kgc.service.HiveToMySqlService;

public class App {
    public static void main(String[] args) {
        // Batch-convert the Hive rows to objects and insert them into MySQL
        HiveToMySqlService htm = new HiveToMySqlService();
        htm.fillMySql();
    }
}
All done!