挖坑容易,填坑难!假期来填一填坑。

上篇文章讲了 channel 的基本使用,讲了一些使用时需要注意的事项,本文将重点介绍 channel 中的两个数据结构:循环队列双端链表

channel 的需求描述

为了理解这些数据结构解决了什么问题,我们先来做个简单的回顾,看看为什么需要这两个数据结构,他们解决了什么问题。我们知道 goroutine 是用户态的线程,不同的 goroutine 之间是有消息传递这个需求的。在原始的进程与线程(系统线程)编程中我们会采用管道的方式,而 channel 就是用户态线程传递消息的管道实现,并且是类型安全的。

既然 channel 是一个管道,用来满足不同 goroutine 间交换消息的。那么实现这样一个管道要怎么做呢?

来看看我们日常传递消息的需求:

  1. 能够有多个 goroutinechannel 进行读写,并且保证没有竞争问题,需要用 队列 来管理阻塞的 goroutine,解决竞争问题;
  2. 需要管理发送到 channel 的消息(有缓冲、无缓冲),对于有缓存的 channel 可以采用 循环队列 来管理多个消息。

当然上面的需求是经过简化的,比如 channel 还需要具备阻塞、唤醒 goroutine 的能力,不过为了本文我们更加专注焦点问题,先只关注上面两个问题。

channel 的数据结构

接下来我们分析一下 channel 在实际运行中,它的结构体是怎么样的。当然这又分为两种类型,有缓冲与无缓冲的。我们先来看一个无缓冲的情况。

无缓冲

先把示例代码贴出来。就是两个读的 goroutine 被阻塞在一个无缓冲的 channel 上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
ch := make(chan int) // 无缓冲

go goRoutineA(ch)
go goRoutineB(ch)
ch <- 1

time.Sleep(time.Second * 1)
}

func goRoutineA(ch chan int) {
v := <-ch
fmt.Printf("A received data: %d\n", v)
}

func goRoutineB(ch chan int) {
v := <-ch
fmt.Printf("B received data: %d\n", v)
}

来看看当代码执行到 ch <- 1 这一行之后 channel 的结构体被填充成什么样子了!

无缓冲 channel

注意其中 buf 字段可存储的长度是0,这是因为 无缓冲 channel 不会用到循环队列来存储数据。它一定是等读、写 goroutine 都准备好了,然后直接把数据交给对方。我们用一副图来看一下无缓冲的数据交换过程。

交换数据

上图描述的是数据交换过程,再看一下读 goroutine 被阻塞的结构示意图。被阻塞的 goroutine 会挂载到对应的队列上,该队列是一个双端队列。

上面的例子,由于两个读 goroutine 在启动的时候,写还没有准备好,因此读全部被挂起在队列中;当有写goroutine准备好的时候,由于此时读已经就绪,因此写不会阻塞,挂起放到 sendq 中。大家可以修改上面的代码,自己看一下写阻塞,读立马执行的情况。

结构示例

有缓冲

我们将上面的代码改成有缓冲的通道,然后再来看看有缓冲的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func main() {
ch := make(chan int, 3) // 有缓冲

// 都不会阻塞
ch <- 1
ch <- 2
ch <- 3
// 会阻塞,被挂起到 sendq 中
go func() {
ch <- 4
}()

// 只是为了debug
var a int
fmt.Println(a)

go goRoutineA(ch)
go goRoutineA(ch)
go goRoutineB(ch)
go goRoutineB(ch) // 猜猜这里会被挂起吗?

time.Sleep(time.Second * 2)
}

func goRoutineA(ch chan int) {
v := <-ch
fmt.Printf("A received data: %d\n", v)
}

func goRoutineB(ch chan int) {
v := <-ch
fmt.Printf("B received data: %d\n", v)
}

贴出执行到第一行的 go goRoutineA(ch) 时, hchan 的结构填充情况。

有缓冲 channel

