我是靠谱客的博主 拉长白羊,最近开发中收集的这篇文章主要介绍hive 两个没有null指定的表左关联的结果有null_Hive企业级调优表的优化小表、大表Join大表Join大表MapJoin(小表join大表)Group ByCount(Distinct) 去重统计笛卡尔积行列过滤动态分区调整,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

小表、大表Join

将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。
实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别。
案例实操

需求

测试大表JOIN小表和小表JOIN大表的效率

建大表、小表和JOIN后表的语句

// 创建大表
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't';
// 创建小表
create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't';
// 创建join后表的语句
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't';

分别向大表和小表中导入数据

hive (default)> load data local inpath '/opt/module/datas/bigtable' into table bigtable;
hive (default)>load data local inpath '/opt/module/datas/smalltable' into table smalltable;

关闭mapjoin功能(默认是打开的)

set hive.auto.convert.join = false;

执行小表JOIN大表语句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
left join bigtable b
on b.id = s.id;
Time taken: 35.921 seconds
No rows affected (44.456 seconds)

执行大表JOIN小表语句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
left join smalltable s
on s.id = b.id;
Time taken: 34.196 seconds
No rows affected (26.287 seconds)

大表Join大表

空KEY过滤

有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。例如key对应的字段为空,操作如下:
案例实操

配置历史服务器

配置mapred-site.xml
<property>
<name>mapreduce.jobhistory.addressname>
<value>hadoop102:10020value>
property>
<property>
<name>mapreduce.jobhistory.webapp.addressname>
<value>hadoop102:19888value>
property>

启动历史服务器

sbin/mr-jobhistory-daemon.sh start historyserver

查看jobhistory http://hadoop102:19888/jobhistory

创建原始数据表、空id表、合并后数据表

// 创建原始表
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't';
// 创建空id表
create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't';
// 创建join后表的语句
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't';

分别加载原始数据和空id数据到对应表中

hive (default)> load data local inpath '/opt/module/datas/ori' into table ori;
hive (default)> load data local inpath '/opt/module/datas/nullid' into table nullidtable;

测试不过滤空id

hive (default)> insert overwrite table jointable select n.* from nullidtable n left join ori o on n.id = o.id;
Time taken: 42.038 seconds
Time taken: 37.284 seconds

测试过滤空id

hive (default)> insert overwrite table jointable select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = o.id;
Time taken: 31.725 seconds
Time taken: 28.876 seconds

空key转换

有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上。例如:
案例实操:
不随机分布空null值:

设置5个reduce个数

set mapreduce.job.reduces = 5;

JOIN两张表

insert overwrite table jointable select n.* from nullidtable n left join ori b on n.id = b.id;

可以看出来,出现了数据倾斜,某些reducer的资源消耗远大于其他reducer。5ac156842fc3e3085a0bdc9ea4c56a06.png

随机分布空null值

设置5个reduce个数

set mapreduce.job.reduces = 5;

JOIN两张表

insert overwrite table jointable
select n.* from nullidtable n full join ori o on
case when n.id is null then concat('hive', rand()) else n.id end = o.id;

可以看出来,消除了数据倾斜,负载均衡reducer的资源消耗416b37d16003f5aca3ae7182cd34cf91.png

MapJoin(小表join大表)

如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

开启MapJoin参数设置

设置自动选择Mapjoin

set hive.auto.convert.join = true; 默认为true

大表小表的阈值设置(默认25M一下认为是小表):

set hive.mapjoin.smalltable.filesize=25000000;

MapJoin工作机制528b0716e7b9f4ec6bb16fafab625d9f.png

  1. Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。

  2. Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。

  3. 由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。
    案例实操:
    1)开启Mapjoin功能

set hive.auto.convert.join = true; 默认为true

2)执行小表JOIN大表语句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
join bigtable b
on s.id = b.id;
Time taken: 24.594 seconds

3)执行大表JOIN小表语句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
join smalltable s
on s.id = b.id;
Time taken: 24.315 seconds

Group By

默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。692122072e0ef0d63b451a50ccc9edc7.png
并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。

开启Map端聚合参数设置

  1. 是否在Map端进行聚合,默认为True

set hive.map.aggr = true
  1. 在Map端进行聚合操作的条目数目

set hive.groupby.mapaggr.checkinterval = 100000
  1. 有数据倾斜的时候进行负载均衡(默认是false)

