概述
文章目录
- 前言
- 单中间者
- 多中间者
- 流程图
- 流程图分析
- 流程图总结
- 流水线示意图
- 整体总结
- 之前的问题是否已经迎刃而解
- 更多阅读
相关文章:
- [翻译]Go并发模式:构建和终止流水线
- go pipeline的流水线示意图
- chan基本知识
- go中关于for循环的知识
- 多进程通信
前言
学习go语言已经有几个月,但是关于go中的特性chan和routine的应用还不是很理解,如果搞不懂chan和routine的机制就很难流畅的用go编写出健壮的程序。所以我觉得关于chan应用的编程,是可以讲一讲的。
那我们首先会想到几个问题:
- 使用chan的代码与普通的代码有什么不同吗?
- 使用了chan后有什么好处吗?
- 怎么才能正确的使用chan呢?
我们先看第一个问题,众所周知 在go中chan是用来多个协程之间进行通信的,chan的应用思维是一种类似与client/servier的思维。也就是要有信息的生产者和消费者。既然如此,那就会设计到 不通协程之间的通信,而在顺序编程中是不会设计到通信的,是一种线性的流程。类似与下图:
在图中,我们看到,生产者和消费者之间的程序通过通信来进行相互影响的,而线性的程序是不会有这种问题,不管调用了多少函数,都只会在一条进程上顺序进行。
在下文中我们会用到一个中间者的概念。中间者是承接上游的消费者和开启下游的生产者。
单中间者
单中间者,就是在程序中只有一个线程连接生产者和消费者,不会进行扩增。
如下图所示:
在这种方式中,只有一个中间者,或者没有中间者。
- gen.go
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
分析:创建一个out的通道,并打开一条协程将nums通过循环传递给通道out,在所有的nums都传入out后关闭通道。在打开协程后返回创建的out。
这个函数在程序中起着生产者的作用,生产者和消费这之间的交流通道就是这个out,当out中消费一个数后,本函数就会再将一个数推送到通道out中。
- sq.go
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
创建了通道out作为生产者的信息输出,并接受in作为消费者的信息输入。
这程序中,即是消费者也是生产者,并在中间进行了数据的处理nn。作为消费者,当通道in中可以取数据的时候,取出n并进行运算nn,然后作为生产者将结果推送到out中。当in中有值时会进行消费,并计算nn,当out为空时会进行生产,将nn的结果推到out中。
- main.go
func main() {
in := gen(2, 3)
c1 := sq(in)
for n := range c1 {
fmt.Println(n) // 4 9
}
}
main函数先调用了上面两个函数,然后用了一个循环进行输出。
输出这个循环就是整个程序的消费者,当c1中有值时,就会取出来放到n中。
单中间者并不能体现使用chan的优越性,整个过程仍然是类似于线性的流程进行。在下文中我们会开启多个中间者进行复杂的数据运算和处理,因为我们可以同时处理多条数据,必然给我们带来性能上的提高。
多中间者
单中间者,就是在程序中只有一个线程连接生产者和消费者,不会进行扩增。
在上图中,一个生产者被多个中间者处理生产的信息,以提高消费的效率。然后用一个merge的中间者收集各个中间者生产的信息,并将这些信息统一的交给消费者。
- merge.go
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
time.Sleep(222 * time.Second)
wg.Wait()
close(out)
}()
return out
}
这个merge函数给我带来了很大的困扰。
- 程序中有两个for循环,其中对cs的循环是针对数组的,循环的作用是对每个通道打开一个协程。而协程中的循环是监控通道用的,当通道close时推出循环。
- 为什么wg.Wait要用一个新的协程来,如果不用新的协程而写在函数体里面不行吗?当然是不行的,这里要注意一个与普通程序的分别:在使用chan的程序中,主程序的作用类似与初始化service和client而不进行具体的运算,所以肯定不能在初始化的时候进行wait,因为这里还是要进行生产,如果进行wait就不会运行main中的print(也就是消费者),而通道内的数据不进行消费就会形成阻塞,导致程序无法运行。
*那我已经知道了不能用在函数体里面,为什么我另起一条协程,不会出问题呢?? 这涉及到后面部分,如果你明白了程序运行的整个流程,肯定可以解决这个疑问,我们把这个问题留到后面解决
- main.go
func main() {
in := gen(2, 3)
// Distribute the sq work across three goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
在函数中先用生产者,通过通道传递两个数,然后用两个sq中间者进行中间计算,然后用merge进行集合,最后通过print进行消费。
在初次看到这个程序的时候,我是觉得程序不会输出所有的结果的看,为什么我会这么看呢?因为在merge中的wait是在另起的一个协程中运行的,而不是在主程序中。如果不能理清楚整个程序的流程就会第一时间出现这种误解。那我们只有模拟程序一步一步的运行,来判断为什么这个程序可以顺利的运行。
流程图
因此我画了下面的流程图,图中标注了程序的step。(不通协程之间的step是可以重合的)
我们来过一遍上图的整个流程,你就发现程序中的所有疑问都迎刃而解。(虽然我们这里使用了多个中间者的方式,但是因为我们仅仅模拟了一个数据输入,所以只展示一个中间者就可以表示整个流程)
流程图分析
step 0:
[main] 调用程序 gen(3)
step 1:
[gen] 定义通道 out
step 2:
step 3:
[gen] 进入协程sub(gen)中的循环,判断循环是否结束,同时return out
step 4:
[main] 进入程序sq
[sq] 定义通道out,gen中的out定义为in
[sub(gen)] n-> out ##因为还没有遇到消费者,所以造成了阻塞
step 5:
[sq]进入协程sub(sq)中的循环,判断循环是否结束,同时return out
step 6:
[main] 进入程序merge
[merge] wg.Add(1)并判断cs[]循环时候结束
[sub(sq)] 从in中取出数n,并计算n*n,然后传入通道out ##这时in通道不阻塞 out 通道阻塞
step 7:
[sub(gen)] out不再阻塞,判断循环是否结束
[merge] 判断循环是否结束,如果没结束,进入sub(merge),结束进入sub(wait)
step 8:
[sub(gen)] 判断循环结束,close(out)
[sub(merge)]判断传入的通道是否关闭,
[sub(wait)] 判断merge协程是否都已经运行结束,没有结束,等待
[merge]return out 返回main
step 9:
[sub(merge)] 判断通道c未关闭,将c->out, ##这里out中的值没有消费,阻塞。c被消费不在阻塞
[main] 判断merge返回的通道t是否关闭
step10:
[sub(sq)] out通道开发,判断通道in是否关闭
[main] 通道t没有关闭,并输出t,此时t通道不在阻塞
step11:
[sub(sq)]in通道关闭(step8),close(out),协程结束
[main] t中没有值,不进行操作
[sub(merge)]out通道不再阻塞,判断c通道是否关闭
step12:
[sub(merge)] c通道关闭(step11) close(out) 协程结束
step13:
[sub(wait)]因为step12中协程结束,所以close(out)
step14:
[main]判断通道t关闭(step13),结束程序
流程图总结
- 只有等待,阻塞后输入,通道空后读出才会造成协程跳过一些step。
- main主线程因为要等到通道t关闭,也就是要等待sub(wait)结束,而sub(wait)要等到所有的协程结束,所以肯定不会发生主线程跑完但是协程还没有跑完的情况。
流水线示意图
这是对 go中关于chan应用的程序分析
文章中代码的流水流水线示意图。
- 我们假设gen,sq和print操作需要的时间单位都是1:
time | gen | sq | |
---|---|---|---|
1 | out:1 | ||
2 | out:2 | out: 1 | |
3 | out:3 | out: 4 | 1 |
4 | out:4 | out: 9 | 4 |
5 | out:5 | Out: 16 | 9 |
6 | out:6 | Out: 25 | 16 |
7 | out:7 | Out: 36 | 25 |
8 | out:8 | Out: 49 | 36 |
9 | close(out) | close(out) | done<- |
我们假设gen,sq和print没一个处理步骤是相同的,那我们就可以明显的看到一个流水线的形式,而最后我们给done这个通道推入数据,gen,sq同时关闭并退出。
- 但是在程序的主体运行部分,sq函数通常需要更多的时间进行数据运算或其他相关的操作。我们假设sq的需要2个时间单位。
time | gen | sq | |
---|---|---|---|
1 | 1 | ||
2 | 2 | 1 | |
3 | blocked | going | |
4 | 3 | 4 | 1 |
5 | blocked | going | waiting |
6 | 4 | 9 | 4 |
7 | blocked | going | waiting |
8 | 5 | 16 | 9 |
9 | close(out) | close(out) | done<- |
由于sq需要2个时间单位来进行运算,这就导致了上游方法gen输出通道阻塞,和下游方法print读取通道等待。增加了时间的消耗。
- 由于sq需要消耗2个时间单位所以我们开启2个sq函数,同时用一个merge函数处理两个sq函数的通道输出。如下:
time | gen | Sq1 | Sq2 | Merge | |
---|---|---|---|---|---|
1 | 1 | ||||
2 | 2 | 1 | |||
3 | 3 | Going | 4 | ||
4 | 4 | 9 | Going | 1 | |
5 | 5 | Going | 16 | 4 | 1 |
6 | 6 | 25 | Going | 9 | 4 |
7 | 7 | Going | 36 | 16 | 9 |
8 | 8 | 49 | Going | 25 | 16 |
9 | close(out) | close(out) | close(out) | close(out) | done<- |
在这个流水线中,用了两个sq和一个merge作为中间函数来进行数据处理。这样同样在9个时间单位内我们可以输出4个值,同时也不会造成阻塞等行为。
- 既然这样我们开3个通道会如何呢?
time | gen | Sq1 | Sq2 | Sq3 | Merge | |
---|---|---|---|---|---|---|
1 | 1 | |||||
2 | 2 | 1 | ||||
3 | 3 | Going | 4 | |||
4 | 4 | Going | 9 | 1 | ||
5 | 5 | 16 | Going | 4 | 1 | |
6 | 6 | 25 | Going | 9 | 4 | |
7 | 7 | Going | 36 | 16 | 9 | |
8 | 8 | Going | 49 | 25 | 16 | |
9 | close(out) | close(out) | close(out) | close(out) | done<- |
这时可以看出,如果通道开的多了之后总会有一部分通道没有处于满负荷运转,这样会增加我们的空间消耗。
- 而如果sq需要华为3个时间单位呢?
time | gen | Sq1 | Sq2 | Sq3 | Merge | |
---|---|---|---|---|---|---|
1 | 1 | |||||
2 | 2 | 1 | ||||
3 | 3 | Going | 4 | |||
4 | 4 | Going | Going | 9 | ||
5 | 5 | 16 | Going | Going | 1 | |
6 | 6 | Going | 25 | Going | 4 | 1 |
7 | 7 | Going | Going | 36 | 9 | 4 |
8 | 8 | 49 | Going | going | 16 | 9 |
9 | close(out) | close(out) | close(out) | close(out) | done<- |
在这个流水线中,开三个通道,同时sq的处理时间是3个时间单位,正好每个协程都在运转且不会造成阻塞或等待的现象。
由此可知,我们在一个pipeline中,其中的一个部分是否并行主要根据各子程序运行一次的时间来判断,运行时间越长,越应该用多个协程处理。
用拓扑图进行表示:
整体总结
使用go就不可避免的要用到协程和通道,耐心的分析是写好代码的基础。当不了解程序的运行过程时,我们需要尽可能清晰的整理程序运行的流程。
在理解了程序的运行原理之后,就可以根据实际情况分析在什么时候使用多个中间者,使用多少个中间者,怎么进行收束等。(下面会给出使用多中间者的一个稍微复杂一点的代码,这时我在工作中使用的一个go的脚本)
之前的问题是否已经迎刃而解
-
为什么整个程序在主程序中没有等待协程程序结束的类似wg.Wait的语句,仍然程序可以正确的输出?
-
为什么在merge中wg.Wait是需要另外起一个协程,而不是在函数的主程序中进行wait?
-
make(chan int)和make(chan int ,1)的区别,在本程序中能把所有的chan声明从前者换成后者吗?
-
本程序是把php的一个数组通过一定的映射关系,转换为json格式的字符串,并保存到文件中。
更多阅读
- 并发与并行不同 原视频
- pipeline(这是文章代码出处)后续会有翻译版本
- [翻译]Go并发模式:构建和终止流水线
- go pipeline的流水线示意图
package main
import (
"encoding/json"
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
"io/ioutil"
"regexp"
"strings"
"sync"
)
var php = `
[
'耽美同人' => ['first_id' => 2, 'name' => '纯爱小说', 'third' => '百合纯爱'],
'悬疑推理' => ['first_id' => 2, 'name' => '悬疑灵异', 'third' => '恐怖惊悚'],
]
`
type Item struct {
FirstID int `json:"first_id"`
Name string `json:"name"`
Third string `json:"third"`
}
type CategoryReplaceMap struct {
ID int `json:"id"`
Before string `json:"before"`
After string `json:"after"`
}
type Result struct {
CopyrightCateGory string `json:"copyright_category"`
First *CateGoryField `json:"stack_first_category_id"`
Second *CateGoryField `json:"stack_second_category_id"`
Third *CateGoryField `json:"stack_third_category_id"`
}
type Response struct {
CopyrightCateGory string `json:"copyright_category"`
First int `json:"stack_first_category_id"`
Second int `json:"stack_second_category_id"`
Third int `json:"stack_third_category_id"`
}
type CateGoryField struct {
Name string `json:"name"`
Parent int `json:"parent"`
Rank int `json:"rank"`
ID int `json:"id"`
}
func main() {
phpMap, err := parseToMap()
if err != nil {
fmt.Println("解析成map错误")
return
}
response := make(map[string][]Response)
s := product(phpMap)
v := make([]<-chan *Response, len(phpMap))
for i := 0; i < len(phpMap); i++ {
v[i] = middle(s)
}
out := merge(v)
for item := range out {
response["category_mappers"] = append(response["category_mappers"], *item)
}
findNotMap(phpMap, response)
resultJson, err := json.Marshal(response)
ioutil.WriteFile("test.json", resultJson, 0777)
fmt.Println(string(resultJson), err)
}
func parseToMap() (map[string]*Item, error) {
var phpMap = make(map[string]*Item)
rePlaceMap, err := getReplaceMapFromDB()
if err != nil {
fmt.Println(err)
}
fmt.Println("分类改变的对应关系")
for k, v := range rePlaceMap {
fmt.Println(k, v)
}
phpjson := strings.ReplaceAll(php, " ", "")
for k, v := range rePlaceMap {
phpjson = strings.ReplaceAll(phpjson, k, v)
}
phpjson = strings.Replace(phpjson, "=>", ":", -1)
phpjson = strings.Replace(phpjson, "'", """, -1)
phpjson = strings.Replace(phpjson, "[", "{", -1)
phpjson = strings.Replace(phpjson, "]", "}", -1)
reg1 := regexp.MustCompile(`([d]+) *:`)
if reg1 == nil {
fmt.Println("regexp err")
return nil, errors.New("匹配模式错误1")
}
//根据规则提取关键信息
phpjson = reg1.ReplaceAllString(phpjson, `"${1}" :`)
reg2 := regexp.MustCompile(`(}),(n *})`)
if reg2 == nil {
fmt.Println("regexp err")
return nil, errors.New("匹配模式错误1")
}
//根据规则提取关键信息
phpjson = reg2.ReplaceAllString(phpjson, `${1}${2}`)
fmt.Println(phpjson)
err = json.Unmarshal([]byte(phpjson), &phpMap)
fmt.Println(err)
return phpMap, err
}
//product是把正常的输入传入通道,
func product(in map[string]*Item) <-chan map[string]*Item {
out := make(chan map[string]*Item)
go func() {
for k, v := range in {
out <- map[string]*Item{k: v}
}
close(out)
}()
return out
}
//middle 是一个处理数据的函数,
func middle(in <-chan map[string]*Item) <-chan *Response {
out := make(chan *Response)
go middleHand(in, out)
return out
}
func middleHand(in <-chan map[string]*Item, out chan *Response) {
for aMap := range in {
res := &Response{}
for k, v := range aMap {
dbRes := getDBByNameRes(v.Third)
wg := sync.WaitGroup{}
for _, item := range dbRes {
wg.Add(1)
go func(item *CateGoryField) {
defer func() {
wg.Done()
if err := recover(); err != nil {
fmt.Printf("book, panic: %+vn", err)
}
}()
resultItem := &Result{}
resultItem.CopyrightCateGory = k
getResult(item, resultItem)
if resultItem.First.ID == v.FirstID && resultItem.Second.Name == v.Name {
responseItem := &Response{}
responseItem.CopyrightCateGory = resultItem.CopyrightCateGory
responseItem.First = resultItem.First.ID
responseItem.Second = resultItem.Second.ID
responseItem.Third = resultItem.Third.ID
res = responseItem
}
}(item)
}
wg.Wait()
}
out <- res
}
close(out)
}
//merge是把middle的输出整合后传入一个通道
func merge(in []<-chan *Response) <-chan *Response {
out := make(chan *Response)
wg := sync.WaitGroup{}
output := func(c <-chan *Response) {
for n := range c {
out <- n
}
wg.Done()
}
for _, i := range in {
wg.Add(1)
go output(i)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func getResult(item *CateGoryField, resultItem *Result) {
if item.Rank == 1 {
resultItem.First = item
}
if item.Rank == 2 {
resultItem.Second = item
dbRes := getDBByID(item.Parent)
getResult(dbRes, resultItem)
}
if item.Rank == 3 {
resultItem.Third = item
dbRes := getDBByID(item.Parent)
getResult(dbRes, resultItem)
}
}
func getReplaceMapFromDB() (map[string]string, error) {
res := []*CategoryReplaceMap{}
db, err:= connectDB("distribution_w:********@tcp(rm-******.mysql.******.rds.aliyuncs.com:3306)/content_distribution?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil{
return nil,err
}
err = db.Table("category_change").Find(&res).Error
if err != nil {
fmt.Println("查询数据库失败")
return nil, err
}
response := make(map[string]string)
for _, item := range res {
item.Before = strings.ReplaceAll(item.Before, "n", "")
item.After = strings.ReplaceAll(item.After, "n", "")
item.Before = strings.ReplaceAll(item.Before, " ", "")
item.After = strings.ReplaceAll(item.After, " ", "")
response[item.Before] = item.After
}
return response, nil
}
func getDBByNameRes(name string) []*CateGoryField {
res := make([]*CateGoryField, 0)
//res:= execsql(sql)
db, err:= connectDB("distribution_w:********@tcp(rm-******.mysql.******.rds.aliyuncs.com:3306)/content_distribution?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil{
return nil
}
err = db.Table("category").Where("name = ? ", name).Find(&res).Error
if err != nil {
fmt.Println("查询数据库失败")
return nil
}
return res
}
func getDBByID(id int) *CateGoryField {
res := &CateGoryField{}
db, err:= connectDB("distribution_w:********@tcp(rm-******.mysql.******.rds.aliyuncs.com:3306)/content_distribution?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil{
return nil
}
err = db.Table("category").Where("id = ? ", id).Find(&res).Error
if err != nil {
fmt.Println("查询数据库失败")
return nil
}
return res
}
func connectDB(connect string) (*gorm.DB, error) {
db, err := gorm.Open("mysql", connect)
if err != nil {
fmt.Println("连接数据库失败")
return nil,err
}
return db,nil
}
func findNotMap(phpMap map[string]*Item, response map[string][]Response) {
for k := range phpMap {
var yn bool
for _, v := range response["category_mappers"] {
if k == v.CopyrightCateGory {
yn = true
break
}
}
if yn {
continue
}
fmt.Println(k, phpMap[k], " not in the category_mappers")
}
}
最后
以上就是现实西牛为你收集整理的go中关于chan应用的程序分析的全部内容,希望文章能够帮你解决go中关于chan应用的程序分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复