我是靠谱客的博主 友好野狼,最近开发中收集的这篇文章主要介绍【由浅至深】redis 实现发布订阅的几种方式【由浅至深】redis 实现发布订阅的几种方式前言方法一:SUBSCRIBE + PUBLISH方法二:BLPOP + LPUSH(争抢)方法三:BLPOP + LPUSH(非争抢)测试代码结语,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

【由浅至深】redis 实现发布订阅的几种方式

非常感谢依乐祝发表文章《.NET Core开发者的福音之玩转Redis的又一傻瓜式神器推荐》,对csredis作了一次完整的诠释。

前言

提到消息队列,最熟悉无疑是 rabbitmq,它基本是业界标准的解决方案。本文详细介绍 redis 多种实现轻订阅方法,作者认为非常有趣并加以总结,希望对有需要的朋友学习 redis 功能有一定的带入作用。

方法一:SUBSCRIBE + PUBLISH

//程序1:使用代码实现订阅端
var sub = RedisHelper.Subscribe(("chan1", msg => Console.WriteLine(msg.Body)));
//sub.Disponse(); //停止订阅

//程序2:使用代码实现发布端
RedisHelper.Publish("chan1", "111");

优势:支持多端订阅、简单、性能高;
缺点:数据会丢失;

参考资料:http://doc.redisfans.com/pub_sub/subscribe.html

方法二:BLPOP + LPUSH(争抢)

//程序1:使用代码实现订阅端
while (running) {
    try {
        var msg = RedisHelper.BLPop(5, "list1");
        if (string.IsNullOrEmpty(msg) == false) {
            Console.WriteLine(msg);
        }
    } catch (Exception ex) {
        Console.WriteLine(ex.Message);
    }
}

//程序2:使用代码实现发布端
RedisHelper.LPush("list1", "111");

优势:数据不会丢失、简单、性能高;
缺点:不支持多端(存在资源争抢);

总结:为了解决方法一的痛点,我们实现了本方法,并且很漂亮的制造了一个新问题(不支持多端订阅)。

学习使用 BLPOP

BLPOP key [key ...] timeout

BLPOP 是列表的阻塞式(blocking)弹出原语。

它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。

非阻塞行为

当 BLPOP 被调用时,如果给定 key 内至少有一个非空列表,那么弹出遇到的第一个非空列表的头元素,并和被弹出元素所属的列表的名字一起,组成结果返回给调用者。

当存在多个给定 key 时, BLPOP 按给定 key 参数排列的先后顺序,依次检查各个列表。

假设现在有 job 、 command 和 request 三个列表,其中 job 不存在, command 和 request 都持有非空列表。考虑以下命令:

BLPOP job command request 0

BLPOP 保证返回的元素来自 command ,因为它是按”查找 job -> 查找 command -> 查找 request “这样的顺序,第一个找到的非空列表。

redis> DEL job command request           # 确保key都被删除
(integer) 0

redis> LPUSH command "update system..."  # 为command列表增加一个值
(integer) 1

redis> LPUSH request "visit page"        # 为request列表增加一个值
(integer) 1

redis> BLPOP job command request 0       # job 列表为空,被跳过,紧接着 command 列表的第一个元素被弹出。
1) "command"                             # 弹出元素所属的列表
2) "update system..."                    # 弹出元素所属的值

阻塞行为

如果所有给定 key 都不存在或包含空列表,那么 BLPOP 命令将阻塞连接,直到等待超时,或有另一个客户端对给定 key 的任意一个执行 LPUSH 或 RPUSH 命令为止。

超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长(block indefinitely) 。

redis> EXISTS job                # 确保两个 key 都不存在
(integer) 0
redis> EXISTS command
(integer) 0

redis> BLPOP job command 300     # 因为key一开始不存在,所以操作会被阻塞,直到另一客户端对 job 或者 command 列表进行 PUSH 操作。
1) "job"                         # 这里被 push 的是 job
2) "do my home work"             # 被弹出的值
(26.26s)                         # 等待的秒数

redis> BLPOP job command 5       # 等待超时的情况
(nil)
(5.66s)                          # 等待的秒数

更多学习资料:http://doc.redisfans.com/list/blpop.html

方法三:BLPOP + LPUSH(非争抢)

