Hive优化、hive数仓设计
hive优化:
一、需要调优的几个方面
1.HIVE语句执行不了
2.HIVE查询语句,在集群中执行时,数据无法落地
HIVE执行时,一开始语句检查没有问题,生成了多个JOB,
并且一开JOB中的Map 及 Reduce 正常运行,之后便报异常包括 OOM 异常等
3.HIVE查询语句,执行时,Map或者Reduce端数据处理异常慢,导致整个执行效率低
二、调优方式:
1.分区、分桶
(1)分区的好处,在扫描表时,会根据查询语句中的过滤条件,将固定分区中的数据加载至内存中避免了表的全表扫描。
(2)分桶好处? 在获取数据时,根据查询的数据,进行做hash操作,将需要获取的数据指定到具体的桶中,这样只获取固定部分桶数据,减小了数据的加载量
2.使用外部表
外部表和普通表的区别? 删除数据时,外部表不会将HDFS中对应表路径中的数据删除
3.选择适当的文件压缩格式
(1)对于刚采集过的源数据,需要用TextFile格式进行保存,需要保证源数据的格式及内容和原先一致
(2)对于处理过的数据,一般对数据进行压缩保存(需要考虑实际情况)
4.命名要规范
创建表时,需要遵守:
如果数据存储在dwd中那么建表时需要将 dwd 放至 表的开端
同时后面的业务名称需要和库名用 _ 进行分隔
5.数据分层,表分离,但是也不要分的太散
(1)数据分层:
将不同类型的数据,应当存储在不同库中,
比如 维度表 应当存储在 维度库 、原始数据应当存储在ODS库中专门做管理
(2)表分离:
在实际业务过程中,有一些表的维度比较大,单个表的存储压力大
同时数据读取时,拉去的数据内容比较多,但是所需要的字段较少,浪费计算资源
可以将表中相同类型的信息切分至多个表中,根据实际业务需要进行读取数据
如果分的太散,那么也会造成数据冗余,并且加载表过多,计算慢
6.分区裁剪 where过滤,先过滤,后join
(1)针对分区表数据,可以通过where条件进行过滤数据,之后再进行其他操作
(2)适当的使用一些子查询,将子查询中的数据进行初步过滤,然后再与其他表数据进行关联
7.mapjoin(1.2以后自动默认启动mapjoin)
(1)MapJoin顾名思义,就是在Map阶段进行表之间的连接,map阶段直接拿另外一个表的数据和内存中表数据做匹配。而不需要进入到Reduce阶段才进行连接。这样就节省了在Shuffle阶段时要进行的大量数据传输。从而起到了优化作业的作用。通常用于一个很小的表和一个大表进行join的场景。
(2)MaP JOIN 相关设置:
1)设置自动选择Mapjoin
set hive.auto.convert.join = true; 默认为true
2)大表小表的阈值设置(默认25M以下认为是小表):
set hive.mapjoin.smalltable.filesize = 25000000;
8.分区分桶,合并小文件
(1)为什么小文件需要合并?
小文件过多,MR处理数据时,会产生多个MapTask,然而每个MapTask处理的数据量很少,
那么导致MapTask启动时间大于执行时间,整体任务时间消耗较大
(2)如何合并小文件:
1)在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
2)在Map-Reduce的任务结束时合并小文件的设置:
2.1 在map-only任务结束时合并小文件,默认true
SET hive.merge.mapfiles = true;
-- 如果有一个HIVE表,小文件过多,可以通过select * from 表 只产生一个Map
-- 任务将原表中的小文件合并并输出至新表
2.2 在map-reduce任务结束时合并小文件,默认false
SET hive.merge.mapredfiles = true;
合并文件的大小,默认256M
SET hive.merge.size.per.task = 268435456;
当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
SET hive.merge.smallfiles.avgsize = 128000000;
3)--限制Map,Reduce数
set mapreduce.tasktracker.map.tasks.maximum=30; --每个nodemanager节点上可运行的最大map任务数,默认值2,可根据实际值调整为10~100;
set mapreduce.tasktracker.reduce.tasks.maximum=30; --每个nodemanager节点上可运行的最大reduce任务数,默认值2,可根据实际值调整为10~100;
4)--将多个小文件打包作为一个整体的inputsplit,减少map任务数
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapreduce.input.fileinputformat.split.maxsize=128000000; --切片大小最大值,不设置,则所有输入只启动一个map任务
set mapreduce.input.fileinputformat.split.minsize.per.node=16000000; --同一节点的数据块形成切片时,切片大小的最小值
5)
-- 最大split大小
set mapred.max.split.size=128000000;
9.排序优化
(1)order by
全局排序操作, 只有一个Reduce任务去对数据进行排序,会造成全部的数据堆积在一个Reduce任务中进行处理
一个Reduce任务的内存是有限的,承载不了太多数据
(2)distribute by + sort by
distribute by:是指我们的分区操作
sort by: 跟上distribute by 之后可以实现在分区内进行排序,
如果当前的Reduce数量为1,那么也没有优化的效果,可以通过设置reduce的数量对不同分区中的数据进行拆分排序
通过 set mapreduce.job.reduces = N; 设置Reduce数量,默认参数值为-1 即根据资源及查询语句情况进行分配reduce,
(3)cluster by语句:
如果分区字段和排序字段是同一个字段,那么和 distribute by + sort by 效果一致
10.数据倾斜优化
(1)原因:
1.Key分布不均,导致某一些Key在做Reduce端处理时,执行较慢
2.数据重复,在关联时,会产生笛卡尔积,致使数据膨胀,严重的可能会导致集群挂掉
(2)表现:
任务进度长时间维持在99%(或100%),
查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大
(3)如果想展示数据倾斜效果:
1.保证两张表的数据相差不大
2.可以关闭MapJOIN优化,防止执行MapJOIN流程
--查看具体的偏差的结果可以去 Reduce Tasks for job 历史中查看
-- 可以得到结论:数据倾斜之后 部分Reduce的执行时间明显比其他Reduce任务要长
(4)解决方法:
1.对会产生笛卡尔积的数据进行初步过滤
SELECT
T1.id
,count(*) as num
FROM learn4.student_null T1 JOIN learn4.score_null T2 ON T1.id = T2.id AND (T1.id != "null_student" or T2.id != "null_student")
GROUP BY T1.id
2.可以对Key比较集中的数据进行加随机数,然后再进行处理
SELECT
T1.id
,count(*) as num
FROM learn4.student_null T1 JOIN learn4.score_null T2 ON concat(T1.id,floor((rand()*10)%5)) = concat(T2.id,floor((rand()*10)%5))
GROUP BY T1.id
SELECT
substring(T.id,1,-1) as id
,sum(T.num) --对打散的ID统计过的结果进行汇总
FROM (
SELECT
T1.id
,count(*) as num --对打散的ID进行统计
FROM (
SELECT
concat(T1.id,floor((rand()*10)%5)) as id --将T1表中的数据进行打散
FROM learn4.student_null) T1
JOIN
(
SELECT
concat(T1.id,floor((rand()*10)%5)) as id --将T2表中的数据进行打散
FROM learn4.score_null) T2 ON T1.id =T2.id
)T GROUP BY substring(T.id,1,-1)
3.通过参数进行调整
设置开启Map端的聚合操作
set hive.map.aggr = true
设置groupby的检查点条数
set hive.groupby.mapaggr.checkinterval = 100000
开启GROUP BY 的负载均衡操作
set hive.groupby.skewindata = true
通过调整该参数,使得有倾斜的数据在Map reduce过程中 被随机的从Map端发送到Reduce端进行处理,
reduce会对随机发送过来的数据进行 初步的处理,之后再进行后续的统计
hive数仓设计:
一、数据分层
作为一名数据的规划者,我们肯定希望自己的数据能够有秩序地流转,数据的整个生命周期能够清晰明 确被设计者和使用者感知到。直观来就是如下的左图这般层次清晰、依赖关系直观。 但是,大多数情况下,我们完成的数据体系却是依赖复杂、层级混乱的。如下的右图,在不知不觉的情 况下,我们可能会做出一套表依赖结构混乱,甚至出现循环依赖的数据体系。 因此,我们需要一套行之有效的数据组织和管理方法来让我们的数据体系更有序,这就是谈到的数据分层。数据分层并不能解决所有的数据问题,但是,数据分层却可以给我们带来如下的好处:
清晰数据结构:每一个数据分层都有它的作用域和职责,在使用表的时候能更方便地定位和理解
减少重复开发:规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算
统一数据口径:通过数据分层,提供统一的数据出口,统一对外输出的数据口径
复杂问题简单化:将一个复杂的任务分解成多个步骤来完成,每一层解决特定的问题
二、分层设计
自下而上
1、ODS(Operational Data Store):数据运营层
“面向主题的”数据运营层,也叫ODS层,是最接近数据源中数据的一层,数据源中的数据,经过抽取、洗净、传输,也就说传说中的 ETL 之后,装入本层。本层的数据,总体上大多是按照源头业务系统 的分类方式而分类的。 一般来讲,为了考虑后续可能需要追溯数据问题,因此对于这一层就不建议做过多的数据清洗工作,原封不动地接入原始数据即可,至于数据的去噪、去重、异常值处理等过程可以放在后面的DWD层来做。
2、DW(Data Warehouse):数据仓库层
数据仓库层是我们在做数据仓库时要核心设计的一层,在这里,从 ODS 层中获得的数据按照主题 建立各种数据模型。DW层又细分为 DWD(Data Warehouse Detail)层、DWM(Data WareHouse Middle)层和DWS(Data Warehouse Service)层。
(1)DWD(Data Warehouse Detail):数据明细层
该层一般保持和ODS层一样的数据粒度,并且提供一定的数据质量保证。同时,为了提高数据明细 层的易用性,该层会采用一些维度退化手法,将维度退化至事实表中,减少事实表和维表的关联。另外,在该层也会做一部分的数据聚合,将相同主题的数据汇集到一张表中,提高数据的可用性。
(2)DWM(Data Warehouse Middle):数据中间层
该层会在DWD层的数据基础上,对数据做轻度的聚合操作,生成一系列的中间表,提升公共指标的复用性,减少重复加工。直观来讲,就是对通用的核心维度进行聚合操作,算出相应的统计指标。
(3)DWS(Data Warehouse Service):数据服务层
又称数据集市或宽表。按照业务划分,如流量、订单、用户等,生成字段比较多的宽表,用于提供 后续的业务查询,OLAP分析,数据分发等。
一般来讲,该层的数据表会相对比较少,一张表会涵盖比较多的业务内容,由于其字段较多,因此 一般也会称该层的表为宽表。在实际计算中,如果直接从DWD或者ODS计算出宽表的统计指标,会存在 计算量太大并且维度太少的问题,因此一般的做法是,在DWM层先计算出多个小的中间表,然后再拼接 成一张DWS的宽表。由于宽和窄的界限不易界定,也可以去掉DWM这一层,只留DWS层,将所有的数据在放在DWS亦可。
3、ADS/APP/DM(Application Data Store/Application/DataMarket):数据应用层/数据集市
在这里,主要是提供给数据产品和数据分析使用的数据,一般会存放在 ES、PostgreSql、Redis等 系统中供线上系统使用,也可能会存在 Hive 或者 Druid 中供数据分析和数据挖掘使用。比如我们经常 说的报表数据,一般就放在这里。
4、DIM(Dimension):维表层,贯穿整个数仓设计
维表层主要包含两部分数据:
高基数维度数据:一般是用户资料表、商品资料表类似的资料表。数据量可能是千万级或者上亿级别。
低基数维度数据:一般是配置表,比如枚举值对应的中文含义,或者日期维表。数据量可能是个位数或者几千几万。