我是靠谱客的博主 快乐小懒虫,最近开发中收集的这篇文章主要介绍元数据管理之hive元数据收集,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

我们知道hive的元数据是存储在mysql里面的,Hive元数据基础看这篇文章:Hive 元数据解析。
有几个重要的id:
TBL_ID,SD_ID ,CD_ID其他都是表级别的id,其中TBL_ID代表一个表的id,
SD_ID(SERDE_ID 的缩写),代表一个表存储结构的id(如数据目录、数据格式等)
CD_ID代表列的id,但是一个表有多个列,怎么能是表级,而不是列级的呢?原因是
hive中同一个表中的列都是用同一个CD_ID,而用INTEGER_IDX(表中列的位置)把他们区分开

表元数据:

获取表的基本信息:

#依赖
from sqlalchemy import create_engine
import pymysql
import pandas as pd
sqlEngine = create_engine('mysql+pymysql://dbreader:xx@server01:3306/metastore')
#TABLE相关
b=''' select TBL_ID, CREATE_TIME as TBL_CREATE_TIME, DB_ID,SD_ID,TBL_NAME,TBL_TYPE as MANAGE_TYPE from TBLS '''
with sqlEngine.connect() as conn:
    TBLS =pd.read_sql(sql=b,con=conn)
    
#Database相关
b=''' select DB_ID,NAME as DB_NAME from DBS '''
with sqlEngine.connect() as conn:
    DBS =pd.read_sql(sql=b,con=conn)
    
TBLS=pd.merge(TBLS,DBS,on='DB_ID',how='left')
del TBLS['DB_ID']
print('after meger DBS,TBLS.shape: ',TBLS.shape)

获取表数据存储相关信息:

# 数据存储相关
b=''' select SD_ID, CD_ID, INPUT_FORMAT,LOCATION,NUM_BUCKETS from SDS '''
with sqlEngine.connect() as conn:
    SDS =pd.read_sql(sql=b,con=conn)

TBLS=pd.merge(TBLS,SDS,on='SD_ID',how='left')
print('after meger SDS,TBLS.shape: ',TBLS.shape)

获取表分区相关信息:

# 分区相关
b=''' select TBL_ID,PART_NAME from PARTITIONS '''
with sqlEngine.connect() as conn:
    PARTITIONS =pd.read_sql(sql=b,con=conn)
partition_count=PARTITIONS.groupby('TBL_ID')['PART_NAME'].count()
partition_count=pd.DataFrame({'PARTITION_COUNT':partition_count})
partition_count=partition_count.reset_index()

TBLS=pd.merge(TBLS,partition_count,on='TBL_ID',how='left')
print('after meger partition_count,TBLS.shape: ',TBLS.shape)

# 分区columns相关
b=''' select TBL_ID,PKEY_NAME as PARTITION_NAME  from PARTITION_KEYS '''
with sqlEngine.connect() as conn:
    PARTITION_KEYS =pd.read_sql(sql=b,con=conn)
    
TBLS=pd.merge(TBLS,PARTITION_KEYS,on='TBL_ID',how='left')
print('after meger PARTITION_KEYS,TBLS.shape: ',TBLS.shape)

#获取表的其他信息,该信息基本在TABLE_PARAMS表中,其中表类型(table_type)要解析出来:

#TABLE_PARAMS相关:
b=''' select *  from TABLE_PARAMS '''
with sqlEngine.connect() as conn:
    TABLE_PARAMS =pd.read_sql(sql=b,con=conn)

kudu_table=TABLE_PARAMS[TABLE_PARAMS.PARAM_KEY=='kudu.table_name'].copy()
kudu_table['table_type']='kudu'

hbase_table=TABLE_PARAMS[TABLE_PARAMS.PARAM_KEY=='hbase.table.name'].copy()
hbase_table['table_type']='hbase'

