概述
餐饮开发组有一个需求,要将餐饮生产数据库的订单表和订单详情表的数据,做数据整合,数据抽取,整合以后抽取到BI 数据库,供BI项目开发做BI分析。
要求:
<1> BI项目组可以查询到的数据时段必须与生产订单库的时间最大间隔两个小时,也就是说 BI 项目组早晨11点必须要查询到早晨9点之前的所有数据, 中午13点必须要查询到中午11点之前的数据,依次类推。
<2> BI库存储的数据包含所有的历史数据,非当月订单数据不要求与主订单库是完全一致同步的,但是当月的数据必须每天同步到BI库与主库的订单数据保持一致的。
<3> 通过BI SQL 多表关联集和运算以后,可能会出现业务上的“重复性数据”,之前一直设计的是用select distinct,但是这样数据量很大的情况下,尤其在类似月结的时候,跑批 当月数据会特别慢,需要优化整合,保证同步到BI库的订单数据在业务上是无重复的。
我的设计思路:
1)抽取到BI库端的数据事实表设计成分区表CYERP_BASE,按月分区,BI程序就去查询这个表的数据。
2) 考虑到要 求<1>中的准实时性,如果直接在 CYERP_BASE 表上Truncate 最后一个月的分区,然后再insert CYERP_BASE select from xxxx ,那么一定在拉数据过程中,BI 项目组对这个表中当月数据的使用时不可以用的,这种方案肯定不行。那么我就利用普通表与分区表 可以做分区exchange的特性,由于非当月的历史数据 不用管,那么创建一张表结构与BI库分区表一样 的普通表CYERP_BASE_EXG,然后,先将数据拉去到CYERP_BASE_EXG中,拉入完成以后,我再将CYERP_BASE_EXG表和CYERP_BASE表最后一个分区做分区交换(partition exchange),由于分区交换是DDL操作,瞬间完成,那么就消除了拉取数据早晨的基 表CYERP_BASE不可用的情况。
3)定时任务我则配置成 :
8:00 从ERP生产数据库拉取数据 -----> CYERP_BASE_EXG表 这样能够保证CYERP_BASE_EXG表的数据是8:00之前的。
9:00 做分区交换,将CYERP_BASE_EXG的数据exchange到CYERP_BASE表的最后一个分区 这样就实现了BI 库 9点以后可以查询8点之前的业务数据。
做成定时任务,这两个存储过程的执行间隔设置为每小时一执行,先执行从ERP生产数据库拉取数据 -----> CYERP_BASE_EXG表。
下面是实施步骤:
<1>建表:
创建BI 程序使用的事实表 CYERP_BASE
create table CYERP_BASE
(
created
TIMESTAMP(6),
prosku
VARCHAR2(20),
totalnum
NUMBER(14,2),
totalnums
NUMBER(14,2),
totalprice NUMBER(14,2),
pronum
VARCHAR2(20),
proname
VARCHAR2(200),
specvalue
VARCHAR2(200),
procode
VARCHAR2(20),
unit
VARCHAR2(20),
price
NUMBER(14,2),
proarea
VARCHAR2(200),
province
NUMBER(10),
CITY
NUMBER(10),
area
NUMBER(10),
orgCode
VARCHAR2(50)
)
PARTITION BY RANGE (created)
(PARTITION P201509
VALUES LESS THAN (TIMESTAMP '2015-10-01 00:00:00') ,
PARTITION P201510
VALUES LESS THAN (TIMESTAMP '2015-11-01 00:00:00') ,
PARTITION P201511
VALUES LESS THAN (TIMESTAMP '2015-12-01 00:00:00') ,
PARTITION P201512
VALUES LESS THAN (TIMESTAMP '2016-01-01 00:00:00') ,
PARTITION P201601
VALUES LESS THAN (TIMESTAMP '2016-02-01 00:00:00') ,
PARTITION P201602
VALUES LESS THAN (TIMESTAMP '2016-03-01 00:00:00') ,
PARTITION P201603
VALUES LESS THAN (TIMESTAMP '2016-04-01 00:00:00') ,
PARTITION P201604
VALUES LESS THAN (TIMESTAMP '2016-05-01 00:00:00') ,
PARTITION P201605
VALUES LESS THAN (TIMESTAMP '2016-06-01 00:00:00') ,
PARTITION P201606
VALUES LESS THAN (TIMESTAMP '2016-07-01 00:00:00') ,
PARTITION PMAX
VALUES LESS THAN (MAXVALUE) ) ;
创建用于分区交换的表CYERP_BASE_EXG:
create table CYERP_BASE_EXG as select * from CYERP_BASE where 1=0;
<2> 还要攻克一个难点,就是我给定当前的一个日期,怎么获取当前日期对应到分区表的哪个分区中呢? 也就是说如何定位要被用作分区交换的分区? 例如:当前时间为2016 年4月3日,那么首先获取到4月3日这个月的第一天是4月1号,然后需要确定4月1号开始这个月的数据落在了CYER_BASE这个表的哪个分区中。
我的解决思路就是:
select a.table_name,a.partition_name,a.high_value from USER_TAB_PARTITIONS a
2016年4月的数据会落入到P201604分区里面,这个分区的的HIGH_VALE值为 TIMESTAMP' 2016-05-01 00:00:00'。 那么我就可以求出trunc(sysdate,'month') 当前月第一天的日期和 HIGH_VALE值 相比较,如果 某个分区HIGH_VALUE的值大于trunc(sysdate,'month') 就是符合条件的分区名,但是如果后面有新添加的空的预留分区,也会出现这些分区的 HIGH_VALUE的值大于trunc(sysdate,'month') 的情况,那么我就先order by 符合条件的分区升序排列,然后取第一个,肯定就是要交换的分区名字了,但是由于HIGH_VALUE的数据类型是CLOB 类型,必须有一个包和包体 将CLOB字段转成VARCHAR2字段。
<3> 创建CLOB---->VARCHAR2的转换包和包体
reate or replace package long_help
authid current_user
as
function substr_of
( p_query in varchar2,
p_from
in number,
p_for
in number,
p_name1 in varchar2 default NULL,
p_bind1 in varchar2 default NULL,
p_name2 in varchar2 default NULL,
p_bind2 in varchar2 default NULL,
p_name3 in varchar2 default NULL,
p_bind3 in varchar2 default NULL,
p_name4 in varchar2 default NULL,
p_bind4 in varchar2 default NULL )
return varchar2;
end;
/
create or replace package body long_help
as
g_cursor number := dbms_sql.open_cursor;
g_query
varchar2(32765);
procedure bind_variable( p_name in varchar2, p_value in varchar2 )
is
begin
if ( p_name is not null )
then
dbms_sql.bind_variable( g_cursor, p_name, p_value );
end if;
end;
function substr_of
( p_query in varchar2,
p_from
in number,
p_for
in number,
p_name1 in varchar2 default NULL,
p_bind1 in varchar2 default NULL,
p_name2 in varchar2 default NULL,
p_bind2 in varchar2 default NULL,
p_name3 in varchar2 default NULL,
p_bind3 in varchar2 default NULL,
p_name4 in varchar2 default NULL,
p_bind4 in varchar2 default NULL )
return varchar2
as
l_buffer
varchar2(4000);
l_buffer_len
number;
begin
if ( nvl(p_from,0) <= 0 )
then
raise_application_error
(-20002, 'From must be >= 1 (positive numbers)' );
end if;
if ( nvl(p_for,0) not between 1 and 4000 )
then
raise_application_error
(-20003, 'For must be between 1 and 4000' );
end if;
if ( p_query <> g_query or g_query is NULL )
then
if ( upper(trim(nvl(p_query,'x'))) not like 'SELECT%')
then
raise_application_error
(-20001, 'This must be a select only' );
end if;
dbms_sql.parse( g_cursor, p_query, dbms_sql.native );
g_query := p_query;
end if;
bind_variable( p_name1, p_bind1 );
bind_variable( p_name2, p_bind2 );
bind_variable( p_name3, p_bind3 );
bind_variable( p_name4, p_bind4 );
dbms_sql.define_column_long(g_cursor, 1);
if (dbms_sql.execute_and_fetch(g_cursor)>0)
then
dbms_sql.column_value_long
(g_cursor, 1, p_for, p_from-1,
l_buffer, l_buffer_len );
end if;
return l_buffer;
end substr_of;
end;
/
<4> 创建查询出分区名字的视图:
CREATE OR REPLACE VIEW YWS_PART_DATE AS
SELECT "TABLE_NAME","PARTITION_NAME","HIGH_VALUE"
FROM (SELECT
TABLE_NAME,
PARTITION_NAME,
LONG_HELP.SUBSTR_OF('SELECT HIGH_VALUE
FROM
USER_TAB_PARTITIONS WHERE
TABLE_NAME=:TABLE_NAME
AND PARTITION_NAME=:PARTITION_NAME',
1,
4000,
'TABLE_NAME',
TABLE_NAME,
'PARTITION_NAME',
PARTITION_NAME) HIGH_VALUE
FROM USER_TAB_PARTITIONS)
where table_name = 'CYERP_BASE';
<5> 建立从BI库到到cyerp 库的DB LINK 用于跨库拉取数据:
create public database link XEDG
connect to cyerp identified by pwcyerp
using 'xedg';
<6> 同步所有基础数据(略)
<7> 创建同步到CYERP_BASE_EXG表的存储过程
create or replace procedure cyerptobi_exg as
current_partition varchar2(4000);
v_sql
varchar2(4000);
begin
v_sql:='truncate table CYERP_BASE_EXG';
execute immediate v_sql;
insert into /*+ append */
CYERP_BASE_EXG
select created,
prosku,
t.totalnum,
t.totalnums,
t.totalprice,
t.pronum,
t.proname,
t.specvalue,
t.procode,
t.unit,
t.price,
t.proarea,
t.province,
t.city,
t.area,
t.orgCode
from (SELECT item.create_date created,
item.pro_sku prosku,
(SELECT SUM(it.NORM_WEIGHT)
FROM cy_order_item@xedg it
WHERE it.pro_sku = item.pro_sku
AND it.pro_num = item.pro_num
AND it.price = item.price
AND it.create_date >=
to_timestamp(to_char(trunc(sysdate, 'month'),
'yyyy-mm-dd hh24:mi:ss'),
'yyyy-MM-dd HH24:MI:ss')
AND it.create_date <=
to_timestamp(to_char(sysdate,
'yyyy-mm-dd hh24:mi:ss'),
'yyyy-MM-dd HH24:MI:ss')) totalnum,
(SELECT SUM(it.nums)
FROM cy_order_item@xedg it
WHERE it.pro_sku = item.pro_sku
AND it.pro_num = item.pro_num
AND it.price = item.price
AND it.create_date >=
to_timestamp(to_char(trunc(sysdate, 'month'),
'yyyy-mm-dd hh24:mi:ss'),
'yyyy-MM-dd HH24:MI:ss')
AND it.create_date <=
to_timestamp(to_char(sysdate,
'yyyy-mm-dd hh24:mi:ss'),
'yyyy-MM-dd HH24:MI:ss')) totalnums,
(SELECT SUM(it.total_price)
FROM cy_order_item@xedg it
WHERE it.pro_sku = item.pro_sku
AND it.pro_num = item.pro_num
AND it.price = item.price
AND it.create_date >=
to_timestamp(to_char(trunc(sysdate, 'month'),
'yyyy-mm-dd hh24:mi:ss'),
'yyyy-MM-dd HH24:MI:ss')
AND it.create_date <=
to_timestamp(to_char(sysdate,
'yyyy-mm-dd hh24:mi:ss'),
'yyyy-MM-dd HH24:MI:ss')) totalprice,
item.pro_num pronum,
item.pro_name proname,
item.spec_value specvalue,
item.pro_code procode,
item.unit,
item.price,
info.ca_province province,
info.ca_city city,
info.ca_area area,
item.org_code orgCode,
getAreaName(info.PRO_SALE_AREA) proarea
FROM cy_order_item@xedg item
LEFT JOIN cy_order_info@xedg info
ON info.order_id = item.order_id
WHERE item.pro_type ! = 2
AND info.status
! = 4
AND item.create_date >=
to_timestamp(to_char(trunc(sysdate, 'month'),
'yyyy-mm-dd hh24:mi:ss'),
'yyyy-MM-dd HH24:MI:ss')
AND item.create_date <=
to_timestamp(to_char(sysdate, 'yyyy-mm-dd hh24:mi:ss'),
'yyyy-MM-dd HH24:MI:ss')) t;
commit;
end;
/
<8> 创建用于分区交换的存储过程:CYERP_BASE_EXG----->CYERP_BASE表
create or replace procedure cyerptobi_daily as
current_partition varchar2(4000);
v_sql1
varchar2(4000);
v_sql2
varchar2(4000);
begin
select PARTITION_NAME
into current_partition from yws_part_date where to_date(substr(HIGH_VALUE,11,20),'yyyy-mm-dd hh24:mi:ss') > trunc(sysdate,'month')
and rownum=1 order by to_date(substr(HIGH_VALUE,11,20),'yyyy-mm-dd hh24:mi:ss');
v_sql1:='alter table CYERP_BASE
drop unused columns';
v_sql2:='alter table CYERP_BASE exchange partition '||current_partition||' with table CYERP_BASE_EXG';
execute immediate v_sql1;
execute immediate v_sql2;
delete CYERP_BASE
where rowid in (select rid
from (select rowid rid,
row_number() over(partition by PROSKU,PROAREA,PRICE ORDER by created desc) rn
from CYERP_BASE)
where rn > 1);
commit;
end;
/
最后一步的delete 是考虑到要求3,要根据PROSKU,PROAREA,PRICE这三列去重,即要求是 如果有多行有相同PROSKU,PROAREA,PRICE,那么只保留CREATED 创建时间最靠的那一行数据。
以下是创建JOB 去定时跑这两个存储过程:
begin
sys.dbms_scheduler.create_job(job_name
=> 'SUPER_BI.ETL_DATA',
job_type
=> 'STORED_PROCEDURE',
job_action
=> '"SUPER_BI"."CYERPTOBI_EXG"',
start_date
=> to_date('25-12-2015 21:00:00', 'dd-mm-yyyy hh24:mi:ss'),
repeat_interval
=> 'Freq=HOURLY;Interval=2',
end_date
=> to_date(null),
job_class
=> 'DEFAULT_JOB_CLASS',
enabled
=> true,
auto_drop
=> false,
comments
=> '');
end;
/
begin
sys.dbms_scheduler.create_job(job_name
=> 'SUPER_BI.EXCHANGE_PART',
job_type
=> 'STORED_PROCEDURE',
job_action
=> '"SUPER_BI"."CYERPTOBI_DAILY"',
start_date
=> to_date('25-12-2015 22:00:00', 'dd-mm-yyyy hh24:mi:ss'),
repeat_interval
=> 'Freq=HOURLY;Interval=2',
end_date
=> to_date(null),
job_class
=> 'DEFAULT_JOB_CLASS',
enabled
=> true,
auto_drop
=> false,
comments
=> '');
end;
/
最后
以上就是会撒娇悟空为你收集整理的设计案例3----利用存储过程和JOB 设计从餐饮ERP数据库将数据抽取、数据清洗到BI数据库的全部内容,希望文章能够帮你解决设计案例3----利用存储过程和JOB 设计从餐饮ERP数据库将数据抽取、数据清洗到BI数据库所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复