我是靠谱客的博主 俊逸舞蹈,最近开发中收集的这篇文章主要介绍openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前言

因为最近项目需要用到边缘计算,结合百度的openedge进行开发,openedge目前主要功能为结合docker容器实现边缘计算,具体内容官网很多,其架构中,openedge-hub作为所有模块的通信中心节点(消息的接收和转发)是非常重要的,本篇主要介绍一下openedge-hub模块的启动以及在QOS=0的情况下消息的发送和转发,本文主要是为了记录下思路方便后续改造,因个人水平有限,对于MQTT了解有限,很多名词使用不当,中间若有错误劳烦告知,谢谢!

openedge-hub的启动

openedge-hub的开始的节点是在openedge/openedge-hub/main.go文件进行的,main函数如下所示:

	openedge.Run(func(ctx openedge.Context) error {
m := mo{log: ctx.Log()}
defer m.close()
err := m.start()
if err != nil {
return err
}
ctx.Wait()
return nil
})

其作用主要是启动mo对象,mo结构如下:

type mo struct {
cfg
config.Config
Rules
*rule.Manager
Sessions *session.Manager
broker
*broker.Broker
servers
*server.Manager
factory
*persist.Factory
log
logger.Logger
}
  1. cfg是读取配置文件后保存配置的实体。
  2. Rules是消息转发使得“路由器”。
  3. Sessions保存着所有客户端的连接。
  4. broker中间带有channel,用于接收来自session发送的消息,并通过这个channel发送至路由器(这里说的有点欠妥,后面会讲到的),路由器找到对应的session,并将消息发送到session对应的客户端。
  5. factory是用于进行持久化的,暂不做解析。
  6. log进行日志的记录,暂不做解析。

刚刚main文件中有一行代码是:err := m.start(),这个就是启动的入口,接下来看一下这个start方法:

func (m *mo) start() error {
err := utils.LoadYAML(openedge.DefaultConfFile, &m.cfg)
if err != nil {
m.log.Errorln("failed to load config:", err.Error())
return err
}
m.factory, err = persist.NewFactory(m.cfg.Storage.Dir)
if err != nil {
m.log.Errorln("failed to new factory:", err.Error())
return err
}
m.broker, err = broker.NewBroker(&m.cfg, m.factory)
if err != nil {
m.log.Errorln("failed to new broker:", err.Error())
return err
}
m.Rules, err = rule.NewManager(m.cfg.Subscriptions, m.broker)
if err != nil {
m.log.Errorln("failed to new rule manager:", err.Error())
return err
}
m.Sessions, err = session.NewManager(&m.cfg, m.broker.Flow, m.Rules, m.factory)
if err != nil {
m.log.Errorln("failed to new session manager:", err.Error())
return err
}
m.servers, err = server.NewManager(m.cfg.Listen, m.cfg.Certificate, m.Sessions.Handle)
if err != nil {
m.log.Errorln("failed to new server manager:", err.Error())
return err
}
m.Rules.Start()
m.servers.Start()
return nil
}

总体上就是分别对mo的各个属性进行初始化,下面分别对每一个属性的初始化进行解析。

cfg加载

cfg加载在main.go中start方法的代码如下:

err := utils.LoadYAML(openedge.DefaultConfFile, &m.cfg)
if err != nil {
m.log.Errorln("failed to load config:", err.Error())
return err
}

其实就是把yaml对应的属性填充到mo的cfg中,下面介绍一下cfg这个结构体的一些配置项:

// Config all config of edge
type Config struct {
Listen
[]string
`yaml:"listen" json:"listen"`
Certificate utils.Certificate `yaml:"certificate" json:"certificate"`
Principals
[]Principal
`yaml:"principals" json:"principals" validate:"principals"`
Subscriptions []Subscription `yaml:"subscriptions" json:"subscriptions" validate:"subscriptions"`
Message Message `yaml:"message" json:"message"`
Status
struct {
Logging struct {
Enable
bool
`yaml:"enable" json:"enable"`
Interval time.Duration `yaml:"interval" json:"interval" default:"1m"`
} `yaml:"logging" json:"logging"`
} `yaml:"status" json:"status"`
Storage struct {
Dir string `yaml:"dir" json:"dir" default:"var/db/openedge"`
} `yaml:"storage" json:"storage"`
Shutdown struct {
Timeout time.Duration `yaml:"timeout" json:"timeout" default:"10m"`
} `yaml:"shutdown" json:"shutdown"`
}