table_type=pd.concat([hbase_table,kudu_table])
table_type=table_type.rename(columns={'PARAM_VALUE':'refer_table'})[['TBL_ID','table_type','refer_table']]
TBLS=pd.merge(TBLS,table_type,on='TBL_ID',how='left')
TBLS.table_type=TBLS.table_type.fillna('hdfs')
print('after meger table_type,TBLS.shape: ',TBLS.shape)
#以上几行代码获取table_type(表的底层数据类型),可以是hdfs,hbase,kudu

param_dict={}
for i in ['transient_lastDdlTime', 'comment',  'totalSize','numFiles', 'numRows', 'rawDataSize' ,'last_modified_time']:
    param_dict[i]=TABLE_PARAMS[TABLE_PARAMS.PARAM_KEY==i].copy()
    param_dict[i]=param_dict[i].rename(columns={'PARAM_VALUE':i})[['TBL_ID',i]]
    TBLS=pd.merge(TBLS,param_dict[i],on='TBL_ID',how='left')
    
print('after meger param_dict,TBLS.shape: ',TBLS.shape)
TBLS.columns=TBLS.columns.map(lambda x:x.lower())
TBLS.tbl_create_time=TBLS.tbl_create_time.map(lambda x:time.strftime('%Y-%m-%d %H:%M',time.localtime(x)))
TBLS.partition_count=TBLS.partition_count.fillna(-1)
TBLS.partition_count=TBLS.partition_count.astype('int')
TBLS.location=TBLS.location.map(lambda x:x.split('8020')[1])
TBLS=TBLS.rename(columns={'tbl_id':'id'})
TBLS['sql_table']=''
TBLS['beizhu']=''
TBLS['owner']=''
#入库
TBLS.to_sql('table_info',con=sqlEngine.connect(),if_exists='replace',index=False)

最终的TBLS就是hive表的源数据,含有表的所有基本重要信息。
其中sql_table(原sql表名),beizhu(备注),owner(责任人)要人工维护

列元数据:

获取列的基本信息:

b=''' select CD_ID as cd_id,COMMENT,COLUMN_NAME,TYPE_NAME,INTEGER_IDX from COLUMNS_V2 '''  ###['CD_ID', 'COMMENT', 'COLUMN_NAME', 'TYPE_NAME', 'INTEGER_IDX']

with sqlEngine.connect() as conn:
    COLUMNS_V2 =pd.read_sql(sql=b,con=conn)
    
#获取 代表分区的列
partiton_colum=TBLS[['cd_id','partition_name' ]].copy()
partiton_colum=partiton_colum.rename(columns={'partition_name':'COLUMN_NAME'})
COLUMNS_V2=pd.concat([COLUMNS_V2,partiton_colum],sort=False)
COLUMNS_V2.INTEGER_IDX=COLUMNS_V2.INTEGER_IDX.fillna(-1)#定分区列的INTEGER_IDX为-1
COLUMNS_V2.INTEGER_IDX=COLUMNS_V2.INTEGER_IDX.astype('int')

关联TBLS表获取列对应信息:

COLUMNS_V2.columns=COLUMNS_V2.columns.map(lambda x:x.lower())
#获取表的一些相关信息:
COLUMNS_V2=pd.merge(COLUMNS_V2,TBLS[['cd_id','tbl_name','db_name','tbl_create_time']],on='cd_id',how='left')

COLUMNS_V2['beizhu']=''
COLUMNS_V2['formula']=''
COLUMNS_V2['owner']=''

#入库
COLUMNS_V2.to_sql('column_info',con=sqlEngine.connect(),if_exists='append',index=False)

最终的COLUMNS_V2就是hive列的源数据,含有列的所有基本重要信息。
其中formula(计算公式),beizhu(备注),owner(责任人)要人工维护

最后

以上就是快乐小懒虫为你收集整理的元数据管理之hive元数据收集的全部内容,希望文章能够帮你解决元数据管理之hive元数据收集所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部