我是靠谱客的博主 满意鸡,最近开发中收集的这篇文章主要介绍hive优化,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.当hive执行join内存溢出时,可以修改hive的配置文件hive-site.xml,增大内存,如下: mapred.child.java.opts -Xmx 1024m

2.hive默认建表时的路径也可以在hive-site.xml里配置,如下: hive.metastore.warehouse.dir value >/user/hive/warehouse description >location of default database for the warehouse

3.执行join操作的时候,尽量把小表放前面,大表放前面可能会因为内存溢出而出错

4.对分区表进行操作需要对分区进行过滤(如:ds=$yday)。 特别是在JOIN操作的时候,分区过滤(如:ds=$yday)需要放到 ON语句 或子查询 里面。不能放到ON后面的WHERE里,这样会扫描所有表,最后才判断分区。也就是说程序会先执行JOIN操作,才会执行最后的WHERE操作。

5.在JOIN操作中,后面被连续JOIN且同一字段,只会执行一个mapreduce操作。 SELECT * FROM a LEFT OUTER JOIN b ON a.t=b.t LEFT OUTER JOIN c ON a.t=c.t; 推荐的 SELECT * FROM a LEFT OUTER JOIN b ON a.t=b.t LEFT OUTER JOIN c ON b.t=c.t; 效率低下的

6.当一个大表和一个很小的表进行JOIN操作的时候,使用MAPJOIN操作,这样会把小表读入内存进行JOIN,只需要一个map操作JOIN就完成了 select /*+ mapjoin(a)*/ a.c1,b.c2,b.c3 from a join b on a.c4=b.c4;

7.通过设置hive.merge.mapfiles可以关闭hive对于扫描表的优化,但有时候会提高效率。默认值为true。可以视情况设置:只含有SELECT的语句 或 MAPJOIN 推荐使用

8.ALTER TABLE a SET SERDEPROPERTIES('serialization.null.format' = ''); 可以使结果表不出现N字符串,而用空串代替

列裁剪(Column Pruning)

在读数据的时候,只读取查询中需要用到的列,而忽略其他列。例如,对于查询:

SELECT a,b FROM T WHERE e < 10;

其中,T 包含 5 个列 (a,b,c,d,e),列 c,d 将会被忽略,只会读取a, b, e 列

这个选项默认为真: hive.optimize.cp = true

分区裁剪(Partition Pruning)

在查询的过程中减少不必要的分区。例如,对于下列查询:

SELECT * FROM (SELECT c1, COUNT(1)
FROM T GROUP BY c1) subq
WHERE subq.prtn = 100;
SELECT * FROM T1 JOIN
(SELECT * FROM T2) subq ON (T1.c1=subq.c2)
WHERE subq.prtn = 100;

会在子查询中就考虑 subq.prtn = 100 条件,从而减少读入的分区数目。

此选项默认为真:hive.optimize.pruner=true

Join

在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。

对于一条语句中有多个 Join 的情况,如果 Join 的条件相同,比如查询:

INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);
  • 如果 Join 的 key 相同,不管有多少个表,都会则会合并为一个 Map-Reduce
  • 一个 Map-Reduce 任务,而不是 ‘n’ 个
  • 在做 OUTER JOIN 的时候也是一样

如果 Join 的条件不相同,比如:

INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x on (u.age = x.age);

Map-Reduce 的任务数目和 Join 操作的数目是对应的,上述查询和以下查询是等价的:

INSERT OVERWRITE TABLE tmptable
SELECT * FROM page_view p JOIN user u
ON (pv.userid = u.userid);
INSERT OVERWRITE TABLE pv_users
SELECT x.pageid, x.age FROM tmptable x
JOIN newuser y ON (x.age = y.age);

Map Join

Join 操作在 Map 阶段完成,不再需要Reduce,前提条件是需要的数据在 Map 的过程中可以访问到。比如查询:

INSERT OVERWRITE TABLE pv_users
SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age
FROM page_view pv
JOIN user u ON (pv.userid = u.userid);

可以在 Map 阶段完成 Join,如图所示:

相关的参数为:

  • hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result.
  • hive.mapjoin.size.key = 10000
  • hive.mapjoin.cache.numrows = 10000

Group By

  • Map 端部分聚合:
    • 并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果。
    • 基于 Hash
    • 参数包括:
      • hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True
      • hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操作的条目数目
  • 有数据倾斜的时候进行负载均衡
    • hive.groupby.skewindata = false
    • 当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