本方法根据方法二演变而来,设计图如下:

如何实现三端订阅,都可收到消息,三端分别为 sub3, sub4, sub5:

1、sub3, sub4, sub5 使用【方法二】订阅 listkey:list1_sub3,list1_sub4,list1_sub5;

2、总订阅端订阅 listkey:list1,总订阅端收到消息后,执行 lpush list1_sub1 msg, lpush list1_sub2 msg, lpush list1_sub3 msg;

总订阅端订阅原始消息,随后将消息分发给其他订阅端,从而解决【方法二】不支持多端同时订阅的缺点。

最终实现的逻辑为:多端先争抢 list1 消息,抢到者再向其他端转发消息。

测试代码

nuget Install-Package CSRedisCore

var rds = new CSRedis.CSRedisClient("127.0.0.1:6379,password=,poolsize=50,ssl=false,writeBuffer=10240");

//sub1, sub2 争抢订阅(只可一端收到消息)
var sub1 = rds.SubscribeList("list1", msg => Console.WriteLine($"sub1 -> list1 : {msg}"));
var sub2 = rds.SubscribeList("list1", msg => Console.WriteLine($"sub2 -> list1 : {msg}"));

//sub3, sub4, sub5 非争抢订阅(多端都可收到消息)
var sub3 = rds.SubscribeListBroadcast("list2", "sub3", msg => Console.WriteLine($"sub3 -> list2 : {msg}"));
var sub4 = rds.SubscribeListBroadcast("list2", "sub4", msg => Console.WriteLine($"sub4 -> list2 : {msg}"));
var sub5 = rds.SubscribeListBroadcast("list2", "sub5", msg => Console.WriteLine($"sub5 -> list2 : {msg}"));

//sub6 是redis自带的普通订阅
var sub6 = rds.Subscribe(("chan1", msg => Console.WriteLine(msg.Body)));

Console.ReadKey();
sub1.Dispose();
sub2.Dispose();
sub3.Dispose();
sub4.Dispose();
sub5.Dispose();
sub6.Dispose();

rds.Dispose();
return;

测试功能时,发布端可以使用 redis-cli 工具。

结语

redis 功能何其多且相当好玩有趣 ,大家应尽可能多带着兴趣爱好去学习它。

若文中有不好的地方,请提出批评与改正方法,谢谢观赏。

本文使用到 CSRedisCore 的开源地址:https://github.com/2881099/csredis

 

原文:https://www.cnblogs.com/kellynic/p/9952386.html

 

 

评论列表

  

#1楼 2018-11-13 15:28 依乐祝

博主实现的真好!顶一个!

支持(1) 反对(0)

  

#2楼 [楼主] 2018-11-14 09:03 nicye

强行推荐,这些小编,哈哈

支持(1) 反对(0)

  

#3楼 2018-11-14 09:10 陌上不开花

想了解一下redis中的队列,是否有事务的机制?

支持(0) 反对(0)

  

#4楼 [楼主] 2018-11-14 09:13 nicye

@ 陌上不开花
redis是单线程的,没有事务机制。

支持(0) 反对(0)

  

#5楼 2018-11-14 09:14 陌上不开花

@ nicye
不好意思,刚才我的表达有误,其实我是想问,reids中的mq,是否有确认机制?即保证该消息被正确消费(类似rabbitmq中的confirm)

支持(0) 反对(0)

  

#6楼 [楼主] 2018-11-14 09:48 nicye

@ 陌上不开花
没有的,可以变相实现

支持(0) 反对(0)

  

#7楼 2018-11-14 11:41 wdwwtzy

第一个方式为什么会数据丢失???

支持(0) 反对(0)

  

#8楼 [楼主] 2018-11-14 12:26 nicye

@ wdwwtzy
订阅方与redis-server断线的时候

支持(0) 反对(0)

  

#9楼 2018-11-14 13:17 ~雨落忧伤~

支持

支持(0) 反对(0)

  

#10楼 2018-11-14 17:42 羞涩老李

mark

支持(0) 反对(0)

  

#11楼 2018-11-15 09:16 陈惊蛰

支持

支持(0) 反对(0)

  

#12楼 2019-10-08 17:13 单车Hus

