crdb代码阅读01——cmux多路复用

结构体&函数

多路复用结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type cMux struct {
root net.Listener //net监听结构
bufLen int
errh ErrorHandler
donec chan struct{}
sls []matchersListener //协议匹配函数
}
type MuxConn struct {
net.Conn
buf bytes.Buffer
sniffer bufferedReader
}
type bufferedReader struct {
source io.Reader
buffer *bytes.Buffer
bufferRead int
bufferSize int
}

cMux用于监听某个端口,同时通过machersListener区分不同类型的请求,目前主要有两种请求:

  1. pg client连接
  2. grpc请求

MuxConn封装每个client连接。

初始化cmux

1
2
3
4
5
6
7
8
func New(l net.Listener) CMux {
return &cMux{
root: l,
bufLen: 1024,
errh: func(_ error) bool { return true },
donec: make(chan struct{}),
}
}

添加match函数

1
2
3
4
5
6
7
8
func (m *cMux) Match(matchers ...Matcher) net.Listener {
ml := muxListener{
Listener: m.root,
connc: make(chan net.Conn, m.bufLen),
}
m.sls = append(m.sls, matchersListener{ss: matchers, l: ml})
return ml
}

接收请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (m *cMux) Serve() error {
var wg sync.WaitGroup
...
for {
c, err := m.root.Accept() //接收socket连接
if err != nil {
if !m.handleErr(err) {
return err
}
continue
}
wg.Add(1) // 一个连接一个goroutine,用于等待
go m.serve(c, m.donec, &wg)
}

匹配协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
muc := newMuxConn(c)
for _, sl := range m.sls {
for _, s := range sl.ss {
matched := s(muc.getSniffer())
if matched {
select {
case sl.l.connc <- muc:
case <-donec:
_ = c.Close()
}
return
}
}
}
_ = c.Close()
err := ErrNotMatched{c: c}
if !m.handleErr(err) {
_ = m.root.Close()
}
}
func (m *MuxConn) getSniffer() io.Reader {
m.sniffer = bufferedReader{source: m.Conn, buffer: &m.buf, bufferSize: m.buf.Len()}
return &m.sniffer
}
func (s *bufferedReader) Read(p []byte) (int, error) {
// Functionality of bytes.Reader.
bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
s.bufferRead += bn
p = p[bn:]
// Funtionality of io.TeeReader.
sn, sErr := s.source.Read(p)
if sn > 0 {
if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
return bn + wn, wErr
}
}
return bn + sn, sErr
}

这里使用了muc.getSniffer, 会将从conn读取的数据缓存在本地buffer中,如果第一次match失败的话,第二次match会调整bufferSize, 后续的读取就会先读取本地buffer,从而达到了可以多次进行match的目的。

匹配成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type muxListener struct {
net.Listener
connc chan net.Conn
}
case sl.l.connc <- muc:
func (l muxListener) Accept() (net.Conn, error) {
c, ok := <-l.connc
if !ok {
return nil, ErrListenerClosed
}
return c, nil
}
netutil.FatalIfUnexpected(httpServer.ServeWith(pgCtx, s.stopper, pgL, func(conn net.Conn) {
func (s *Server) ServeWith(
for {
rw, e := l.Accept()
  1. 将muc传递给muxlistner的connc管道
  2. muxlistener的Accept函数监听管道connc,从而获取了一个新的客户端连接

小结

这里比较复杂点的算法就是多路复用过程中,如何对socket进行重复读取,这里使用的是本地buffer缓存操作,通过每次设置bufferlen的长度,保证可以每个新的sniffer都可以重入。

有一个小问题,就是正常处理请求的时候,也会先判断buffer,然后再使用conn进行read。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// From the io.Reader documentation:
//
// When Read encounters an error or end-of-file condition after
// successfully reading n > 0 bytes, it returns the number of
// bytes read. It may return the (non-nil) error from the same call
// or return the error (and n == 0) from a subsequent call.
// An instance of this general case is that a Reader returning
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
func (m *MuxConn) Read(p []byte) (int, error) {
if n, err := m.buf.Read(p); err != io.EOF {
return n, err
}
return m.Conn.Read(p)
}

本文标题:crdb代码阅读01——cmux多路复用

文章作者:Louis

发布时间:2018年04月11日 - 15:04

最后更新:2018年04月12日 - 20:04

原始链接:/2018/04/11/crdb01-cmux/

许可协议: Louis-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。