文章目录
本文参考源码版本为
redis6.2,redisson3.17.5
发布订阅模式,本质来说,是将提供消息的人和需要消息的人,通过第三方组件联系起来,使得两类群体之间的消息能够及时触达。
比如,在一些优化场景下,可能会使用 本地 + 远程 双缓存机制,远程缓存是一套共用的中间件,总共只有一套数据。而 本地缓存就不一样了,如果你部署的是多个实例,那就有多套本地数据,当数据更新了,如何触达这些本地缓存?
这个时候,你就可以考虑使用发布订阅模式,消息提供者 - 更新数据的人,消息接收方 - 需要更新本地缓存的服务。
我们以 redis 发布订阅的实现来看,其实是非常简单的,从原理到实际编码都是如此。
值得注意的是,上述消息不会持久化,都是直来直去,立即转发。因此,你会看到,如果某个订阅者掉线了,后面即使上线了,这掉线期间的消息也无法重新接收到。
诶,你发现,服务端实现是不是很简单?基本上维护服务端与订阅者的长连接即可,后续有消息过来直接推送;当订阅者取消订阅或者掉线,服务端删除对应订阅关系即可。
好,说完了服务端,我们来看看客户端该怎么做?
客户端的订阅者想要接收消息,肯定也会维护长连接,因此,当我们订阅通道时,本质就是向 redis 服务发送订阅请求,并维护此长连接,用于后续消息通信。
SUBSCRIBE channels
当发起订阅之后,redis-cli 进入阻塞状态,这个时候无法发送其他命令,也就是处于监听消息的状态:
127.0.0.1:6379> SUBSCRIBE foo bar
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "foo"
3) (integer) 1
1) "subscribe"
2) "bar"
3) (integer) 2
然后,我们往 foo 通道发送消息:
127.0.0.1:6379> PUBLISH foo hello
(integer) 1
看看接收方:
127.0.0.1:6379> SUBSCRIBE foo bar
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "foo"
3) (integer) 1
1) "subscribe"
2) "bar"
3) (integer) 2
1) "message"
2) "foo"
3) "hello"
你会看到最后三条数据,message 表示类型、foo 表示通道、hello 表示具体消息。实时触达。
你可能也注意到了,这里我们可以一次性订阅多个通道。当你订阅几个相似的通道时,比如这样:
127.0.0.1:6379> SUBSCRIBE foo.a foo.b foo.c foo.d
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "foo.a"
3) (integer) 1
1) "subscribe"
2) "foo.b"
3) (integer) 2
1) "subscribe"
2) "foo.c"
3) (integer) 3
1) "subscribe"
2) "foo.d"
3) (integer) 4
然后就可以接收任何一个通道发过来的信息。
你看,这四个通道都有相同的前缀 foo.,redis 也考虑到了这点,提供了模式订阅方式 PSUBSCRIBE:
127.0.0.1:6379> PSUBSCRIBE foo.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "foo.*"
3) (integer) 1
往四个通道发送数据看看:
127.0.0.1:6379> PUBLISH foo.a hello_a
(integer) 1
127.0.0.1:6379> PUBLISH foo.b hello_b
(integer) 1
127.0.0.1:6379> PUBLISH foo.c hello_c
(integer) 1
127.0.0.1:6379> PUBLISH foo.d hello_d
(integer) 1
再看看接收方:
127.0.0.1:6379> PSUBSCRIBE foo.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "foo.*"
3) (integer) 1
1) "pmessage"
2) "foo.*"
3) "foo.a"
4) "hello_a"
1) "pmessage"
2) "foo.*"
3) "foo.b"
4) "hello_b"
1) "pmessage"
2) "foo.*"
3) "foo.c"
4) "hello_c"
1) "pmessage"
2) "foo.*"
3) "foo.d"
4) "hello_d"
可以看到,4 个通道的消息都能接收到,其实不止,只要匹配这个模式 foo. 消息都能够接收到;另外,消息类型也变成了 pmessage
PubSub 也是一种生产者/消费者模型,和传统 MQ 不同的是,经过 PubSub 的消息只做转发,不做存储,从实现上来更加简单。
我画了张图,大概是这样:

不知你有没有思考过,服务端是如何推送消息给对应的订阅者的?怎么来识别客户端?IP + PORT 还是什么唯一的身份标志?
其实,redis 处理方式很简单,只是一直维护着这些订阅的 TCP 长连接请求。你想想,这些连接都是经过 TCP 三次握手建立的,只要双方都没断开,便可一直保持通信。这样一来,服务端要做的事情就很简单了是吧?
在服务端看来,客户端发来的 subscribe 指令与普通请求并无差别,服务端在接收到请求时,会加一个 flag 标志,之后便将这些连接保存在特定的结构中,等待 publish 指令的到来。
当服务端收到 publish 指令后,会匹配相应的 channel,并将消息直接推送给订阅的客户端。所以,你会看到,不管是客户端还是服务端,都需要维护这么一个长连接。
另外,对于客户端来说,也同样没有什么特别之处,创建 TCP 连接之后,将指令往连接通道一发就完事了。
不过,由于这些订阅连接需要一直保持状态以监听服务端的消息到来,因此,在客户端设计中,通常会把常规指令和 PubSub 指令分开处理,比如 Redisson,就分别定义了不同的连接池。
总的来说,客户端与服务端是通过 TCP 长连接持续通信,一个客户端实例可以创建很多个这样的长连接,因此,客户端的设计中,一般也会给每个客户端实例设置一个专用连接池。