@ nicye
怎么变相实现,有没有思路

支持(0) 反对(0)

  

#13楼 [楼主] 2019-10-08 21:13 nicye

@ 单车Hus
以前没有,5.0有stream

支持(0) 反对(0)

  

#14楼 2019-12-04 18:14 jobroon

您好,我在执行您写的CSRedisCore.Tests时,报以下错误 :
[2019/12/4 18:12:15 Informational] ------ 开始运行测试 ------
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.3009750] Discovering: CSRedisCore.Tests
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.3440044] Discovered: CSRedisCore.Tests
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.3511795] Starting: CSRedisCore.Tests
[2019/12/4 18:12:16 Error] [xUnit.net 00:00:00.4510303] CSRedisCore.Tests.CSRedisClientKeyTests.Del [FAIL]
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4520553] System.ArgumentNullException : Value cannot be null.
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4521095] Parameter name: policy
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4528198] Stack Trace:
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4535259] at SafeObjectPool.ObjectPool`1..ctor(IPolicy`1 policy)
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4536170] E:Projectdemocsredis-mastersrcRedisClientPool.cs(15,0): at CSRedis.RedisClientPool..ctor(String connectionString, Action`1 onConnected)
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4538906] E:Projectdemocsredis-mastersrcCSRedisClient.cs(300,0): at CSRedis.CSRedisClient..ctor(Func`2 NodeRule, String[] sentinels, Boolean readOnly, String[] connectionStrings)
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4539596] E:Projectdemocsredis-mastersrcCSRedisClient.cs(223,0): at CSRedis.CSRedisClient..ctor(String connectionString)
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4540231] E:Projectdemocsredis-mastertestCSRedisCore.TestsTestBase.cs(20,0): at CSRedisCore.Tests.TestBase..ctor()
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4540703] at CSRedisCore.Tests.CSRedisClientKeyTests..ctor()
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4644549] Finished: CSRedisCore.Tests
[2019/12/4 18:12:16 Informational] ========== 运行测试结束: 1 运行(0:00:01.1080749) ==========

支持(0) 反对(0)

  

#15楼 [楼主] 2019-12-04 22:22 nicye

@ jobroon
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4520553] System.ArgumentNullException : Value cannot be null.
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4521095] Parameter name: policy
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4528198] Stack Trace:
[2019/12/4 18:12:16 Informational] [xUnit.net 00:00:00.4535259] at SafeObjectPool.ObjectPool`1..ctor(IPolicy`1 policy)


源码不是新的,重新拉取一下

支持(0) 反对(0)

  

#16楼 2019-12-15 12:52 焰火阑珊大神,小白请教一下,用BLPOP时候为啥会报错?
以下代码,用shell向TaskQueue这个list里面插入数据,刚开始还工作正常,隔个10秒钟再插入就报错了,什么原因啊?类似的代码用serviceStack就没毛病

1

2

3

4

5

6

7

var csredis = new CSRedis.CSRedisClient("192.168.0.104:6379,password=123456");

RedisHelper.Initialization(csredis);

while (true)

{

    string id = RedisHelper.BLPop(0, "TaskQueue");         

    Console.WriteLine(id);

}


支持(0) 反对(0)

  

#17楼 [楼主] 2019-12-16 10:43 nicye

@ 焰火阑珊
因为 Socket 有 ReceiveTimeout 属性设置,超时没数据就报这个错误。

BLPop 不适合写 0 参数,又或者设置连接配置 synctimeout,默认是 10 秒。

最后

以上就是友好野狼为你收集整理的【由浅至深】redis 实现发布订阅的几种方式【由浅至深】redis 实现发布订阅的几种方式前言方法一:SUBSCRIBE + PUBLISH方法二:BLPOP + LPUSH(争抢)方法三:BLPOP + LPUSH(非争抢)测试代码结语的全部内容,希望文章能够帮你解决【由浅至深】redis 实现发布订阅的几种方式【由浅至深】redis 实现发布订阅的几种方式前言方法一:SUBSCRIBE + PUBLISH方法二:BLPOP + LPUSH(争抢)方法三:BLPOP + LPUSH(非争抢)测试代码结语所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(71)

评论列表共有 0 条评论

立即
投稿
返回
顶部