前言
因为最近项目需要用到边缘计算,结合百度的openedge进行开发,openedge目前主要功能为结合docker容器实现边缘计算,具体内容官网很多,其架构中,openedge-hub作为所有模块的通信中心节点(消息的接收和转发)是非常重要的,本篇主要介绍一下openedge-hub模块的启动以及在QOS=0的情况下消息的发送和转发,本文主要是为了记录下思路方便后续改造,因个人水平有限,对于MQTT了解有限,很多名词使用不当,中间若有错误劳烦告知,谢谢!
openedge-hub的启动
openedge-hub的开始的节点是在openedge/openedge-hub/main.go文件进行的,main函数如下所示:
1
2
3
4
5
6
7
8
9
10
11openedge.Run(func(ctx openedge.Context) error { m := mo{log: ctx.Log()} defer m.close() err := m.start() if err != nil { return err } ctx.Wait() return nil })
其作用主要是启动mo对象,mo结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16type mo struct { cfg config.Config Rules *rule.Manager Sessions *session.Manager broker *broker.Broker servers *server.Manager factory *persist.Factory log logger.Logger }
- cfg是读取配置文件后保存配置的实体。
- Rules是消息转发使得“路由器”。
- Sessions保存着所有客户端的连接。
- broker中间带有channel,用于接收来自session发送的消息,并通过这个channel发送至路由器(这里说的有点欠妥,后面会讲到的),路由器找到对应的session,并将消息发送到session对应的客户端。
- factory是用于进行持久化的,暂不做解析。
- log进行日志的记录,暂不做解析。
刚刚main文件中有一行代码是:err := m.start()
,这个就是启动的入口,接下来看一下这个start方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36func (m *mo) start() error { err := utils.LoadYAML(openedge.DefaultConfFile, &m.cfg) if err != nil { m.log.Errorln("failed to load config:", err.Error()) return err } m.factory, err = persist.NewFactory(m.cfg.Storage.Dir) if err != nil { m.log.Errorln("failed to new factory:", err.Error()) return err } m.broker, err = broker.NewBroker(&m.cfg, m.factory) if err != nil { m.log.Errorln("failed to new broker:", err.Error()) return err } m.Rules, err = rule.NewManager(m.cfg.Subscriptions, m.broker) if err != nil { m.log.Errorln("failed to new rule manager:", err.Error()) return err } m.Sessions, err = session.NewManager(&m.cfg, m.broker.Flow, m.Rules, m.factory) if err != nil { m.log.Errorln("failed to new session manager:", err.Error()) return err } m.servers, err = server.NewManager(m.cfg.Listen, m.cfg.Certificate, m.Sessions.Handle) if err != nil { m.log.Errorln("failed to new server manager:", err.Error()) return err } m.Rules.Start() m.servers.Start() return nil }
总体上就是分别对mo的各个属性进行初始化,下面分别对每一个属性的初始化进行解析。
cfg加载
cfg加载在main.go中start方法的代码如下:
1
2
3
4
5
6err := utils.LoadYAML(openedge.DefaultConfFile, &m.cfg) if err != nil { m.log.Errorln("failed to load config:", err.Error()) return err }
其实就是把yaml对应的属性填充到mo的cfg中,下面介绍一下cfg这个结构体的一些配置项:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// Config all config of edge type Config struct { Listen []string `yaml:"listen" json:"listen"` Certificate utils.Certificate `yaml:"certificate" json:"certificate"` Principals []Principal `yaml:"principals" json:"principals" validate:"principals"` Subscriptions []Subscription `yaml:"subscriptions" json:"subscriptions" validate:"subscriptions"` Message Message `yaml:"message" json:"message"` Status struct { Logging struct { Enable bool `yaml:"enable" json:"enable"` Interval time.Duration `yaml:"interval" json:"interval" default:"1m"` } `yaml:"logging" json:"logging"` } `yaml:"status" json:"status"` Storage struct { Dir string `yaml:"dir" json:"dir" default:"var/db/openedge"` } `yaml:"storage" json:"storage"` Shutdown struct { Timeout time.Duration `yaml:"timeout" json:"timeout" default:"10m"` } `yaml:"shutdown" json:"shutdown"` }
附上一个源码中示例的文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21name: localhub listen: - tcp://0.0.0.0:1883 principals: - username: 'test' password: '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08' permissions: - action: 'pub' permit: ['#'] - action: 'sub' permit: ['#'] subscriptions: - source: topic: 't' target: topic: 't/topic' logger: path: var/log/openedge/localhub/localhub.log console: true level: "debug"
- name是模块的名称,localhub的模块起名为localhub。
- listen是用于开启服务器后监听的地址(客户端连接的地址)。
- principals存放客户端连接的用户名、密码(密码是使用SHA-256加密的,客户端连接的时候使用解密后的明文密码)、权限等相关信息。
- subscriptions用于存放消息转发(订阅)的路由规则,source是发布消息时的地址,target是转发的地址(比如A发送的“t”主题消息就会转发到订阅“t/topic”主题的客户端中。
- logger是配置日志相关信息。
通过加载配置文件后,cfg中就存储了这些信息。
broker加载
使用淘宝的人既可以自己开店,也可以去买货,但是卖货的信息发布在什么地方呢?想要买东西从哪里浏览呢?当然是通过淘宝了,broker就是淘宝,所有连接到openedge-hub(后面简称为hub)的session既可以作为卖家发布消息,这个消息就发送到broker的channel(根据QOS不同对应了不同的channel)了,在生活中为了让买家能够买到称心如意的东西,淘宝一般会进行个性化推荐,在hub中每一个session会个性化的订阅主题,这时候路由器(Rule.Manager)就会把符合session需要的消息发送到session中了。
broker初始化的入口如下:
1
2
3
4
5
6m.broker, err = broker.NewBroker(&m.cfg, m.factory) if err != nil { m.log.Errorln("failed to new broker:", err.Error()) return err }
接下来进入到broker.NewBroker这个方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// NewBroker NewBroker func NewBroker(c *config.Config, pf *persist.Factory) (b *Broker, err error) { ··· b = &Broker{ config: c, msgQ0Chan: make(chan *common.Message, c.Message.Ingress.Qos0.Buffer.Size), msgQ1Chan: make(chan *common.Message, c.Message.Ingress.Qos1.Buffer.Size), msgQ1DB: msgqos1DB, offsetDB: offsetDB, offsetChan: make(chan *Offset, c.Message.Offset.Buffer.Size), log: logger.WithField("broker", "mqtt"), } ··· return b, b.tomb.Gos(b.persistingMsgQos1, b.persistingOffset, b.cleaningMsgQos1) }
“···”表示这里省略了部分代码
可以看到这里为broker注入了刚刚加载好的cfg,并且创建了两个channel(现在只以QOS0作为研究对象),然后返回这个broker,这个broker就初始化好了。
Rules加载
假设一下淘宝进行个性化推荐时候的做法(只是为了理解这么比喻一下),可以生成每一个人的特征,看看这个人喜欢什么,比如喜欢鞋子、外衣、化妆品,那么淘宝后台专门为他/她开启一个线程(只是这么说,不用较真),这个线程就用来不断从淘宝中获取新的鞋子、外衣、化妆品的商品,然后把它推送到用户的手机中,那么这么多线程不方便管理,弄一个“线程池”集中管理。回到hub中,每一个session订阅的主题相当于特征,这时候为每一个session添加一个rulebase就是相当于淘宝后台专门开启的线程,当rulebase接收到数据后推送到客户端。
Rules初始化的入口如下:
1
2
3
4
5
6m.Rules, err = rule.NewManager(m.cfg.Subscriptions, m.broker) if err != nil { m.log.Errorln("failed to new rule manager:", err.Error()) return err }
进入rule.NewManager方法中,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// NewManager creates a new rule manager func NewManager(c []config.Subscription, b broker) (*Manager, error) { m := &Manager{ broker: b, rules: cmap.New(), trieq0: router.NewTrie(), log: logger.WithField("manager", "rule"), } m.rules.Set(common.RuleMsgQ0, newRuleQos0(m.broker, m.trieq0)) m.rules.Set(common.RuleTopic, newRuleTopic(m.broker, m.trieq0)) for _, sub := range c { err := m.AddSinkSub(common.RuleTopic, sub.Target.Topic, uint32(sub.Source.QOS), sub.Source.Topic, uint32(sub.Target.QOS), sub.Target.Topic) if err != nil { return nil, fmt.Errorf("failed to add subscription (%v): %s", sub.Source, err.Error()) } } if b.Config().Status.Logging.Enable { return m, m.tomb.Gos(m.logging) } return m, nil }
这个部分是较为核心的部分,首先初始化了rule.Manager的部分属性,broker就使用上一步创建的broker,rules是一个map,trieq0表示的是一个Trie树,Trie树代表的就是消息发布、订阅时这种转发的关系,这也是为什么NewManager这个方法为什么要带着Subscription这个参数了,就是为了构建这个Trie树。
Trie树上每一个仅表示一个节点的转发关系,比如在/a/b节点中,近对主题为/a/b的消息进行处理,至于在subscriptino参数中定义的转发关系如何实现,是由下面要介绍的RuleTopic进行处理的
下面有两行是构建两个比较特殊的rule,第一个是RuleMsgQ0,这个rule主要是为了监听来自broker的消息,并把消息发送至对应的sinksub上,第二个是RuleTopic,这个rule主要是用来进行在subscriptions中定义的转发关系进行转发,转发到对应的层。
sinksub就是用来表示订阅关系,sinksub内包含有sessoin(session对应的rule)中的channel,sinksub将消息发送到channel中,session从channel另一端读取数据数据,并发送至客户端。
sinksub中有两个属性对比一下,一个是topic,另一个是targettopic:topic就是订阅的主题,targettopic就是要发送的主题
sinksub中的channel就是来自sink中的msgchan
RuleMsgQ0
进入到初始化RuleMsgQ0的方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43func newRuleQos0(b broker, r *router.Trie) *rulebase { return newRuleBase(common.RuleMsgQ0, false, b, r, nil, nil) } func newRuleBase(id string, persistent bool, b broker, r *router.Trie, publish, republish common.Publish) *rulebase { ··· rb := &rulebase{ id: id, broker: b, log: log, } ··· rb.msgchan = newMsgChan( b.Config().Message.Egress.Qos0.Buffer.Size, b.Config().Message.Egress.Qos1.Buffer.Size, publish, republish, b.Config().Message.Egress.Qos1.Retry.Interval, b.Config().Shutdown.Timeout, persist, log, ) rb.sink = newSink(id, b, r, rb.msgchan) return rb } func newSink(id string, b broker, r *router.Trie, msgchan *msgchan) *sink { s := &sink{ id: id, broker: b, trieq0: r, trieq1: router.NewTrie(), msgchan: msgchan, log: logger.WithField("sink", id), } return s }
msgchan可以理解为用于发送消息的channel
对照着参数,可以看到publish、republish传送过来均为nil,表明其实这个rule并不参与消息的转接(后面如果自己推敲一下,它同样会接收来自sinksub的消息,只不过因为没有在Trie树中注册sinksub,所以不会向这个channel发送数据罢了。
rule的publish方法就是用于在接收到来自sinksub从channel中发送的消息后进行的处理(普通的rulebase会发送回客户端,对于RuleTopic会向broker的channel转发消息)。
newRuleBase这个方法其实前面也没有什么,就是对rulebase的一个初始化,注意这里每一个rulebase都有一个自己的msgchan。这个方法的结尾有一个小点,生成一个newSink方法,里面对于这个sink赋予了来自rulebase的Trie树(其实这个Trie树应该是来自ruleManager的)、msgchan。
这样RuleMsgQ0就构造好了。
RuleTopic
1
2
3
4
5
6func newRuleTopic(b broker, r *router.Trie) *rulebase { rb := newRuleBase(common.RuleTopic, true, b, r, nil, nil) rb.msgchan.publish = rb.publish return rb }
与RuleMsgQ0非常相似,唯一不同的是这里为rulebase赋予了publish方法(这个publish最后给予了msgchan,这也是subscription属性定义的转发逻辑能够生效的关键),接下来就看看RuleTopic的publish方法:
1
2
3
4
5
6
7
8
9
10func (r *rulebase) publish(msg common.Message) { msg.QOS = msg.TargetQOS msg.Topic = msg.TargetTopic msg.SequenceID = 0 if msg.QOS == 1 { msg.SetCallbackPID(0, func(_ uint32) { msg.Ack() }) } r.broker.Flow(&msg) }
就是把要发送的TargetTopic和TargetQOS转换为Topic和QOS属性,因为后面RuleMsgQ0获取消息后对每一层级进行消息转发时是按照这个topic(其实是因为对Trie树进行遍历查找的,所以这个topic与Trie树每一层的children map的key值相同)。
举个例子,我们subscription选项里面source是“t”,target是“t/topic”,那么在之前构造Trie树就是如下这个图:
上图的key是指在Trie树中每一层children map中的key值
那么在客户端进行了对t/topic订阅操作后:
这样,如果对主题“t“发送一个消息后,消息从session发送到broker的channel中,RuleMsgQ0从broker的channel读取数据,然后从root出发根据主题”t”遍历Trie树,找到了第一个key为t的节点(sinksub,注意这个sinksub属于RuleTopic),通过这个sinksub对其channel(属于RuleTopic的channel)发送这个消息,RuleTopic拿到这个消息后,进入其publish函数,publish函数把消息的topic设置为原来的targetTopic,然后发送到broker的channel中,RuleMsgQ0又从broker中读取了这个新消息,然后根据Trie树遍历,先找到key值为“t”的节点,然后在这个节点的children map中找寻key值为“topic”的节点,最终找到了这个key为topic的sinksub(注意,这个sinksub属于用户session对应的Rule),然后把消息发送给这个sinksub的channel,之后会从这个channel读取数据并发送到用户客户端。
RuleManager是一个比较重要的部分,其实消息在hub中的流动主要在RuleManager的rules中流动,进而再发送到客户端,至于是如何流动的,后面会详细说明
sessions加载
sessions加载入口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// NewManager creates a session manager func NewManager(conf *config.Config, flow common.Flow, rules *rule.Manager, pf *persist.Factory) (*Manager, error) { ··· return &Manager{ auth: auth.NewAuth(conf.Principals), rules: rules, flow: flow, conf: &conf.Message, recorder: newRecorder(sessionDB), sessions: cmap.New(), log: logger.WithField("manager", "session"), }, nil }
主要是对Manager的各个属性进行初始化工作,这里看到有一个sessions属性,用的是map存储管理客户端会话,还存储了一些消息的配置信息,以及前面刚刚初始化的RuleManager,除了这些,着重解析一下auth和flow属性。
auth
auth从字面上很好理解,就是授权,我们进入到zhegeNewAuth方法中(这里只对普通的用户名密码授权进行解析):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// NewAuth creates auth func NewAuth(principals []config.Principal) *Auth { ··· _accounts := make(map[string]account) for _, principal := range principals { authorizer := NewAuthorizer() for _, p := range duplicatePubSubPermitRemove(principal.Permissions) { for _, topic := range p.Permits { authorizer.Add(topic, p.Action) } } ··· _accounts[principal.Username] = account{ Password: principal.Password, Authorizer: authorizer, } ··· } return &Auth{certs: _certs, accounts: _accounts} }
这个方法主要是遍历每一个principal,为其每一个生成authorizer,其中的duplicatePubSubPermitRemove方法是用于去除用户在pub、sub里面自定义中重复的主题名称,之后将这些sub、pub添加到authorizer中,之后把用户名、密码和authorizer信息存储到Auth中,并返回。
Flow
1
2
3
4
5
6
7
8
9
10
11// Flow flows message to broker func (b *Broker) Flow(msg *common.Message) { ··· select { case b.msgQ0Chan <- msg: case <-b.tomb.Dying(): b.log.Debugf("flow message (qos=0) failed since broker closed") } ··· }
这个方法就是用来向broker发送消息的,当session要进行发布消息时,通过这个方法把消息发送到broker的channel中,再由RuleManager中的rules进行消息的处理。
servers加载
servers的入口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// NewManager creates a server manager func NewManager(addrs []string, cert utils.Certificate, handle Handle) (*Manager, error) { launcher, err := mqtt.NewLauncher(cert) if err != nil { return nil, err } m := &Manager{ servers: make([]transport.Server, 0), handle: handle, log: logger.WithField("manager", "server"), } for _, addr := range addrs { svr, err := launcher.Launch(addr) if err != nil { m.Close() return nil, err } m.servers = append(m.servers, svr) } return m, nil }
因为hub不止可以监听一个地址(可以看一下yaml配置文件,里面的Listen是数组属性),这个ServerManager就是把所有的监听地址保存起来,并且,它还负责启动所有的Server(每一个server对应一个监听地址)。
hub启动
在main.go的start方法最后调用了两个方法,
1
2
3m.Rules.Start() m.servers.Start()
就是分别启动RuleManager和SeverManager,下面分别解析一下:
RuleManager启动
进入RuleManager的start方法:
1
2
3
4
5
6
7
8
9
10
11// Start starts all rules func (m *Manager) Start() { ··· for item := range m.rules.IterBuffered() { r := item.Val.(base) if err := r.start(); err != nil { m.log.WithError(err).Infof("failed to start rule (%s)", r.uid()) } } }
其实就是遍历所有的Rule,然后调用Rule的start方法:
1
2
3
4
5
6
7
8
9
10func (r *rulebase) start() (err error) { r.once.Do(func() { err = r.msgchan.start() ··· err = r.sink.start() ··· }) return }
这里面主要调用了两个方法一个是启动这个rule的msgchan,另一个是启动rule的sink。
msgchan启动
1
2
3
4
5func (c *msgchan) start() error { ··· return c.msgtomb.Gos(c.goProcessingQ0, c.goProcessingQ1) }
msgchan启动时运行goProcessingQ0方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21func (c *msgchan) goProcessingQ0() error { ··· loop: for { select { case <-c.msgtomb.Dying(): break loop case msg := <-c.msgq0: c.process(msg) } } ··· } func (c *msgchan) process(msg *common.Message) { if msg.QOS == 0 { c.publish(*msg) return } ··· }
这个方法就是为了接收来自msgchan中channel的消息,然后在process方法中处理,想想之前RuleTopic中设置了一个publish函数,就是在process里面调用的。想想RuleTopic的逻辑,它在一个未知(就是sink.start的方法)地方接收到了消息,然后发送到自身的channel(RuleTopic对应的msgchan拥有的channel),之后在这个goProcessingQ0方法中接收到了这个消息,然后调用publish方法,把修改后的Message发送到broker的channel中。
sink启动
sink启动的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23func (s *sink) start() error { if s.id == common.RuleMsgQ0 { return s.tomb.Gos(s.goRoutingQ0) } ··· } //Blank: 就是为了数据进行路由,转发 func (s *sink) goRoutingQ0() error { ··· var msg *common.Message for { select { case <-s.tomb.Dying(): return nil case msg = <-s.broker.MsgQ0Chan(): matches := s.trieq0.MatchUnique(msg.Topic) for _, sub := range matches { sub.Flow(*msg) } } } }
这个start方法(只在QOS为0的层面)只对RuleMsgQ0有用,调用了goRoutingQ0方法。goRoutingQ0方法就是接收来自broker的channel中的方法(在上面提到的,RuleTopic把修改后的消息发送到broker后,消息就是在这里被消费的),里面调用了一个方法,MatchUnique,这个方法就是为了从Trie树中拿到对应主题的subsink,然后调用subsink的Flow方法。
这里先顺一下思路,其实这个goRoutingQ0只是为了从broker中拿到消息,然后根据消息的topic匹配到subsink,通过subsink将消息发送到session中,这也就是为什么发布的消息(QOS=0)能够发送到相对应的客户端中(订阅了该主题,或者在subscription中定义的内容)。因为这个方法其实只对RuleMsgQ0有用,所以这也是hub中预先定义这个Rule的原因了。其实如果不需要subscription定义的关系的话,那么不启动RuleTopic也是可以的。把RuleManager中NewManager的
m.rules.Set(common.RuleTopic, newRuleTopic(m.broker, m.trieq0))
及后面对Trie树存放sinksub的循环删掉后,就将subscription定义的内容无效掉了。
接下来看一下subsink的Flow方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// Flow flows message func (s *sinksub) Flow(msg common.Message) { // set target topic if s.ttopic != "" { msg.TargetTopic = s.ttopic } else { msg.TargetTopic = msg.Topic } ··· if sqos == 0 { msg.TargetQOS = 0 s.channel.putQ0(&msg) } else { msg.TargetQOS = s.tqos s.channel.putQ1(&msg) } }
第一个if-else就是为了对Message设置TargetTopic,这一点主要是为RuleTopic用的,因为RuleTopic接收到消息后,要把消息的topic属性设置为TargetTopic对应的值,所以在这里提前把Message的TargetTopic设置为sinksub对应的TargetTopic(因为从客户端发送过来的消息是没有TargetTopic的,这里要设置一下。sinksub本身就是作为转发(从一个主题到另一个主题)的关系表示,如果这个sinksub表示的是RuleTopic创建的话,本身就代表这个消息(这个主题的消息)就要发送到RuleTopic定义的主题中(TargetTopic),所以这里要把TargetTopic设置为与sinksub一样,这样消息在之后才能按照subscriptions定义的进行转发。
在第二个if-else中,使用sinksub对应的channel发送这个消息,sinksub对应的channel是sink中的msgchan(再往深了看,其实是Rule的msgchan),这样把消息发送后,在刚刚msgchan启动中提到的goProcessingQ0方法中一直阻塞在等待消息的地方case msg := <-c.msgq0:
就获取到了消息,然后把消息使用process方法进行处理:
- 如果Rule是RuleTopic的话,会调用publish方法,把消息的Topic改为TargetTopic对应的值,然后发送到broker的channel中,之后RuleMsgQ0从这个channel获取到了数据,再重复上面找寻sinksub的过程(注意再次寻找后获得的sinksub不一定为session对应的Rule所创建的,有可能依然还是RuleTopic对应的sinksub,因为可能出现如下情况,这样就需要经过两次的RuleTopic的修改转发)
1
2
3
4
5
6
7
8
9
10subscriptions: - source: topic: 't' target: topic: 't/topic' - source: topic: 't/topic' target: topic: 't/topic/a'
- 如果Rule是普通Session中创建的话,那么就会调用客户端的处理方法(其实就是发送方法,具体这个方法是如何设置在sink中以后会写文章提到,这里简单说一下,这个方法位于openedge/openedge-hub/session/session_egress.go#publish),这个publish方法把这个消息发送到对应的客户端中。
ServerManager启动
ServerManager启动方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14// Start starts all servers func (m *Manager) Start() { for _, item := range m.servers { svr := item m.tomb.Go(func() error { for { conn, err := svr.Accept() ··· go m.handle(conn) } }) } }
这个方法主要是对每一个监听Server(之前在ServerManager中定义的Server)监听连接请求,收到新的请求后,调用handle方法进行处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// Handle handles connection func (m *Manager) Handle(conn transport.Conn) { defer conn.Close() conn.SetReadLimit(int64(m.conf.Length.Max)) newSession(conn, m).Handle() } func newSession(conn transport.Conn, manager *Manager) *session { return &session{ conn: conn, manager: manager, subs: make(map[string]packet.Subscription), pids: common.NewPacketIDS(), log: logger.WithField("mqtt", "session"), permittedPublishTopics: make(map[string]struct{}), } }
上面两个方法主要是创建一个新的会话session,至于session是如何初始化的,是在Session的handle方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// Handle handles mqtt connection func (s *session) Handle() { var err error var pkt packet.Generic for { pkt, err = s.conn.Receive() ··· switch p := pkt.(type) { case *packet.Connect: ··· case *packet.Publish: ··· case *packet.Puback: ··· case *packet.Subscribe: ··· case *packet.Pingreq: ··· case *packet.Pingresp: ··· case *packet.Disconnect: ··· return case *packet.Unsubscribe: ··· default: ··· } ··· } }
也就是Session监听收到的packet,然后依据packet的种类,采取不同的处理逻辑,之前提到的Rule的创建、Authorize就是在Connect这里面处理的,这个在下一篇文章中进行讲解。
最后
以上就是俊逸舞蹈最近收集整理的关于openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)的全部内容,更多相关openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)内容请搜索靠谱客的其他文章。
发表评论 取消回复