附上一个源码中示例的文件:

name: localhub
listen:
- tcp://0.0.0.0:1883
principals:
- username: 'test'
password: '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08'
permissions:
- action: 'pub'
permit: ['#']
- action: 'sub'
permit: ['#']
subscriptions:
- source:
topic: 't'
target:
topic: 't/topic'
logger:
path: var/log/openedge/localhub/localhub.log
console: true
level: "debug"
  • name是模块的名称,localhub的模块起名为localhub。
  • listen是用于开启服务器后监听的地址(客户端连接的地址)。
  • principals存放客户端连接的用户名、密码(密码是使用SHA-256加密的,客户端连接的时候使用解密后的明文密码)、权限等相关信息。
  • subscriptions用于存放消息转发(订阅)的路由规则,source是发布消息时的地址,target是转发的地址(比如A发送的“t”主题消息就会转发到订阅“t/topic”主题的客户端中。
  • logger是配置日志相关信息。

通过加载配置文件后,cfg中就存储了这些信息。

broker加载

使用淘宝的人既可以自己开店,也可以去买货,但是卖货的信息发布在什么地方呢?想要买东西从哪里浏览呢?当然是通过淘宝了,broker就是淘宝,所有连接到openedge-hub(后面简称为hub)的session既可以作为卖家发布消息,这个消息就发送到broker的channel(根据QOS不同对应了不同的channel)了,在生活中为了让买家能够买到称心如意的东西,淘宝一般会进行个性化推荐,在hub中每一个session会个性化的订阅主题,这时候路由器(Rule.Manager)就会把符合session需要的消息发送到session中了。

broker初始化的入口如下:

	m.broker, err = broker.NewBroker(&m.cfg, m.factory)
if err != nil {
m.log.Errorln("failed to new broker:", err.Error())
return err
}

接下来进入到broker.NewBroker这个方法中:

// NewBroker NewBroker
func NewBroker(c *config.Config, pf *persist.Factory) (b *Broker, err error) {
···
b = &Broker{
config:
c,
msgQ0Chan:
make(chan *common.Message, c.Message.Ingress.Qos0.Buffer.Size),
msgQ1Chan:
make(chan *common.Message, c.Message.Ingress.Qos1.Buffer.Size),
msgQ1DB:
msgqos1DB,
offsetDB:
offsetDB,
offsetChan: make(chan *Offset, c.Message.Offset.Buffer.Size),
log:
logger.WithField("broker", "mqtt"),
}
···
return b, b.tomb.Gos(b.persistingMsgQos1, b.persistingOffset, b.cleaningMsgQos1)
}

“···”表示这里省略了部分代码

可以看到这里为broker注入了刚刚加载好的cfg,并且创建了两个channel(现在只以QOS0作为研究对象),然后返回这个broker,这个broker就初始化好了。

Rules加载

假设一下淘宝进行个性化推荐时候的做法(只是为了理解这么比喻一下),可以生成每一个人的特征,看看这个人喜欢什么,比如喜欢鞋子、外衣、化妆品,那么淘宝后台专门为他/她开启一个线程(只是这么说,不用较真),这个线程就用来不断从淘宝中获取新的鞋子、外衣、化妆品的商品,然后把它推送到用户的手机中,那么这么多线程不方便管理,弄一个“线程池”集中管理。回到hub中,每一个session订阅的主题相当于特征,这时候为每一个session添加一个rulebase就是相当于淘宝后台专门开启的线程,当rulebase接收到数据后推送到客户端。

Rules初始化的入口如下:

	m.Rules, err = rule.NewManager(m.cfg.Subscriptions, m.broker)
if err != nil {
m.log.Errorln("failed to new rule manager:", err.Error())
return err
}

进入rule.NewManager方法中,如下:

// NewManager creates a new rule manager
func NewManager(c []config.Subscription, b broker) (*Manager, error) {
m := &Manager{
broker: b,
rules:
cmap.New(),
trieq0: router.NewTrie(),
log:
logger.WithField("manager", "rule"),
}
m.rules.Set(common.RuleMsgQ0, newRuleQos0(m.broker, m.trieq0))
m.rules.Set(common.RuleTopic, newRuleTopic(m.broker, m.trieq0))
for _, sub := range c {
err := m.AddSinkSub(common.RuleTopic, sub.Target.Topic, uint32(sub.Source.QOS), sub.Source.Topic, uint32(sub.Target.QOS), sub.Target.Topic)
if err != nil {
return nil, fmt.Errorf("failed to add subscription (%v): %s", sub.Source, err.Error())
}
}
if b.Config().Status.Logging.Enable {
return m, m.tomb.Gos(m.logging)
}
return m, nil
}

这个部分是较为核心的部分,首先初始化了rule.Manager的部分属性,broker就使用上一步创建的broker,rules是一个map,trieq0表示的是一个Trie树,Trie树代表的就是消息发布、订阅时这种转发的关系,这也是为什么NewManager这个方法为什么要带着Subscription这个参数了,就是为了构建这个Trie树。

Trie树上每一个仅表示一个节点的转发关系,比如在/a/b节点中,近对主题为/a/b的消息进行处理,至于在subscriptino参数中定义的转发关系如何实现,是由下面要介绍的RuleTopic进行处理的

下面有两行是构建两个比较特殊的rule,第一个是RuleMsgQ0,这个rule主要是为了监听来自broker的消息,并把消息发送至对应的sinksub上,第二个是RuleTopic,这个rule主要是用来进行在subscriptions中定义的转发关系进行转发,转发到对应的层。

sinksub就是用来表示订阅关系,sinksub内包含有sessoin(session对应的rule)中的channel,sinksub将消息发送到channel中,session从channel另一端读取数据数据,并发送至客户端。
sinksub中有两个属性对比一下,一个是topic,另一个是targettopic:topic就是订阅的主题,targettopic就是要发送的主题
sinksub中的channel就是来自sink中的msgchan

RuleMsgQ0

进入到初始化RuleMsgQ0的方法中:

func newRuleQos0(b broker, r *router.Trie) *rulebase {
return newRuleBase(common.RuleMsgQ0, false, b, r, nil, nil)
}
func newRuleBase(id string, persistent bool, b broker, r *router.Trie, publish, republish common.Publish) *rulebase {
···
rb := &rulebase{
id:
id,
broker: b,
log:
log,
}
···
rb.msgchan = newMsgChan(
b.Config().Message.Egress.Qos0.Buffer.Size,
b.Config().Message.Egress.Qos1.Buffer.Size,
publish,
republish,
b.Config().Message.Egress.Qos1.Retry.Interval,
b.Config().Shutdown.Timeout,
persist,
log,
)
rb.sink = newSink(id, b, r, rb.msgchan)
return rb
}
func newSink(id string, b broker, r *router.Trie, msgchan *msgchan) *sink {
s := &sink{
id:
id,
broker:
b,
trieq0:
r,
trieq1:
router.NewTrie(),
msgchan: msgchan,
log:
logger.WithField("sink", id),
}
return s
}

msgchan可以理解为用于发送消息的channel

对照着参数,可以看到publish、republish传送过来均为nil,表明其实这个rule并不参与消息的转接(后面如果自己推敲一下,它同样会接收来自sinksub的消息,只不过因为没有在Trie树中注册sinksub,所以不会向这个channel发送数据罢了。

rule的publish方法就是用于在接收到来自sinksub从channel中发送的消息后进行的处理(普通的rulebase会发送回客户端,对于RuleTopic会向broker的channel转发消息)。

newRuleBase这个方法其实前面也没有什么,就是对rulebase的一个初始化,注意这里每一个rulebase都有一个自己的msgchan。这个方法的结尾有一个小点,生成一个newSink方法,里面对于这个sink赋予了来自rulebase的Trie树(其实这个Trie树应该是来自ruleManager的)、msgchan。
这样RuleMsgQ0就构造好了。

RuleTopic

func newRuleTopic(b broker, r *router.Trie) *rulebase {
rb := newRuleBase(common.RuleTopic, true, b, r, nil, nil)
rb.msgchan.publish = rb.publish
return rb
}

与RuleMsgQ0非常相似,唯一不同的是这里为rulebase赋予了publish方法(这个publish最后给予了msgchan,这也是subscription属性定义的转发逻辑能够生效的关键),接下来就看看RuleTopic的publish方法:

func (r *rulebase) publish(msg common.Message) {
msg.QOS = msg.TargetQOS
msg.Topic = msg.TargetTopic
msg.SequenceID = 0
if msg.QOS == 1 {
msg.SetCallbackPID(0, func(_ uint32) { msg.Ack() })
}
r.broker.Flow(&msg)
}

就是把要发送的TargetTopic和TargetQOS转换为Topic和QOS属性,因为后面RuleMsgQ0获取消息后对每一层级进行消息转发时是按照这个topic(其实是因为对Trie树进行遍历查找的,所以这个topic与Trie树每一层的children map的key值相同)。
举个例子,我们subscription选项里面source是“t”,target是“t/topic”,那么在之前构造Trie树就是如下这个图:

root
key : t topic : t targetTopic : t/topic

上图的key是指在Trie树中每一层children map中的key值

那么在客户端进行了对t/topic订阅操作后:

root
key : t topic : t targetTopic : t/topic
key : topic topic : topic

这样,如果对主题“t“发送一个消息后,消息从session发送到broker的channel中,RuleMsgQ0从broker的channel读取数据,然后从root出发根据主题”t”遍历Trie树,找到了第一个key为t的节点(sinksub,注意这个sinksub属于RuleTopic),通过这个sinksub对其channel(属于RuleTopic的channel)发送这个消息,RuleTopic拿到这个消息后,进入其publish函数,publish函数把消息的topic设置为原来的targetTopic,然后发送到broker的channel中,RuleMsgQ0又从broker中读取了这个新消息,然后根据Trie树遍历,先找到key值为“t”的节点,然后在这个节点的children map中找寻key值为“topic”的节点,最终找到了这个key为topic的sinksub(注意,这个sinksub属于用户session对应的Rule),然后把消息发送给这个sinksub的channel,之后会从这个channel读取数据并发送到用户客户端。

RuleManager是一个比较重要的部分,其实消息在hub中的流动主要在RuleManager的rules中流动,进而再发送到客户端,至于是如何流动的,后面会详细说明

sessions加载

sessions加载入口如下:

// NewManager creates a session manager
func NewManager(conf *config.Config, flow common.Flow, rules *rule.Manager, pf *persist.Factory) (*Manager, error) {
···
return &Manager{
auth:
auth.NewAuth(conf.Principals),
rules:
rules,
flow:
flow,
conf:
&conf.Message,
recorder: newRecorder(sessionDB),
sessions: cmap.New(),
log:
logger.WithField("manager", "session"),
}, nil
}

主要是对Manager的各个属性进行初始化工作,这里看到有一个sessions属性,用的是map存储管理客户端会话,还存储了一些消息的配置信息,以及前面刚刚初始化的RuleManager,除了这些,着重解析一下auth和flow属性。

auth

auth从字面上很好理解,就是授权,我们进入到zhegeNewAuth方法中(这里只对普通的用户名密码授权进行解析):

// NewAuth creates auth
func NewAuth(principals []config.Principal) *Auth {
···
_accounts := make(map[string]account)
for _, principal := range principals {
authorizer := NewAuthorizer()
for _, p := range duplicatePubSubPermitRemove(principal.Permissions) {
for _, topic := range p.Permits {
authorizer.Add(topic, p.Action)
}
}
···
_accounts[principal.Username] = account{
Password:
principal.Password,
Authorizer: authorizer,
}
···
}
return &Auth{certs: _certs, accounts: _accounts}
}

这个方法主要是遍历每一个principal,为其每一个生成authorizer,其中的duplicatePubSubPermitRemove方法是用于去除用户在pub、sub里面自定义中重复的主题名称,之后将这些sub、pub添加到authorizer中,之后把用户名、密码和authorizer信息存储到Auth中,并返回。

Flow

// Flow flows message to broker
func (b *Broker) Flow(msg *common.Message) {
···
select {
case b.msgQ0Chan <- msg:
case <-b.tomb.Dying():
b.log.Debugf("flow message (qos=0) failed since broker closed")
}
···
}

这个方法就是用来向broker发送消息的,当session要进行发布消息时,通过这个方法把消息发送到broker的channel中,再由RuleManager中的rules进行消息的处理。

servers加载

servers的入口如下:

// NewManager creates a server manager
func NewManager(addrs []string, cert utils.Certificate, handle Handle) (*Manager, error) {
launcher, err := mqtt.NewLauncher(cert)
if err != nil {
return nil, err
}
m := &Manager{
servers: make([]transport.Server, 0),
handle:
handle,
log:
logger.WithField("manager", "server"),
}
for _, addr := range addrs {
svr, err := launcher.Launch(addr)
if err != nil {
m.Close()
return nil, err
}
m.servers = append(m.servers, svr)
}
return m, nil
}

因为hub不止可以监听一个地址(可以看一下yaml配置文件,里面的Listen是数组属性),这个ServerManager就是把所有的监听地址保存起来,并且,它还负责启动所有的Server(每一个server对应一个监听地址)。

hub启动

在main.go的start方法最后调用了两个方法,

m.Rules.Start()
m.servers.Start()

就是分别启动RuleManager和SeverManager,下面分别解析一下:

RuleManager启动

进入RuleManager的start方法:

// Start starts all rules
func (m *Manager) Start() {
···
for item := range m.rules.IterBuffered() {
r := item.Val.(base)
if err := r.start(); err != nil {
m.log.WithError(err).Infof("failed to start rule (%s)", r.uid())
}
}
}

其实就是遍历所有的Rule,然后调用Rule的start方法:

func (r *rulebase) start() (err error) {
r.once.Do(func() {
err = r.msgchan.start()
···
err = r.sink.start()
···
})
return
}

这里面主要调用了两个方法一个是启动这个rule的msgchan,另一个是启动rule的sink。

msgchan启动

func (c *msgchan) start() error {
···
return c.msgtomb.Gos(c.goProcessingQ0, c.goProcessingQ1)
}

msgchan启动时运行goProcessingQ0方法:

func (c *msgchan) goProcessingQ0() error {
···
loop:
for {
select {
case <-c.msgtomb.Dying():
break loop
case msg := <-c.msgq0:
c.process(msg)
}
}
···
}
func (c *msgchan) process(msg *common.Message) {
if msg.QOS == 0 {
c.publish(*msg)
return
}
···
}

这个方法就是为了接收来自msgchan中channel的消息,然后在process方法中处理,想想之前RuleTopic中设置了一个publish函数,就是在process里面调用的。想想RuleTopic的逻辑,它在一个未知(就是sink.start的方法)地方接收到了消息,然后发送到自身的channel(RuleTopic对应的msgchan拥有的channel),之后在这个goProcessingQ0方法中接收到了这个消息,然后调用publish方法,把修改后的Message发送到broker的channel中。

sink启动

sink启动的代码如下:

func (s *sink) start() error {
if s.id == common.RuleMsgQ0 {
return s.tomb.Gos(s.goRoutingQ0)
}
···
}
//Blank: 就是为了数据进行路由,转发
func (s *sink) goRoutingQ0() error {
···
var msg *common.Message
for {
select {
case <-s.tomb.Dying():
return nil
case msg = <-s.broker.MsgQ0Chan():
matches := s.trieq0.MatchUnique(msg.Topic)
for _, sub := range matches {
sub.Flow(*msg)
}
}
}
}

这个start方法(只在QOS为0的层面)只对RuleMsgQ0有用,调用了goRoutingQ0方法。goRoutingQ0方法就是接收来自broker的channel中的方法(在上面提到的,RuleTopic把修改后的消息发送到broker后,消息就是在这里被消费的),里面调用了一个方法,MatchUnique,这个方法就是为了从Trie树中拿到对应主题的subsink,然后调用subsink的Flow方法。

这里先顺一下思路,其实这个goRoutingQ0只是为了从broker中拿到消息,然后根据消息的topic匹配到subsink,通过subsink将消息发送到session中,这也就是为什么发布的消息(QOS=0)能够发送到相对应的客户端中(订阅了该主题,或者在subscription中定义的内容)。因为这个方法其实只对RuleMsgQ0有用,所以这也是hub中预先定义这个Rule的原因了。其实如果不需要subscription定义的关系的话,那么不启动RuleTopic也是可以的。把RuleManager中NewManager的m.rules.Set(common.RuleTopic, newRuleTopic(m.broker, m.trieq0)) 及后面对Trie树存放sinksub的循环删掉后,就将subscription定义的内容无效掉了。

接下来看一下subsink的Flow方法:

// Flow flows message
func (s *sinksub) Flow(msg common.Message) {
// set target topic
if s.ttopic != "" {
msg.TargetTopic = s.ttopic
} else {
msg.TargetTopic = msg.Topic
}
···
if sqos == 0 {
msg.TargetQOS = 0
s.channel.putQ0(&msg)
} else {
msg.TargetQOS = s.tqos
s.channel.putQ1(&msg)
}
}

第一个if-else就是为了对Message设置TargetTopic,这一点主要是为RuleTopic用的,因为RuleTopic接收到消息后,要把消息的topic属性设置为TargetTopic对应的值,所以在这里提前把Message的TargetTopic设置为sinksub对应的TargetTopic(因为从客户端发送过来的消息是没有TargetTopic的,这里要设置一下。sinksub本身就是作为转发(从一个主题到另一个主题)的关系表示,如果这个sinksub表示的是RuleTopic创建的话,本身就代表这个消息(这个主题的消息)就要发送到RuleTopic定义的主题中(TargetTopic),所以这里要把TargetTopic设置为与sinksub一样,这样消息在之后才能按照subscriptions定义的进行转发
在第二个if-else中,使用sinksub对应的channel发送这个消息,sinksub对应的channel是sink中的msgchan(再往深了看,其实是Rule的msgchan),这样把消息发送后,在刚刚msgchan启动中提到的goProcessingQ0方法中一直阻塞在等待消息的地方case msg := <-c.msgq0:就获取到了消息,然后把消息使用process方法进行处理:

  • 如果Rule是RuleTopic的话,会调用publish方法,把消息的Topic改为TargetTopic对应的值,然后发送到broker的channel中,之后RuleMsgQ0从这个channel获取到了数据,再重复上面找寻sinksub的过程(注意再次寻找后获得的sinksub不一定为session对应的Rule所创建的,有可能依然还是RuleTopic对应的sinksub,因为可能出现如下情况,这样就需要经过两次的RuleTopic的修改转发)
subscriptions:
- source:
topic: 't'
target:
topic: 't/topic'
- source:
topic: 't/topic'
target:
topic: 't/topic/a'
  • 如果Rule是普通Session中创建的话,那么就会调用客户端的处理方法(其实就是发送方法,具体这个方法是如何设置在sink中以后会写文章提到,这里简单说一下,这个方法位于openedge/openedge-hub/session/session_egress.go#publish),这个publish方法把这个消息发送到对应的客户端中。

ServerManager启动

ServerManager启动方法如下:

// Start starts all servers
func (m *Manager) Start() {
for _, item := range m.servers {
svr := item
m.tomb.Go(func() error {
for {
conn, err := svr.Accept()
···
go m.handle(conn)
}
})
}
}

这个方法主要是对每一个监听Server(之前在ServerManager中定义的Server)监听连接请求,收到新的请求后,调用handle方法进行处理:

// Handle handles connection
func (m *Manager) Handle(conn transport.Conn) {
defer conn.Close()
conn.SetReadLimit(int64(m.conf.Length.Max))
newSession(conn, m).Handle()
}
func newSession(conn transport.Conn, manager *Manager) *session {
return &session{
conn:
conn,
manager:
manager,
subs:
make(map[string]packet.Subscription),
pids:
common.NewPacketIDS(),
log:
logger.WithField("mqtt", "session"),
permittedPublishTopics: make(map[string]struct{}),
}
}

上面两个方法主要是创建一个新的会话session,至于session是如何初始化的,是在Session的handle方法中:

// Handle handles mqtt connection
func (s *session) Handle() {
var err error
var pkt packet.Generic
for {
pkt, err = s.conn.Receive()
···
switch p := pkt.(type) {
case *packet.Connect:
···
case *packet.Publish:
···
case *packet.Puback:
···
case *packet.Subscribe:
···
case *packet.Pingreq:
···
case *packet.Pingresp:
···
case *packet.Disconnect:
···
return
case *packet.Unsubscribe:
···
default:
···
}
···
}
}

也就是Session监听收到的packet,然后依据packet的种类,采取不同的处理逻辑,之前提到的Rule的创建、Authorize就是在Connect这里面处理的,这个在下一篇文章中进行讲解。

最后

以上就是俊逸舞蹈为你收集整理的openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)的全部内容,希望文章能够帮你解决openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(50)

评论列表共有 0 条评论

立即
投稿
返回
顶部