在这里可以看到缓冲的大小是3,由于增加了缓冲,只要写 goroutine 没有把缓冲写满,则不会导致协程阻塞。但是一旦缓冲没有多余的空间,则会把写 goroutine 挂起到 sendq 中,直到有空间时将他唤醒(还有其它唤醒的场景,这一略过)。

其实有缓冲的 channel,就是把同步的通信变为了异步的通信。写的 channel 不需要关注读 channel,只要有空间它就写;而读也一样,只要有数据就正常读就可以,如果没有就挂起到队列中,等待被唤醒。下图形象的展示了有缓冲 channel 是如何交换数据的。

交换数据

我们再来用图的形式看一下此时结构体的样子,这里图有些偷懒,只是在上面图的基础上增加了循环队列部分的描述,实际到该例子中,读 goroutine时不会被阻塞的,看的时候需要注意这一点。

结构示例

循环队列

今天最重要的是理解 channel 中两个关键的数据结构。为了下一讲阅读源码做准备,我把 channel 中的循环队列部分的代码抽象出来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 队列满了
var ErrQFull = errors.New("circular is full")

// 没有值
var ErrQEmpty = errors.New("circular is empty")

// 定义循环队列
// 如何确定队空,还是队满?q.sendx = (q.sendx+1) % q.dataqsiz
type queue struct {
buf []int // 队列元素存储
dataqsiz uint // circular 队列长度
qcount uint // 有多少元素在buf中 qcount = len(buf)
sendx uint // 可以理解为队尾指针,向队列写入数据
recvx uint // 可以理解为队头指针,从队列读取数据
}

func makeQ(size int) *queue {
q := &queue{
dataqsiz: uint(size),
buf: nil,
}

q.buf = make([]int, q.dataqsiz)

return q
}

// 向buf中写入数据
// 请看 chansend 函数
func (c *queue) insert(ele int) error {
// 检查队列是否有空间
if c.dataqsiz > 0 && c.qcount == c.dataqsiz {
return ErrQFull
}

// 存入数据
c.buf[c.sendx] = ele
c.sendx++ // 尾指针后移
if c.sendx == c.dataqsiz { // 如果相等,说明队列写满了,sendx放到开始位置
c.sendx = 0
}
c.qcount++

return nil
}

// 从buf中读取数据
func (c *queue) read() (int, error) {
// 队列中没有数据了
if c.dataqsiz > 0 && c.qcount == 0 {
return 0, ErrQEmpty
}

ret := c.buf[c.recvx] // 取出元素
c.buf[c.recvx] = 0
c.recvx++
if c.recvx == c.dataqsiz { // 如果相等,说明写到末尾了,recvx放到开始位置
c.recvx = 0
}
c.qcount--

return ret, nil
}

上面的代码基本上就是 channel 的循环队列部分的实现。这个队列的实现与我们平常实现的循环队列稍微有些不一样。一般我们为了方便判空,会浪费一个buf的空间来方便判空,公式是: (tail+1)%n=head ;但是在 channel 这里的循环队列,由于有了一个循环队列元素的计数,确保了这个空间不会被浪费,并且同时也能够满足 O(1) 时间复杂度计算有缓冲 channel 元素个数。

总结

总结一下今天的主要信息。

  1. channel 中用到了两个数据结构:循环队列双端链表
  2. 循环队列 只有在有缓冲 channel 中才会使用,它主要是做为消息的缓冲、保证消息的有序性;
  3. 双端链表 是用来挂起阻塞的读、写 goroutine 的,在被唤醒时会按照入队顺序公平的进行通知;
  4. 无缓冲的 channel 不会用到 循环队列 相关的结构,它必须读写 goroutine 都准备好后才能进行消息交换;
  5. 做为缓冲消息的 循环队列 通过一个当前元素个数字段的标记,避免了浪费一个数据空间。

下一章我们就尝试阅读一下 channel 的源码,想要尝试录制一个视频来讲这部分源码!

参考资料