set hive.groupby.skewindata = true

当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作

hive (default)> select deptno from emp group by deptno;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 23.68 sec HDFS Read: 19987 HDFS Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 23 seconds 680 msec
OK
deptno
10
20
30
优化以后
hive (default)> set hive.groupby.skewindata = true;
hive (default)> select deptno from emp group by deptno;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 28.53 sec HDFS Read: 18209 HDFS Write: 534 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 5 Cumulative CPU: 38.32 sec HDFS Read: 15014 HDFS Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 1 minutes 6 seconds 850 msec
OK
deptno
10
20
30

Count(Distinct) 去重统计

数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT的全聚合操作,即使设定了reduce task个数,set mapred.reduce.tasks=100;hive也只会启动一个reducer。,这就造成一个Reduce处理的数据量太大,导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:
案例实操

  1. 创建一张大表

hive (default)> create table bigtable(id bigint, time bigint, uid string, keyword
string, url_rank int, click_num int, click_url string) row format delimited
fields terminated by 't';
  1. 加载数据

hive (default)> load data local inpath '/opt/module/datas/bigtable' into table
bigtable;
  1. 设置5个reduce个数

set mapreduce.job.reduces = 5;
  1. 执行去重id查询

hive (default)> select count(distinct id) from bigtable;
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 7.12 sec HDFS Read: 120741990 HDFS Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 120 msec
OK
c0
100001
Time taken: 23.607 seconds, Fetched: 1 row(s)
  1. 采用GROUP by去重id

hive (default)> select count(id) from (select id from bigtable group by id) a;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 17.53 sec HDFS Read: 120752703 HDFS Write: 580 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 4.29 sec HDFS Read: 9409 HDFS Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 21 seconds 820 msec
OK
_c0
100001
Time taken: 50.795 seconds, Fetched: 1 row(s)

虽然会多用一个Job来完成,但在数据量大的情况下,这个绝对是值得的。

笛卡尔积

尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积

行列过滤

列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤,比如:
案例实操:

  1. 测试先关联两张表,再用where条件过滤

hive (default)> select o.id from bigtable b
join ori o on o.id = b.id
where o.id <= 10;
Time taken: 34.406 seconds, Fetched: 100 row(s)
  1. 通过子查询后,再关联表

hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10 ) o on b.id = o.id;
Time taken: 30.058 seconds, Fetched: 100 row(s)

动态分区调整

关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。

开启动态分区参数设置

  1. 开启动态分区功能(默认true,开启)

hive.exec.dynamic.partition=true
  1. 设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)

hive.exec.dynamic.partition.mode=nonstrict
  1. 在所有执行MR的节点上,最大一共可以创建多少个动态分区。默认1000

hive.exec.max.dynamic.partitions=1000
  1. 在每个执行MR的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。

hive.exec.max.dynamic.partitions.pernode=100
  1. 整个MR Job中,最大可以创建多少个HDFS文件。默认100000

hive.exec.max.created.files=100000
  1. 当有空分区生成时,是否抛出异常。一般不需要设置。默认false

hive.error.on.empty.partition=false

案例实操

需求:将dept表中的数据按照地区(loc字段),插入到目标表dept_partition的相应分区中。

  1. 创建目标分区表

hive (default)> create table dept_partition(id int, name string) partitioned
by (location int) row format delimited fields terminated by 't';
  1. 设置动态分区

set hive.exec.dynamic.partition.mode = nonstrict;
hive (default)> insert into table dept_partition partition(location) select deptno, dname, loc from dept;
  1. 查看目标分区表的分区情况

hive (default)> show partitions dept_partition;

思考:目标分区表是如何匹配到分区字段的?

a81bb3ac0b8140cb73db9629ed83d932.png
简书:https://www.jianshu.com/u/0278602aea1d
CSDN:https://blog.csdn.net/u012387141

最后

以上就是拉长白羊为你收集整理的hive 两个没有null指定的表左关联的结果有null_Hive企业级调优表的优化小表、大表Join大表Join大表MapJoin(小表join大表)Group ByCount(Distinct) 去重统计笛卡尔积行列过滤动态分区调整的全部内容,希望文章能够帮你解决hive 两个没有null指定的表左关联的结果有null_Hive企业级调优表的优化小表、大表Join大表Join大表MapJoin(小表join大表)Group ByCount(Distinct) 去重统计笛卡尔积行列过滤动态分区调整所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部