我是靠谱客的博主 坦率钢笔,最近开发中收集的这篇文章主要介绍GreenPlum轻量级MPP架构数仓-数据流程调度GreenPlum轻量级MPP架构数仓-数据流程调度,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
GreenPlum轻量级MPP架构数仓-数据流程调度
- DataX
- GreenPlum
- Hera-Scheduler/airflow(最好不要将SQL嵌入到python脚本中,还是使用psql命令执行SQL代码,便于维护)
- Mysql数据源、Kafka数据源
1 DataX数据采集
1.1 数据从业务库MySQL到数仓GP
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://${source_db_host}:${source_db_port}/${source_db}?characterEncoding=utf8"],
"querySql": ["SELECT * FROM table_name "]
}
],
"username": "${source_db_user}",
"password": "${source_db_pwd}",
"where": ""
}
},
"writer": {
"name": "gpdbwriter",
"parameter": {
"postSql": [],
"preSql": ["TRUNCATE TABLE ods.table_name ", "VACUUM ods.table_name"],
"column": ["c1", "c2", "column3name"],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://${target_db_host}:${target_db_port}/${target_db}",
"table": ["ods.table_name"]
}
],
"username": "${target_db_user}",
"password": "${target_db_pwd}",
"num_copy_writer": 2,
"num_copy_processor": 2,
"copy_queue_size": 5000,
"segment_reject_limit": 0
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}
1.2 数据从kafka到数仓GP
{
"job": {
"content": [
{
"reader": {
"name": "kafkareader",
"parameter": {
"topic": "${topic}",
"bootstrapServers": "${bootstrap_servers}",
"partitions": ${partitions},
"groupId": "${group_id}",
"parsingRules": "${parsing_rules}",
"timeout": ${timeout},
"retries": ${retries},
"offsetReset": "${offset_reset}",
"kafkaRecordKeys": "col1,col2,col3,col4.....",
"logPath":"/opt/module/datax/log/errorlog"
}
},
"writer": {
"name": "gpdbwriter",
"parameter": {
"postSql": [],
"preSql": ["DELETE FROM ods.table_name WHERE TO_CHAR(TO_TIMESTAMP(TO_NUMBER(COALESCE(send_time,'0'),'9999999999')):: TIMESTAMP WITHOUT TIME ZONE,'yyyy-mm-dd') < '${del_start_date}' "],
"column": ["col1", "col2", "col3", "col4"],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://${target_db_host}:${target_db_port}/${target_db}",
"table": ["ods.table_name"]
}
],
"username": "${target_db_user}",
"password": "${target_db_pwd}",
"segment_reject_limit": 0,
"copy_queue_size": 5000,
"num_copy_processor": 4,
"num_copy_writer": 4
}
}
}
],
"setting": {
"speed": {
"channel": 3
}
}
}
}
1.3 DataX脚本执行
#其中-p 是手动或者通过调度系统传入参数到json脚本,然后连接源进行reader和writer的执行
#常用格式是 -p "-Dparam1=xxxx -Dparam2=xxxx ....-DparamN=${paramN}"
datax.py -p "-Dsource_db_host=mysql_host -Dsource_db_port=3306 -Dsource_db=mysql_database_name -Dsource_db_user=mysql_user_name -Dsource_db_pwd=mysql_pwd -Dtarget_db_host=gp_host -Dtarget_db_port=5432 -Dtarget_db=gp_database -Dtarget_db_user=gp_user_name -Dtarget_db_pwd=gp_pwd" /home/gpadmin/zeus-jobs/extract-data/mysql-gp/course/xxxxx.json;
2 ETL过程,以ODS-DW为例
2.1 ETL-SQL脚本
\set ON_ERROR_STOP on
DELETE FROM schema_name.table_name WHERE partition_key=:p_value;
INSERT INTO schema_name.table_name
SELECT
:p_value AS partition_key,
.......
FROM(
SELECT
....
FROM table_name
......
) tmp;
2.2 GP非交互式命令执行SQL脚本
# ${day}可以从调度系统传过来,也可以手动给定,p_value命名在SQL中以:p_value来接收,如上
# -d :database_name -f path:/sql_file -v 给参数赋值
psql -d x_train -f /home/gpadmin/zeus-jobs/clean-data/ods-dw/course/script_table.sql -v p_value="${day}";
3 airflow调度脚本样式
#!/usr/bin/python
# -*- coding: utf-8 -*-
import datetime
import argparse
import psycopg2
'''
作者:shufang416
日期:2020-09-23
主题:取自模型A,路径为.../path/....xsln
扼要:
说明:该表为
頻率:日 (值只可取:日,周,月,季,年)
模式:全量
是否对外输出:是
输出表名:table_name
上线日期与版本:
变更日期:
变更人:
变更内容:
建表语句
---------------------表分隔符------------------------------
------------------table_name--------------53列
DROP TABLE IF EXISTS schema_name.table_name;
CREATE TABLE IF NOT EXISTS schema_name.table_name(
--there is columns...
)WITH(appendoptimized=TRUE,orientation=COLUMN)
DISTRIBUTED randomly;
--为新建的表给不同的开发账号配置读写等权限
ALTER TABLE schema_name.table_name OWNER TO user_name1;
COMMENT ON TABLE schema_name.table_name IS 'dnfkdh';
COMMENT ON COLUMN schema_name.table_name.column_name IS 'kfbkewb';
'''
# Read the execution parameters handed over by the scheduler (or the CLI).
parser = argparse.ArgumentParser(description="etl of argparse")
for _opt in ('host', 'port', 'user', 'password', 'database', 'etl_dt'):
    parser.add_argument('--' + _opt, required=True, help='mpp ' + _opt)
