MyBatis with dual data sources: Hive and MySQL. Part I tests MyBatis connections to both Hive and MySQL; Part II uses MyBatis in place of Sqoop to import data from Hive into MySQL.

Overview

Table of Contents

  • I. Testing MyBatis connections to Hive and MySQL
    • 1. Import dependencies
    • 2. Create the Druid connection pool
    • 3. Create the SqlSession utility
    • 4. Create the entity class
    • 5. Create the DAOs
    • 6. mybatis-config.xml
    • 7. mapper/hive-events.xml
    • 8. mapper/mysql-events.xml
    • 9. Test class
  • II. Replacing Sqoop with MyBatis: importing data from Hive into MySQL
    • 1-4. The first four steps are the same as in Part I
    • 5. Create the DAOs
      • 5.1. Create MySQLEventsDao
      • 5.2. Create HiveEventsDao
    • 6. Step 6 is the same as in Part I
    • 7. mapper/hive-events.xml
    • 8. mapper/mysql-events.xml
    • 9. Create the Hive-to-MySQL import method
    • 10. Test class

I. Testing MyBatis connections to Hive and MySQL

  • Use a Druid connection pool; add the druid, mybatis, hive-jdbc, and mysql-connector-java dependencies
  • Create the Druid connection pool: a custom Druid data-source factory that implements MyBatis' DataSourceFactory interface
  • Create a utility class that reads the configuration file and returns a SqlSession
  • Create the entity class
  • Create separate DAO interfaces for Hive and MySQL
  • Create the MyBatis configuration file with Hive and MySQL side by side, giving each its own environment id; both dataSource entries use the Druid pool
  • Create a mapper file for each environment
  • Open a SqlSession, bind the mapper, and call its methods to query

1. Import dependencies

    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>3.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.1.10</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>

2. Create the Druid connection pool

package cn.kgc.druidtest;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.ibatis.datasource.DataSourceFactory;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Properties;

public class DruidDataSourceFactory implements DataSourceFactory {
    private Properties prop;

    @Override
    public void setProperties(Properties properties) {
        this.prop=properties;
    }

    @Override
    public DataSource getDataSource() {
        DruidDataSource druid = new DruidDataSource();
        druid.setDriverClassName(this.prop.getProperty("driver"));
        druid.setUrl(this.prop.getProperty("url"));
        druid.setUsername(this.prop.getProperty("username"));
        druid.setPassword(this.prop.getProperty("password"));
//        // Maximum number of active connections in the pool; 0 means unlimited
//        druid.setMaxActive(Integer.parseInt(this.prop.getProperty("maxactive")));
//        // Initial number of connections created when the pool starts
//        druid.setInitialSize(Integer.parseInt(this.prop.getProperty("initialsize")));
        try {
            druid.init();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return druid;
    }
}

3. Create the SqlSession utility

package cn.kgc.druidtest;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

import java.io.IOException;
import java.io.InputStream;

public class DatabaseUtils {
    private static final String configPath = "mybatis-config.xml";
    // Uses the pooled data sources, e.g. for large batch inserts

    // Multi-data-source switching: the caller passes in which environment to
    // read, "hive" or "mysql"
    public static SqlSession getSession(String db) {
        SqlSession session = null;
        try {
            InputStream is = Resources.getResourceAsStream(configPath);
            SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(is,db.equals("mysql")?"xym":"xym2");
            session = factory.openSession();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return session;
    }

    public static void closeSession(SqlSession session){
        session.close();
    }
}
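A minimal usage sketch for this utility (the environment names match the config in step 6). Wrapping the session in try/finally ensures the pooled connection is always returned, even if the query throws:

// Sketch: obtain a session for one source, use its mapper, then close it.
SqlSession session = DatabaseUtils.getSession("mysql");
try {
    MySQLEventsDao dao = session.getMapper(MySQLEventsDao.class);
    System.out.println(dao.findAll());
} finally {
    DatabaseUtils.closeSession(session);
}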

4. Create the entity class

package cn.kgc.entry;

public class Events {
    private String eventid;
    private String userid;
    private String starttime;
    private String city;
    private String state;
    private String zip;
    private String country;
    private String lat;
    private String lng;
    private String features;

    public Events(String eventid, String userid, String starttime, String city, String state, String zip, String country, String lat, String lng, String features) {
        this.eventid = eventid;
        this.userid = userid;
        this.starttime = starttime;
        this.city = city;
        this.state = state;
        this.zip = zip;
        this.country = country;
        this.lat = lat;
        this.lng = lng;
        this.features = features;
    }

    public Events() {
    }

    public String getEventid() {
        return eventid;
    }

    public void setEventid(String eventid) {
        this.eventid = eventid;
    }

    public String getUserid() {
        return userid;
    }

    public void setUserid(String userid) {
        this.userid = userid;
    }

    public String getStarttime() {
        return starttime;
    }

    public void setStarttime(String starttime) {
        this.starttime = starttime;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getZip() {
        return zip;
    }

    public void setZip(String zip) {
        this.zip = zip;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getLat() {
        return lat;
    }

    public void setLat(String lat) {
        this.lat = lat;
    }

    public String getLng() {
        return lng;
    }

    public void setLng(String lng) {
        this.lng = lng;
    }

    public String getFeatures() {
        return features;
    }

    public void setFeatures(String features) {
        this.features = features;
    }
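
    // Note: Events has no toString() override, so printing query results (as
    // the test class below does) would show object hash codes. A minimal
    // override, added here only for readable output:
    @Override
    public String toString() {
        return "Events{eventid='" + eventid + "', userid='" + userid +
                "', starttime='" + starttime + "', city='" + city +
                "', state='" + state + "', country='" + country + "'}";
    }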
}

5. Create the DAOs

package cn.kgc.dao;

import cn.kgc.entry.Events;

import java.util.List;

public interface HiveEventsDao {
    List<Events> findAll();
}
package cn.kgc.dao;

import cn.kgc.entry.Events;

import java.util.List;

public interface MySQLEventsDao {
    List<Events> findAll();
}

6. mybatis-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC
        "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">

<configuration>
    <typeAliases>
        <typeAlias type="cn.kgc.druidtest.DruidDataSourceFctory" alias="DRUID"></typeAlias>
        <typeAlias type="cn.kgc.entry.Events" alias="event"></typeAlias>
    </typeAliases>
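    <!-- The environment ids "xym" (MySQL) and "xym2" (Hive) are the values
         DatabaseUtils.getSession() switches between; if you rename them,
         update that class as well. -->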
    <environments default="xym">
        <environment id="xym">
            <transactionManager type="JDBC"></transactionManager>
            <dataSource type="DRUID">
                <property name="driver" value="com.mysql.jdbc.Driver"/>
                <property name="url" value="jdbc:mysql://single:3306/ms_dm_intes"/>
                <property name="username" value="root"/>
                <property name="password" value="root"/>
            </dataSource>
        </environment>
        <environment id="xym2">
            <transactionManager type="JDBC"></transactionManager>
            <dataSource type="DRUID">
                <property name="driver" value="org.apache.hive.jdbc.HiveDriver"/>
                <property name="url" value="jdbc:hive2://single:10000/dwd_intes"/>
                <property name="username" value="root"/>
                <property name="password" value="root"/>
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource="mapper/mysql-events.xml"></mapper>
        <mapper resource="mapper/hive-events.xml"></mapper>
    </mappers>
</configuration>

7. mapper/hive-events.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC
        "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.kgc.dao.HiveEventsDao">
<!--    Reads from Hive; resultType "event" maps each row onto the Events entity -->
    <select id="findAll" resultType="event">
        select eventid,userid,starttime,city,state,zip,country,lat,lng,features from dwd_intes.dwd_events limit 3
    </select>
</mapper>

8. mapper/mysql-events.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC
        "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.kgc.dao.MySQLEventsDao">
<!--    Reads from MySQL; resultType "event" maps each row onto the Events entity -->
    <select id="findAll" resultType="event">
        select * from dim_events limit 3
    </select>
</mapper>

9. Test class

package cn.kgc;

import cn.kgc.dao.HiveEventsDao;
import cn.kgc.dao.MySQLEventsDao;
import cn.kgc.druidtest.DatabaseUtils;
import org.apache.ibatis.session.SqlSession;

public class App {
    public static void main(String[] args) {
        // Query three rows from MySQL, then three from Hive, via the two environments
        SqlSession session_mysql = DatabaseUtils.getSession("mysql");
        MySQLEventsDao edao_mysql = session_mysql.getMapper(MySQLEventsDao.class);
        System.out.println(edao_mysql.findAll());
        DatabaseUtils.closeSession(session_mysql);

        SqlSession session_hive = DatabaseUtils.getSession("hive");
        HiveEventsDao edao_hive = session_hive.getMapper(HiveEventsDao.class);
        System.out.println(edao_hive.findAll());
        DatabaseUtils.closeSession(session_hive);
    }
}


II. Replacing Sqoop with MyBatis: importing data from Hive into MySQL

1-4. The first four steps are the same as in Part I

5. Create the DAOs

5.1. Create MySQLEventsDao

To reproduce Sqoop's migration behavior of inserting the data into MySQL, we use batched inserts with manual commits. Add a batch-insert method, void batchInsert(List<Events> eves);, to the MySQLEventsDao interface:

package cn.kgc.dao;

import cn.kgc.entry.Events;

import java.util.List;

public interface MySQLEventsDao {
    List<Events> findAll();

    void batchInsert(List<Events> eves);
}

5.2. Create HiveEventsDao

Reading all of the Hive data into memory and then writing it out in batches is not feasible: an ordinary machine runs out of memory, the query may fail outright when the data volume is too large, and an insert into ... values statement whose appended payload exceeds roughly 4 MB also errors out. We therefore read Hive in pages, passing a page number to the method so that each call fetches a manageable amount of data.

package cn.kgc.dao;

import cn.kgc.entry.Events;

import java.util.List;

public interface HiveEventsDao {
    List<Events> findAll(int page);
}

Hive, however, does not support limit-based pagination, so we assign each row a sequence number with the row_number window function and integer-divide it by the page size (50,000 here) to get a page index, stored in the flag column. Afterwards, select max(flag) + 1 from dwd_intes.tmp_dwd_events gives the number of pages the import loop in step 9 must cover:

create table dwd_intes.tmp_dwd_events as
select b.*,floor(rn/50000) flag 
from (
    select *,row_number() over() rn 
    from dwd_intes.dwd_events
)b;

One more caveat: when MyBatis reads from Hive, do not use select * — the loaded rows then cannot be mapped onto the entity type (Hive labels select * columns with a table-name prefix, so they no longer match the entity's field names). List the columns explicitly, as the mapper below does.

6. Step 6 is the same as in Part I

7. mapper/hive-events.xml

The Hive read method now accepts the page number as a parameter:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC
        "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.kgc.dao.HiveEventsDao">
<!--    Reads one page from Hive, selected by the flag (page number) parameter -->
    <select id="findAll" resultType="event" parameterType="int">
        select eventid,userid,starttime,city,state,zip,country,lat,lng,features
        from dwd_intes.tmp_dwd_events
        where flag=#{flag}
    </select>
</mapper>

8. mapper/mysql-events.xml

Create the insert statement with <foreach>: the incoming collection is bound as list, each element as eve, with a comma separator, so one statement inserts a whole batch (for two rows MyBatis expands it to insert into dim_events_bak1 values (...), (...)). Keep each batch small enough that the generated statement stays under the roughly 4 MB limit mentioned in step 5.2.

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC
        "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.kgc.dao.MySQLEventsDao">
<!--    MySQL mapper: full-table read plus multi-row batch insert -->
    <select id="findAll" resultType="event">
        select * from dim_events_bak
    </select>

    <insert id="batchInsert" parameterType="java.util.List">
        insert into dim_events_bak1 values
        <foreach collection="list" item="eve" separator=",">
            (#{eve.eventid},#{eve.userid},#{eve.starttime},#{eve.city},#{eve.state},#{eve.zip},#{eve.country},#{eve.lat},#{eve.lng},#{eve.features})
        </foreach>
    </insert>
</mapper>

9. Create the Hive-to-MySQL import method

Read Hive page by page; each 50,000-row page is written to MySQL in ten commits (batches of 5,000 rows), which inserts the data reasonably quickly.

package cn.kgc.service;

import cn.kgc.dao.HiveEventsDao;
import cn.kgc.dao.MySQLEventsDao;
import cn.kgc.druidtest.DatabaseUtils;
import cn.kgc.entry.Events;
import org.apache.ibatis.session.SqlSession;

import java.util.ArrayList;
import java.util.List;

public class HiveToMySqlService {
    static HiveEventsDao hiveEdao;
    static {
        // Look up Hive data through HiveEventsDao
        SqlSession hiveSession = DatabaseUtils.getSession("hive");
        hiveEdao = hiveSession.getMapper(HiveEventsDao.class);
    }

    public void fillMySql(){
        // Prepare the MySQL connection
        SqlSession session = DatabaseUtils.getSession("mysql");
        MySQLEventsDao medao = session.getMapper(MySQLEventsDao.class);

        // 63 is this dataset's page count (max(flag) + 1 in tmp_dwd_events);
        // adjust it to match your own data
        for (int flag = 0; flag < 63; flag++) {
            System.out.println(flag);
            List<Events> eves = hiveEdao.findAll(flag);
            List<Events> tmpdata = new ArrayList<>();
            for (int i = 1; i <= eves.size(); i++) {
                tmpdata.add(eves.get(i-1));
                // Flush every 5,000 rows, plus whatever remains at the end of the page
                if (i%5000==0||i==eves.size()){
                    medao.batchInsert(tmpdata);
                    session.commit();
                    tmpdata.clear();
                }
            }
        }
        DatabaseUtils.closeSession(session);
    }
}
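
The page count above is hard-coded; a minimal sketch of deriving it at runtime instead. This assumes a hypothetical countPages statement added under the cn.kgc.dao.HiveEventsDao namespace in mapper/hive-events.xml, e.g. <select id="countPages" resultType="int">select max(flag) + 1 from dwd_intes.tmp_dwd_events</select>:

// Sketch (hypothetical countPages statement, see lead-in above): fetch the
// page count from the staging table instead of hard-coding 63.
SqlSession hiveSession = DatabaseUtils.getSession("hive");
Integer pages = hiveSession.selectOne("cn.kgc.dao.HiveEventsDao.countPages");
for (int flag = 0; flag < pages; flag++) {
    // ... same per-page read/insert logic as fillMySql() above
}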

10. Test class

package cn.kgc;

import cn.kgc.service.HiveToMySqlService;

public class App {
    public static void main(String[] args) {
        // Convert Hive rows to objects and batch-insert them into MySQL
        HiveToMySqlService htm = new HiveToMySqlService();
        htm.fillMySql();
    }
}

All done!