合并小文件

文件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的结果文件来消除这样的影响:

  • hive.merge.mapfiles = true 是否和并 Map 输出文件,默认为 True
  • hive.merge.mapredfiles = false 是否合并 Reduce 输出文件,默认为 False
  • hive.merge.size.per.task = 256*1000*1000 合并文件的大小

hive相关优化方式:

Column Pruning

As name suggests -discard columns which are not needed

> select a,b from t where e<10;

>t contains 5 columns (a,b,c,d,e)

Columns c,d are discarded

Select only the relevant columns

Enabled by defaut

  > hive.optimize.cp=true


Predicate Pushdown

Move predicate closer to the table scan only.

Enabled by default:

hive.optimize.ppd=true

Predicates moved up across joins.

select * from t1 join t2 on(t1.c1=t2.c2 and t1.c1<10)

select * from t1 join t2 on(t1.c1=t2.c2) where t1.c1<10

Special needs for outer joins:

Left outer join: predicates on the left side aliases are pushed

Right outer join:predicates on the right side aliases are pushed

Full outer join:none of the predicates are pushed

Non-deterministic functions(eg.rand()) not pushed.

Use annotation:

> #UDFType(deterministic=false)

The entire expression containing non-deterministic function is not pushed up

> c1>10 and c2<rand()


Partition Pruning

Reduce list of partitions to be scanned

Works on parse tree currently- some known bugs

select * from 

     (select c1, count(1) from t group by c1) subq

where subq.prtn=100;

select * from t1 join

     (select * from t2) subq on(t1.c1=subq.c2)

where subq.prtn=100;

hive.mapred.mode=nonstrict

Strict mode, scan of a complete partitioned table fails.


Hive QL-Join

INSERT OVERWRITE TABLE pv_users

SELECT pv.pageid,u.age

FROM page_view pv

JOIN user u

ON (pv.userid=u.userid);

Rightmost table streamed - whereas inner tables data is 

kept in memory for a given key. Use largest table as the 

right most table.

hive.mapred.mode=nonstrict

In strict mode,Cartesian product not allowed


INSERT OVERWRITE TABLE pv_users

SELECT pv.pageid,u.age

FROM page_view p JOIN user u

ON (pv.userid=u.useid)

JOIN new user x on (u.userid=x.useid);

Same join key - merge into 1 map-reduce job -true for any number of tables with the same join key.

1 map-reduce job instead of 'n'

The merging happens for OUTER joins also


INSERT OVERWRITE TABLE pv_users

SELECT pv.pageid,u.age

FROM page_view p JOIN user u

    ON (pv.userid=u.userid)

     JOIN new user x on (u.age=x.age);

Different join keys - 2 map-reduce jobs

Same as:

INSERT OVERWRITE TABLE tmptable SELECT *

FROM page_view p JOIN user u

  ON(pv.useid=u.userid);

INSERT OVERWRITE TABLE pv_users

SELECT x.pageid,x.age

FROM temptable x JOIN newuser y on(x.age=y.age);


Join Optimizations

Map Joins

 > User specified small tables stored in hash tables on the mapper backed by jdbm

 > No reducer needed

INSERT INTO TABLE pv_users

SELECT /*+MAPJOIN(pv) */ pv.pageid,u.age 

FROM page_view pv JOIN user u

ON(pv.userid=u.userid);


Map Join

>>Optimization phase

>> n-way map-join if(n-1) tables are map side readable

>>Mapper reads all (n-1) table before processing the main table under consideration

>>Map-side readable tables are cached in memory and backed by JDBM persistent hash tables

Parameters

>>hive.join.emit.interval=1000

>>hive.mapjoin.size.key=10000

>>hive.mapjoin.cache.numrows=10000


Future

>>Sizes/statistics to determine join order

>>Sizes to enforce map-join

>>Better techniques for handling skew for a given key

>>Using sorted properties of the table

>>Fragmented joins

>>n-way joins for different join keys by replicating data


Hive QL - Group By

SELECT pageid, age, count(1) 

FROM pv_users

GROUP BY paged,age;


Group by Optimizations

>>Map side partial aggregations

     > Hash-based aggregates

     > Serialized key/values in hash tables

     > 90% speed improvement on Query 

         select count(1) from t;

>> Load balancing for data skew