args = parser.parse_args()
# Keyword arguments for psycopg2.connect; --etl_dt is not a connection
# parameter (it is meant to be substituted into the SQL by the caller).
connect_args = {key: getattr(args, key)
                for key in ('host', 'port', 'user', 'password', 'database')}
# Open the database connection and run the ETL; the multi-statement SQL
# script is split into individual statements on ";".
def etl_process():
    """Execute every statement of the module-level ``sql`` script against GP.

    Connects with the module-level ``connect_args``, splits ``sql`` on ";"
    and executes each non-empty piece in order, logging progress with
    timestamps.  Any database error is logged and re-raised so the
    scheduler marks the job as failed.

    Fixes over the original: the stray loop-level ``break`` (which aborted
    after the first statement) is removed, the mangled ``\\n`` escapes in
    the log messages are restored, and whitespace-only segments produced
    by the split are skipped instead of being executed.
    """
    def _ts():
        # Timestamp formatting is identical everywhere; hoist it once.
        return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

    with psycopg2.connect(**connect_args) as conn:
        with conn.cursor() as cursor:
            print('------------------连接数据库成功 {}-------------------------'.format(_ts()))
            # NOTE(review): a naive split breaks if a string literal inside
            # the SQL contains ";" — acceptable for the scripts used here.
            statements = [s for s in sql.split(';') if s.strip()]
            total = len(statements)
            for idx, statement in enumerate(statements, start=1):
                try:
                    print('------------------开始执行第{0}段,总共{1}段 {2}------------------\n'.format(idx, total, _ts()))
                    print(statement)
                    cursor.execute(statement)
                    print('------------------执行成功 {}--------------------------------\n\n'.format(_ts()))
                except Exception as e:
                    print('------------------错误信息 {0}------------------\n{1}'.format(_ts(), e))
                    # Re-raise so the scheduler sees a non-zero exit status.
                    raise
# Only the ``sql`` section below needs editing per job; the script must NOT
# end with ";" — etl_process() splits on ";" and a trailing semicolon would
# yield an empty final statement.
if __name__ == '__main__':
    # Module-level global read by etl_process(); the placeholder lines
    # ("....") are to be replaced with the real ETL SQL for the job.
    sql = '''
--这里为ETL的SQL代码
TRUNCATE TABLE schema_name.table_name;
INSERT INTO schema_name.table_name
SELECT
....
FROM
....
'''
    etl_process()
4 然后在调度平台建立作业,上传脚本,建立工作流以及任务血缘关系依赖
最后
以上就是坦率钢笔为你收集整理的GreenPlum轻量级MPP架构数仓-数据流程调度GreenPlum轻量级MPP架构数仓-数据流程调度的全部内容,希望文章能够帮你解决GreenPlum轻量级MPP架构数仓-数据流程调度GreenPlum轻量级MPP架构数仓-数据流程调度所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复