从 PubSub 这一块内容来看,要想设计一个款高效的客户端,客户端的工作也不少~
我们先看看对 Pubsub 的结构定义:
struct redisServer {
...
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */
...
}
可以看到,服务端的结构中,只记录了通道与客户端订阅关系,没有任何消息会被保存下来。
我画了两张图,你可以参考下:
1)pubsub_channels 的字典结构:

2)pubsub_patterns 的链表模式:

当然,pubsub_patterns 后面也提供了字典结构 pubsub_patterns_dict。
订阅和取消订阅:本质都是在维护以上订阅关系列表,比如收到订阅请求时,会遍历 pubsub_channels 或者 pubsub_patterns 结构,然后将请求添加至链表尾部。
值得注意的是,这里的客户端,本质也是 TCP 连接,可随时互动通信。
消息发布:当服务端接收到发布的消息之后,会先遍历 pubsub_channels 字典,如果找到对应的订阅客户端,则直接推送消息,直到遍历完整个 pubsub_channels 字典。
然后遍历 pubsub_patterns 链表,对匹配上的订阅客户端,同样是直接推送消息。因此,你会看到,即使同样一条消息,匹配多个模式之后,也会推送多次。
试想一下,如果你要订阅一个 channel,你会如何定义客户端?
首先,要想订阅 channel 肯定需要向服务端发送 subscribe channels 指令。
其次,发送指令订阅了 channel 之后只是建立了正常连接,后续还需要监听服务端推送的消息,因此,需要定义监听方法;不过,对于同一个 channel 我们可能同时有多种处理方式,所以,这里的监听器需要定义成列表形式。
另外,我们可能在不同时刻需要和多个 channel 存在订阅关系,因此会存在多个长连接,此时,我们考虑使用连接池。
那连接池是考虑定义专用的还是和普通请求的连接池公用?考虑 PubSub 的特殊性,我们决定使用专用的 PubSub 连接池。
再来看看 Redisson 客户端,通常情况下,我们会先定义一个 RedissonClient 实例,这个 RedissonClient 客户端下就会有多种连接池,其中就包括 PubSub 的专用连接池 PubSubConnectionPool。
值得注意的是,subscribe 指令是阻塞式的,因此,一般考虑使用异步线程池来提交请求;在 redisson 中,使用 Netty 自带异步功能来完成,本质也是使用异步线程池。
以 redisson 客户端为例,我画了张图,大概是这样:

PubSub 有 MQ 的一些特性,比如 解藕,另外还有消息及时转发等。
当你使用 服务本地内存 + redis 等中间缓存等双缓存特性时,在这种情况下你就需要考虑本地缓存更新的及时性,PubSub 便可以做到。
另外,我们在来看看 redisson,在分布式场景的多节点下,比如我们要同时获取一把锁,同一时间肯定只有一个节点能正常获取锁,其他节点只能等待锁的释放,那需要等待多久呢?
一般通过 while(true) 循环尝试获取锁,每循环一次都尝试休眠一段时间;假设在节点休眠期间锁释放了,该节点反应是不是就迟钝了?
类似于这样的情况,也可以考虑使用 PubSub,当节点处于等待获取锁的情况下,先订阅一个通道,等到锁释放之后,持有锁的节点发送通知,然后等待锁的节点基本上可以实时接收通知,并尝试获取锁。