Parameters

>> hive.map.aggr=true

>> hive.groupby.skewindata=false

>> hive.groupby.mapaggr.checkinterval=100000

>> hive.map.aggr.hash.percentmemory=0.5

>> hive.map.aggr.hash.min.reduction=0.5


Multi GroupBy

FROM pv_users

        INSERT OVERWRITE TABLE pv_gender_sum

             SELECT gender, count(DISTINCT userid),count(userid)

                   GROUP BY gender

        INSERT OVERWRITE TABLE pv_age_sum

              SELECT age, count(DISTINCT userid)

                   GROUP BY age

>>n+1 map-reduce jobs instead of 2n

>> Single scan of input table

>>Same distinct key across all groupies

>>Always user multi-groupby


Merging of small files

>>Lots of small files creates problems for downstream jobs

     > SELECT * FROM T WHERE x<10;

>> hive.merge.mapfiles=true

>> hive.merge.mapredfiles=false

>> hive.merge.size.per.task=256*1000*1000

>> Increases time for current query

Hive是将符合SQL语法的字符串解析生成可以在Hadoop上执行的MapReduce的工具。

使用Hive尽量按照分布式计算的一些特点来设计sql,和传统关系型数据库有区别,

所以需要去掉原有关系型数据库下开发的一些固有思维。

 

基本原则:

1:尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段

select ... from A

join B

on A.key = B.key

where A.userid>10

     and B.userid<10

        and A.dt='20120417'

        and B.dt='20120417';

应该改写为:

select .... from (select .... from A

                  where dt='201200417'

                                    and userid>10

                              ) a

join ( select .... from B

       where dt='201200417'

                     and userid < 10   

     ) b

on a.key = b.key;

2:尽量原子化操作,尽量避免一个SQL包含复杂逻辑

可以使用中间表来完成复杂的逻辑

drop table if exists tmp_table_1;

create table if not exists tmp_table_1 as

select ......;

 

drop table if exists tmp_table_2;

create table if not exists tmp_table_2 as

select ......;

 

drop table if exists result_table;

create table if not exists result_table as

select ......;

 

drop table if exists tmp_table_1;

drop table if exists tmp_table_2;

 

 

3:单个SQL所起的JOB个数尽量控制在5个以下

 

4:慎重使用mapjoin,一般行数小于2000行,大小小于1M(扩容后可以适当放大)的表才能使用,小表要注意放在join的左边(目前TCL里面很多都小表放在join的右边)。

否则会引起磁盘和内存的大量消耗

 

5:写SQL要先了解数据本身的特点,如果有join ,group操作的话,要注意是否会有数据倾斜

如果出现数据倾斜,应当做如下处理:

set hive.exec.reducers.max=200;

set mapred.reduce.tasks= 200;---增大Reduce个数

set hive.groupby.mapaggr.checkinterval=100000 ;--这个是group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置

set hive.groupby.skewindata=true; --如果是group by过程出现倾斜 应该设置为true

set hive.skewjoin.key=100000; --这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置

set hive.optimize.skewjoin=true;--如果是join 过程出现倾斜应该设置为true

 

6:如果union all的部分个数大于2,或者每个union部分数据量大,应该拆成多个insert into 语句,实际测试过程中,执行时间能提升50%

insert overwite table tablename partition (dt= ....)

select ..... from (

                   select ... from A

                   union all

                   select ... from B

                   union all

                   select ... from C

                               ) R

where ...;

 

可以改写为:

insert into table tablename partition (dt= ....)

select .... from A

WHERE ...;

 

insert into table tablename partition (dt= ....)

select .... from B

WHERE ...;

 

insert into table tablename partition (dt= ....)

select .... from C

WHERE ...; 

一、join优化

Join查找操作的基本原则:应该将条目少的表/子查询放在 Join 操作符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出错误的几率。

Join查找操作中如果存在多个join,且所有参与join的表中其参与join的key都相同,则会将所有的join合并到一个mapred程序中。

案例:

SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)  在一个mapre程序中执行join

SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)   在两个mapred程序中执行join

Map join的关键在于join操作中的某个表的数据量很小,案例:

SELECT /*+ MAPJOIN(b) */ a.key, a.value

FROM a join b on a.key = b.key

Mapjoin 的限制是无法执行a FULL/RIGHT OUTER JOIN b,和map join相关的hive参数:hive.join.emit.interval  hive.mapjoin.size.key  hive.mapjoin.cache.numrows

