我是靠谱客的博主 高挑猫咪,最近开发中收集的这篇文章主要介绍用PostgreSQL支持含有更新,删除,插入的实时流式计算,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

大多数的流式计算产品只支持APPEND ONLY的应用场景,也就是只有插入,没有更新和删除操作。
如果要实现更新和删除的实时流式计算,在PostgreSQL中可以这样来实现。
在此前你可以阅读我以前写的文章来了解PG是如何处理一天一万亿的实时流式计算的:
https://yq.aliyun.com/articles/166

要支持更新和删除,思路是这样的,加一张前置表,这个前置表的某个字段用来记录字段的最终状态,即到达这个状态后,记录不会被更新或删除。
通过触发器来控制什么记录插入到流中同时从前置表删除,什么记录现暂存在前置表。
下面是例子
本文假设flag=2是最终状态,应用层自己来定义这个FLAG。

pipeline=# create table pret1(id serial primary key, info text, flag smallint);
CREATE TABLE
pipeline=# create stream s0 (like pret1);
CREATE STREAM
pipeline=# create continuous view v0 as select count(*) from s0;
CREATE CONTINUOUS VIEW

flag=2的记录旁路到流,其他记录放到前置表。

pipeline=# create or replace function tg0() returns trigger as
$$
declare
begin
if new.flag=2 then
insert into s0 values (new.*);
return null;
end if;
return new;
end;
$$
language plpgsql strict;
CREATE FUNCTION
pipeline=# create trigger tg0 before insert on pret1 for each row execute procedure tg0();
CREATE TRIGGER

更新后flag=2的记录旁路到流,并删除前置表的对应记录。

pipeline=# create or replace function tg1() returns trigger as
$$
declare
begin
if new.flag=2 then
insert into s0 values (new.*);
delete from pret1 where id=new.id;
return null;
end if;
return new;
end;
$$
language plpgsql strict;
CREATE FUNCTION
pipeline=# create trigger tg1 before update on pret1 for each row execute procedure tg1();
CREATE TRIGGER

测试

pipeline=# insert into pret1(info,flag) values ('test',0);
INSERT 0 1
pipeline=# select * from v0;
count
-------
(0 rows)
pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
count
-------
(0 rows)
pipeline=# select * from pret1;
id | info | flag
----+------+------
1 | test |
0
2 | test |
1
(2 rows)
pipeline=# update pret1 set flag=2;
UPDATE 0
pipeline=# select * from pret1;
id | info | flag
----+------+------
(0 rows)
pipeline=# select * from v0;
count
-------
2
(1 row)
pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# delete from pret1 ;
DELETE 1
pipeline=# select * from v0;
count
-------
2
(1 row)
pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
count
-------
2
(1 row)
pipeline=# update pret1 set flag =10;
UPDATE 1
pipeline=# select * from v0;
count
-------
2
(1 row)
pipeline=# select * from pret1;
id | info | flag
----+------+------
4 | test |
10
(1 row)
pipeline=# update pret1 set flag =2;
UPDATE 0
pipeline=# select * from pret1;
id | info | flag
----+------+------
(0 rows)
pipeline=# select * from v0;
count
-------
3
(1 row)

详情请参考
http://docs.pipelinedb.com/introduction.html

如果你觉得这还不够爽,PostgreSQL还有kafka插件,可以类lambda的模式从kafka持续读数据,进行流式计算。
PostgreSQL就是个"老流氓",因为任何软件可能都和这只大象有一腿。

最后

以上就是高挑猫咪为你收集整理的用PostgreSQL支持含有更新,删除,插入的实时流式计算的全部内容,希望文章能够帮你解决用PostgreSQL支持含有更新,删除,插入的实时流式计算所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(50)

评论列表共有 0 条评论

立即
投稿
返回
顶部