发布订阅也是一种生产者、消费者模型,借助于第三方组件,解藕两者关系,避免直接交互。
redis 实现的 PubSub 比较简单,消息在服务端只做转发,不做存储。
另外,服务端会维护订阅者的 TCP 连接,方便消息及时触达。
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
有人知道在发布新版本的Ruby和Rails时收到电子邮件的方法吗?他们有邮件列表,RubyonRails有一个推特,但我不想听到那些随之而来的喧嚣,我只想知道什么时候发布新版本,尤其是那些有安全修复的版本。 最佳答案 从therailsblog获取提要.http://weblog.rubyonrails.org/feed/atom.xml 关于ruby-on-rails-如何在发布新的Ruby或Rails版本时收到通知?,我们在StackOverflow上找到一个类似的问题:
尝试从我的AngularJS端将数据发布到Rails服务器时出现问题。服务器错误:ActionController::RoutingError(Noroutematches[OPTIONS]"/users"):actionpack(4.1.9)lib/action_dispatch/middleware/debug_exceptions.rb:21:in`call'actionpack(4.1.9)lib/action_dispatch/middleware/show_exceptions.rb:30:in`call'railties(4.1.9)lib/rails/rack/logg
当音乐碰上区块链技术,会擦出怎样的火花?或许周杰伦已经给了我们答案。8月29日下午,B站独家首发周杰伦限定珍藏Demo独家访谈VCR,周杰伦在VCR里分享了《晴天》《青花瓷》《搁浅》《爱在西元前》四首经典歌曲Demo背后的创作故事,并首次公布18年前未发布的神秘作品《纽约地铁》的Demo。在VCR中,方文山和杰威尔音乐提及到“多亏了区块链技术,现在我们可以将这些Demos,变成独一无二具有收藏价值的艺术品,这些Demos可以在薄盒(国内数藏平台)上听到。”如何将音乐与区块链技术相结合,薄盒方面称:“薄盒作为区块链技术服务方,打破传统对于区块链技术只能作为数字收藏的理解。聚焦于区块链技术赋能,在
我想上传我在运行时用Ruby生成的数据,就像从block中提供上传数据一样。我找到的所有示例仅展示了如何流式传输必须在请求之前位于磁盘上的文件,但我不想缓冲该文件。除了滚动我自己的套接字连接之外,最好的解决方案是什么?这是一个伪代码示例:post_stream('127.0.0.1','/stream/')do|body|generate_xmldo|segment|body 最佳答案 有效的代码。require'thread'require'net/http'require'base64'require'openssl'class
昨晚看到IDEA官推宣布IntelliJIDEA2023.1正式发布了。简单看了一下,发现这次的新版本包含了许多改进,进一步优化了用户体验,提高了便捷性。至于是否升级最新版本完全是个人意愿,如果觉得新版本没有让自己感兴趣的改进,完全就不用升级,影响不大。软件的版本迭代非常正常,正确看待即可,不持续改进就会慢慢被淘汰!根据官方介绍:IntelliJIDEA2023.1针对新的用户界面进行了大量重构,这些改进都是基于收到的宝贵反馈而实现的。官方还实施了性能增强措施,使得Maven导入更快,并且在打开项目时IDE功能更早地可用。由于后台提交检查,新版本提供了简化的提交流程。IntelliJIDEA
Unity数据可视化图表插件XCharts3.0发布历时8个多月,业余时间,断断续续,XCharts3.0总算发布了。如果要打个满意度,我给3.0版本来个80分。对于代码框架结构设计的调整改动,基本符合预期,甚是满意。相比之前的1.0和2.0版本,我认为3.0才是一个拿得出手给广大开发者使用的版本。1.0发布的时候,很兴奋,从0.1到1.0,也磨了一年,真的等不及想给大家试用了,还特地写过一篇文章以示庆祝。那个时候,1.0虽然还还不够完善,功能也不够丰富,但它是XCharts的开始,没有1.0,也就没有后面的2.0和3.0。后面的2.0发布,做了很多改进和优化,随着版本迭代,慢慢的发现有不少硬
将stripe的API与RubyonRails结合使用我无法保存订阅。我能够检索、更新和保存客户对象:customer=Stripe::Customer.retrieve(some_customer_id)#thisworkscustomer.save#thisworks我还可以检索订阅:subscription=customer.subscriptions.retrieve("some_subscription_id")#这个有效但是,在尝试保存订阅时:subscription.save#这不起作用我不断得到这个:NoMethodError:undefinedmethod`save'
我有一个模型依赖于一个单独的、联合的模型。classMagazine图像是多态的,可以附加到许多对象(页面和文章),而不仅仅是杂志。杂志需要在相关图像发生任何变化时自行更新该杂志还保存了一张自己的截图,可用于宣传:classMagazine现在如果图像发生变化,杂志也需要更新其截图。所以杂志真的需要知道图片什么时候出了问题。所以我们可以天真地直接从封面图片触发屏幕截图更新classImage...但是图片不应该代表杂志做事然而,图片可以用于许多不同的对象,实际上不应该对杂志进行特定的操作,因为这不是图片的责任。该图像也可能附加到页面或文章,并且不需要为它们做各种事情。“正常”的rail
CSDN优秀解读:https://blog.csdn.net/jiaoyangwm/article/details/1266387752021https://arxiv.org/pdf/2103.14259.pdf关键解读在目标检测中标签分配的最新进展主要寻求为每个GT对象独立定义正/负训练样本。在本文中,我们创新性地从全局的角度重新审视标签分配,并提出将分配程序制定为一个最优传输(OT)问题——优化理论中一个被充分研究的课题。具体来说,我们将每个需求方(锚框)和供应商(GT标签)的单位传输成本定义为他们的分类和回归损失加权之和。在公式化后,找到最好的分配方案即为最小传播成本解决最优传输方案,