由于join操作是在where操作之前执行,所以当你在执行join时,where条件并不能起到减少join数据的作用;案例:

SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)

WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'

最好修改为:

SELECT a.val, b.val FROM a LEFT OUTER JOIN b

ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07')

在join操作的每一个mapred程序中,hive都会把出现在join语句中相对靠后的表的数据stream化,相对靠前的变的数据缓存在内存中。当然,也可以手动指定stream化的表:SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

二、group by 优化

Map端聚合,首先在map端进行初步聚合,最后在reduce端得出最终结果,相关参数:

· hive.map.aggr = true是否在 Map 端进行聚合,默认为 True

· hive.groupby.mapaggr.checkinterval = 100000在 Map 端进行聚合操作的条目数目

数据倾斜聚合优化,设置参数hive.groupby.skewindata = true,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

三、合并小文件

文件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的结果文件来消除这样的影响:

· hive.merge.mapfiles = true是否和并 Map 输出文件,默认为 True

· hive.merge.mapredfiles = false是否合并 Reduce 输出文件,默认为 False

· hive.merge.size.per.task = 256*1000*1000合并文件的大小

四、Hive实现(not) in

通过left outer join进行查询,(假设B表中包含另外的一个字段 key1

select a.key from a left outer join b on a.key=b.key where b.key1 is null

通过left semi join 实现 in

SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key)

Left semi join 的限制:join条件中右边的表只能出现在join条件中。

五、排序优化

Order by 实现全局排序,一个reduce实现,效率低

Sort by 实现部分有序,单个reduce输出的结果是有序的,效率高,通常和DISTRIBUTE BY关键字一起使用(DISTRIBUTE BY关键字 可以指定map 到 reduce端的分发key)

CLUSTER BY col1 等价于DISTRIBUTE BY col1 SORT BY col1

六、使用分区

Hive中的每个分区都对应hdfs上的一个目录,分区列也不是表中的一个实际的字段,而是一个或者多个伪列,在表的数据文件中实际上并不保存分区列的信息与数据。Partition关键字中排在前面的为主分区(只有一个),后面的为副分区

静态分区:静态分区在加载数据和使用时都需要在sql语句中指定

案例:(stat_date='20120625',province='hunan')

动态分区:使用动态分区需要设置hive.exec.dynamic.partition参数值为true,默认值为false,在默认情况下,hive会假设主分区时静态分区,副分区使用动态分区;如果想都使用动态分区,需要设置set hive.exec.dynamic.partition.mode=nostrick,默认为strick

案例:(stat_date='20120625',province)

七、Distinct 使用

Hive支持在group by时对同一列进行多次distinct操作,却不支持在同一个语句中对多个列进行distinct操作。

八、Hql使用自定义的mapred脚本

注意事项:在使用自定义的mapred脚本时,关键字MAP REDUCE 是语句SELECT TRANSFORM ( ... )的语法转换,并不意味着使用MAP关键字时会强制产生一个新的map过程,使用REDUCE关键字时会产生一个red过程。

自定义的mapred脚本可以是hql语句完成更为复杂的功能,但是性能比hql语句差了一些,应该尽量避免使用,如有可能,使用UDTF函数来替换自定义的mapred脚本

九、UDTF

UDTF将单一输入行转化为多个输出行,并且在使用UDTF时,select语句中不能包含其他的列,UDTF不支持嵌套,也不支持group by 、sort by等语句。如果想避免上述限制,需要使用lateral view语法,案例:

select a.timestamp, get_json_object(a.appevents, '$.eventid'), get_json_object(a.appenvets, '$.eventname') from log a;

select a.timestamp, b.*

from log a lateral view json_tuple(a.appevent, 'eventid', 'eventname') b as f1, f2;

其中,get_json_object为UDF函数,json_tuple为UDTF函数。

UDTF函数在某些应用场景下可以大大提高hql语句的性能,如需要多次解析json或者xml数据的应用场景。

十、聚合函数count和sum

Count和sum函数可能是在hql语句中使用的最为频繁的两个聚合函数了,但是在hive中count函数在计算distinct value时支持加入条件过滤。

 

最后

以上就是满意鸡为你收集整理的hive优化的全部内容,希望文章能够帮你解决hive优化所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(64)

评论列表共有 0 条评论

立即
投稿
返回
顶部