我是靠谱客的博主 兴奋芝麻,这篇文章主要介绍openedge-function模块浅析——百度BIE边缘侧openedge项目源码阅读(3)前言openedge-function启动,现在分享给大家,希望可以做个参考。

前言

中断了一段时间,发现前面分析的hub模块的源码拉错分枝了(对,我就是个菜鸡),不过大致流程差不多,有时间改一下。这次分析openedge-function模块,openedge-function模块比较简单,很多核心功能其实还是依赖于master和hub的。

openedge-function启动

下面将openedge-function模块简称为function模块

入口函数是openedge-0.1.1/openedge-function/main.go,其核心代码如下:

复制代码
1
2
3
4
5
6
7
8
func main(){ ··· m, err := New(f.Config) ··· err = m.Start() ··· }

也就是分为两个部分,一个是New一个mo(也就是上面的m),之后启动这个mo。

New mo

核心代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// New creates a new module func New(confDate string) (module.Module, error) { var cfg Config err := module.Load(&cfg, confDate) ··· defaults(&cfg) ··· man, err := NewManager(cfg) ··· mo{ cfg: cfg, man: man, rrs: []*ruler{}, log: logger.WithFields(), } for _, r := range cfg.Rules { f, err := man.Get(r.Compute.Function) if err != nil { m.Close() return nil, err } rr, err := create(r, cfg.Hub, f) ··· m.rrs = append(m.rrs, rr) } return m, nil }

前面几行就是为了加载配置,这里面提一下default函数:

复制代码
1
2
3
4
5
6
7
8
func defaults(cfg *Config) { if cfg.API.Address == "" { cfg.API.Address = utils.GetEnv(module.EnvOpenEdgeMasterAPI) } cfg.API.Username = cfg.UniqueName() cfg.API.Password = utils.GetEnv(module.EnvOpenEdgeModuleToken) }

这里满就是为了设置master API的地址,这个就是master模块中创建API的缘故,也是启动、关闭函数模块的关键。
配置设置好后,开始创建Manager:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// NewManager loads all functions and return func NewManager(c Config) (*Manager, error) { cli, err := master.NewClient(c.API) ··· m := &Manager{ cfg: c, cli: cli, fcs: make(map[string]*Function), log: logger.WithFields("manager", "function"), } for _, fc := range c.Functions { m.fcs[fc.Name] = newFunction(m, fc) } return m, nil }

cli是为了与master节点(API)通信的,fcs中存储了所有我们定义的函数,其核心就在于newFunction方法:

复制代码
1
2
3
4
5
6
func newFunction(m *Manager, c config.Function) *Function { ··· f.pool = pool.NewObjectPool(context.Background(), newFuncionFactory(f), pc) return f }

这里面创建一个对象池,这个对象池里面用于对我们刚刚定义的Function进行管理。
Manager创建过后,回到New方法,接下来用于创建mo,其中最重要的是如下部分:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
// New creates a new module func New(confDate string) (module.Module, error) { ··· for _, r := range cfg.Rules { ··· rr, err := create(r, cfg.Hub, f) ··· m.rrs = append(m.rrs, rr) } return m, nil }

循环里面的f就是我们配置文件定义的函数,然后使用create方法创建ruler,并添加至mo的rrs中(rrs就是用来存放ruler的)。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
func create(r Rule, cc config.MQTTClient, f *Function) (*ruler, error) { ··· cc.Subscriptions = []config.Subscription{config.Subscription{Topic: r.Subscribe.Topic, QOS: r.Subscribe.QOS}} fd, err := NewDispatcher(f) ··· return &ruler{ r: &r, fd: fd, md: mqtt.NewDispatcher(cc), }, nil }

create方法创建了ruler,其中有两个属性最为重要,一个是fd,另一个是md,fd根据用途我猜应该是Function Dispatcher,md是MQTT Dispatcher,前者主要用于通过使用master API启动一个函数运算模块(后面会提到),后者主要进行MQTT消息转发处理,里面存放了每个函数订阅的主题信息,两者是互相配合工作的(后面会提到)。

mo start

核心代码如下:

复制代码
1
2
3
4
5
6
7
8
9
// Start starts module func (m *mo) Start() error { for _, rr := range m.rrs { err := rr.start() ··· } return nil }

其核心就是启动ruler:

复制代码
1
2
3
4
5
6
7
8
9
10
func (rr *ruler) start() error { ··· h := mqtt.Handler{} h.ProcessPublish = func(p *packet.Publish) error { return rr.fd.Invoke(p) } ··· return rr.md.Start(h) }

这个方法先是设置了MQTT消息类型为publish的方法,也就是调用Function Dispatcher的invoke方法(这个方法会讲到),然后启动MQTT,MQTT启动方法如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Start starts dispatcher func (d *Dispatcher) Start(h Handler) error { return d.tomb.Go(func() error { return d.supervisor(h) }) } // Supervisor the supervised reconnect loop func (d *Dispatcher) supervisor(handler Handler) error { ··· client, err := NewClient(d.config, handler) ··· // run dispatcher on client current, dying = d.dispatcher(client, current) ··· } // NewClient returns a new client func NewClient(cc config.MQTTClient, handler Handler) (*Client, error) { ··· c := &Client{ ··· } err = c.connect() ··· return c, nil }

首先调用supervisor方法循环启动(启动失败了再重启),然后调用NewClient方法创建一个新的MQTT连接,这里面c.connect()中涉及了一个connect方法,这个后面会用到:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (c *Client) connect() (err error) { // allocate packet connect := packet.NewConnect() ··· // send connect packet err = c.send(connect, false) ··· // start process routine c.tomb.Go(c.processor) ··· // allocate subscribe packet subscribe := packet.NewSubscribe() subscribe.ID = 1 subscribe.Subscriptions = c.config.GetSubscriptions() ··· // send packet err = c.send(subscribe, true) if err != nil { return c.die(err) } return nil } // processes incoming packets func (c *Client) processor() error { ··· for { // get next packet from connection pkt, err := c.conn.Receive() ··· switch p := pkt.(type) { case *packet.Publish: err = c.handler.ProcessPublish(p) case *packet.Puback: err = c.handler.ProcessPuback(p) ··· } }

首先创建一个真正的MQTT连接——connect(真正是表示这个是由其他库完成的),然后发送连接包,并注册相关MQTT包的处理方法(processor方法),然后发送进行相关主题的订阅。这里面processor方法里面,处理Publish(ProcessPublish)、Puback(ProcessPuback)方法不就是在ruler的start方法中定义的ProcessPublish吗!

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
func (rr *ruler) start() error { ··· h := mqtt.Handler{} h.ProcessPublish = func(p *packet.Publish) error { //TODO //Blank:开启方法的开端 return rr.fd.Invoke(p) } ··· return rr.md.Start(h) }

重新回到supervisor方法,里面剩下一个dispatch方法没有解释:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// reads from the queues and calls the current client func (d *Dispatcher) dispatcher(client *Client, current packet.Generic) (packet.Generic, bool) { ··· if current != nil { err := client.Send(current) ··· } for { select { case pkt := <-d.channel: err := client.Send(pkt) if err != nil { return pkt, false } case <-client.Dying(): return nil, false case <-d.tomb.Dying(): return nil, true } } }

这个方法其实就是读取dispatcher中的消息,然后进行发送至hub模块,也就是真正进行与hub模块的通信。
这样,function模块的启动就介绍完了,主要是为每一个函数封装为一个ruler,ruler中有一个Funtion Dispatcher和一个MQTT Dispatcher,后者用于订阅、发布消息,当MQTT Dispatcher收到消息后由Function Dispatcher找到对应的function,然后向master API发送创建一个函数模块的请求,由master API负责启动模块
另外master启动函数模块后,函数模块会启动一个小的grpc服务器,由function通过rpc调用函数模块的Handle方法,Handle方法会打开我们定义的python文件,然后运行,并将运行结果返回给function模块,function模块获得返回值后调用在ruler.fd(Function Dispatcher)中设置的callback,将结果发送至hub模块,由hub模块负责进行转发消息。

最后

以上就是兴奋芝麻最近收集整理的关于openedge-function模块浅析——百度BIE边缘侧openedge项目源码阅读(3)前言openedge-function启动的全部内容,更多相关openedge-function模块浅析——百度BIE边缘侧openedge项目源码阅读(3)前言openedge-function启动内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部