Go nethttp 学习笔记中,如何高效构建SEO策略?

摘要:概述 在使用 Go 开发时几乎都会用到 nethttp 标准库。但是,对库的内部实现不了解,仅限于会用。遇到问题容易懵,比如: 长连接和短连接有什么区别?具体什么实现原理? nethttp 如何处理并发请求? nethttp 有用到缓
概述 在使用 Go 开发时几乎都会用到 net/http 标准库。但是,对库的内部实现不了解,仅限于会用。遇到问题容易懵,比如: 长连接和短连接有什么区别?具体什么实现原理? net/http 如何处理并发请求? net/http 有用到缓存吗?缓存用来干什么? ...... 单个问题各个击破,不如深入梳理下 net/http 标准库一网打尽。本文将深入学习 net/http 标准库,力图做到知其然知其所以然。 服务端 源码走读 先看一段服务端示例代码: func helloHandler(w http.ResponseWriter, r *http.Request) { // 向客户端写入响应内容 fmt.Fprint(w, "Hello I am server 3") } func main() { // 创建自定义的ServeMux实例 mux := http.NewServeMux() // 在mux上注册路由和处理函数 mux.HandleFunc("/", helloHandler) // 启动服务器并监听12302端口,使用自定义的mux作为handler fmt.Println("Server is listening on port 12302...") if err := http.ListenAndServe(":12302", mux); err != nil { // 错误处理 fmt.Printf("Failed to start server: %v\n", err) } } 示例中有几类概念需要介绍。 多路复用器 http.ServeMux type ServeMux struct { mu sync.RWMutex // 多路复用器锁 tree routingNode // 路由节点,用来存储路由 pattern 和对应的 handler ... } 路由处理器 handler handler 是一个实现 func(ResponseWriter, *Request) 函数接口的函数,用来处理请求。 流程 服务端启动需要经过以下流程。 1) 创建多路复用器 创建自定义 http.ServeMux 多路复用器,如果不创建的话,则会使用默认 http.DefaultServeMux 多路复用器: // NewServeMux allocates and returns a new [ServeMux]. func NewServeMux() *ServeMux { return &ServeMux{} } var DefaultServeMux = &defaultServeMux var defaultServeMux ServeMux 2) 注册路由处理器 调用多路复用器的 HandleFunc 方法注册路由处理器: func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) { if use121 { mux.mux121.handleFunc(pattern, handler) } else { // 将 pattern 和路由处理器注册到多路复用器 mux.register(pattern, HandlerFunc(handler)) } } 注册的过程实际是将 pattern 和 handler 的映射关系写入 ServeMux.tree 中。可以根据请求的 pattern 从 ServeMux.tree 中获取对应的 handler。 3)监听服务 调用 http.ListenAndServe 监听服务: func ListenAndServe(addr string, handler Handler) error { server := &Server{Addr: addr, Handler: handler} return server.ListenAndServe() } func (s *Server) ListenAndServe() error { ... // 调用 net.Listen 获取 listener ln, err := net.Listen("tcp", addr) if err != nil { return err } return s.Serve(ln) } func (s *Server) Serve(l net.Listener) error { ... for { // Accept waits for and returns the next connection to the listener. rw, err := l.Accept() ... c := s.newConn(rw) // 异步启动协程处理请求 go c.serve(connCtx) } } 监听服务监听到请求后会异步调用 conn.serve 启动协程处理请求: func (c *conn) serve(ctx context.Context) { for { // 读请求 w, err := c.readRequest(ctx) ... // 调用多路复用器的 ServeHTTP 方法处理请求 serverHandler{c.server}.ServeHTTP(w, w.req) ... } func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) { // 如果多路复用器为空,则用默认多路复用器 handler := sh.srv.Handler if handler == nil { handler = DefaultServeMux } ... handler.ServeHTTP(rw, req) } func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) { ... var h Handler if use121 { h, _ = mux.mux121.findHandler(r) } else { // 根据请求从多路复用器找到请求 pattern 对应的路由处理器 h, r.Pattern, r.pat, r.matches = mux.findHandler(r) } // 调用路由处理器的 ServeHTTP 方法处理请求 h.ServeHTTP(w, r) } 服务端的处理流程并不复杂,主要是注册路由处理器和回调路由处理器过程,流程图就不画了,接下来介绍客户端的处理逻辑,这是比较复杂的部分。 客户端 核心数据结构 Client type Client struct { // 通信模块,负责和服务端建立通信 Transport RoundTripper // Cookie 模块,负责管理 Cookie Jar CookieJar // 超时时间,这个时间是请求处理的总超时时间 Timeout time.Duration RoundTripper type RoundTripper interface { RoundTrip(*Request) (*Response, error) } RoundTripper 是通信模块的 interface,需要实现方法 Roundtrip。通过传入请求 Request,与服务端交互后获得响应 Response。 http.Transport type Transport struct { idleMu sync.Mutex ... // 空闲连接,实现复用,每个连接只能被一个请求使用 idleConn map[connectMethodKey][]*persistConn // 等待连接队列,需要等待的连接请求会放到 idleConnWait 中 idleConnWait map[connectMethodKey]wantConnQueue // 空闲连接 lru,结合 idleConn 根据连接时间管理连接 idleLRU connLRU // 建立长连接开关,如果为 true 则不复用连接 DisableKeepAlives bool Transport 是实现了 RoundTripper 接口的方法,是用于通信的模块。 源码走读 客户端请求的示例如下: func main() { resp, err := client.Get("http://localhost:12302/") if err != nil { // 错误处理 fmt.Printf("Failed to send request: %v\n", err) return } // 读取响应体内容 body, err := io.ReadAll(resp.Body) if err != nil { // 错误处理 fmt.Printf("Failed to read response body: %v\n", err) return } // 打印服务器返回的响应内容 fmt.Printf("Server response: %s\n", body) if err := resp.Body.Close(); err != nil { fmt.Printf("Failed to close response body: %v\n", err) } } 请求服务端 client.Get 调用 api 请求服务端,获取响应。 func (c *Client) Get(url string) (resp *Response, err error) { // 构造请求体 request req, err := NewRequest("GET", url, nil) if err != nil { return nil, err } // 调用 client.Do(req) 获取响应 return c.Do(req) } func (c *Client) Do(req *Request) (*Response, error) { return c.do(req) } func (c *Client) do(req *Request) (retres *Response, reterr error) { ... for { ... // 调用 client.send() 获取响应 if resp, didTimeout, err = c.send(req, deadline); err != nil { ... } ... } 客户端调用 client.send() 发送请求到服务端,获取响应。 func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { ... resp, didTimeout, err = send(req, c.transport(), deadline) if err != nil { return nil, didTimeout, err } ... return resp, nil, nil } func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { ... // 通信模块 Transport.RoundTrip resp, err = rt.RoundTrip(req) if err != nil { ... } ... } 通信模块 Transport 开始接管通信过程。 func (t *Transport) RoundTrip(req *Request) (*Response, error) { return t.roundTrip(req) } func (t *Transport) roundTrip(req *Request) (_ *Response, err error) { ... // 获取连接 pconn, err := t.getConn(treq, cm) var resp *Response if pconn.alt != nil { // HTTP/2 path. resp, err = pconn.alt.RoundTrip(req) } else { // 调用连接的 roundTrip 方法获取服务端响应 resp, err = pconn.roundTrip(treq) } ... } Transport.roundTrip 方法是这里的重点。主要包括两大逻辑: 调用 Transport.getConn 获取连接; 调用连接的 roundTrip 方法获取响应; 获取连接 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) { ... // 构造连接请求对象 wantConn w := &wantConn{ cm: cm, key: cm.key(), ctx: dialCtx, cancelCtx: dialCancel, result: make(chan connOrError, 1), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } defer func() { if err != nil { w.cancel(t, err) } }() // 获取连接 if delivered := t.queueForIdleConn(w); !delivered { t.queueForDial(w) } // 异步获取结果并处理 context select { case r := <-w.result: ... return r.pc, r.err case <-treq.ctx.Done(): ... } } Transport.getConn 首先构造连接请求对象 wantConn,然后根据 wantConn 调用 Transport.queueForIdleConn 获取连接。如果获取不到,调用 t.queueForDial 创建连接。 获取连接的过程是异步的,个人理解是创建连接的时间是不确定的,可以根据 context 上下文实现优雅退出,防止阻塞。 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) { // 如果 disable keep alive 则返回 if t.DisableKeepAlives { return false } ... // 判断连接是否存在于 Transport 中 // 如果存在则复用连接,如果不存在则创建连接 if list, ok := t.idleConn[w.key]; ok { stop := false delivered := false for len(list) > 0 && !stop { // 复用连接 pconn := list[len(list)-1] ... delivered = w.tryDeliver(pconn, nil, pconn.idleAt) if delivered { ... } } // 更新 Transport.idleConn // 同一个连接不能被多个请求使用,所以这里要删除 idleConn 中的连接 if len(list) > 0 { t.idleConn[w.key] = list } else { delete(t.idleConn, w.key) } if stop { return delivered } } // 连接不存在于 Transport.idleConn 中 // 将 wantConn 加入到 Transport.idleConnWait 队列中,等待连接 if t.idleConnWait == nil { t.idleConnWait = make(map[connectMethodKey]wantConnQueue) } q := t.idleConnWait[w.key] q.cleanFrontNotWaiting() q.pushBack(w) t.idleConnWait[w.key] = q return false } Transport.queueForIdleConn 判断 Transport.idleConn 是否有空闲连接,如果有则调用 wantConn.tryDeliver 传递连接: func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool { w.mu.Lock() defer w.mu.Unlock() ... // 实际是往 wantConn.result 通道中写 connOrError 对象,该对象中包括连接 pc w.result <- connOrError{pc: pc, err: err, idleAt: idleAt} close(w.result) return true } 如果 Transport.idleConn 没有连接,则将 wantConn 加入等待队列 Transport.idleConnWait。然后调用 Transport.queueForDial 创建连接。 func (t *Transport) queueForDial(w *wantConn) { ... // 判断连接请求是否达到 Transport.connsPerHost 上限 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { if t.connsPerHost == nil { t.connsPerHost = make(map[connectMethodKey]int) } t.connsPerHost[w.key] = n + 1 // 如果没到请求上限,创建连接 t.startDialConnForLocked(w) return } ... // 如果达到请求上限,将请求加入等待队列 if t.connsPerHostWait == nil { t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue) } q := t.connsPerHostWait[w.key] q.cleanFrontNotWaiting() q.pushBack(w) t.connsPerHostWait[w.key] = q } Transport.queueForDial 判断连接是否达到 Transport.connsPerHost 上限。如果达到则将连接请求加入等待队列,如果未达到则调用 Transport.startDialConnForLocked 创建连接。 func (t *Transport) startDialConnForLocked(w *wantConn) { ... go func() { // 调用 Transport.dialConnFor 创建连接 t.dialConnFor(w) t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() w.cancelCtx = nil }() } func (t *Transport) dialConnFor(w *wantConn) { ... // 调用 Transport.dialConn 创建连接 pc, err := t.dialConn(ctx, w.cm) // deliver 创建的连接 delivered := w.tryDeliver(pc, err, time.Time{}) ... } Transport.dialConnFor 调用 Transport.dialConn 创建连接,然后 deliver 该连接。 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } if cm.scheme() == "https" && t.hasCustomTLSDialer() { ... } else { // 调用 Transport.dial 创建连接 conn, err := t.dial(ctx, "tcp", cm.addr()) ... // 将连接赋给 pconn pconn.conn = conn } // 将 Reader 和 Writter 赋给 pconn // pconn.br 负责从连接中读服务端的响应,pconn.bw 负责写客户端请求到连接 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) // 异步启动两个伴生协程负责读写连接 go pconn.readLoop() go pconn.writeLoop() return pconn, nil } Transport.dialConn 这个方法很重要,它负责创建连接,创建连接的过程实际是和服务端进行三次握手建立 TCP 连接的过程。接着,启动两个伴生协程负责读写连接。 获取到连接后,会用这个连接获取服务端响应。 获取响应 func (t *Transport) roundTrip(req *Request) (_ *Response, err error) { ... for { // 获取连接 pconn, err := t.getConn(treq, cm) ... var resp *Response if pconn.alt != nil { // HTTP/2 path. resp, err = pconn.alt.RoundTrip(req) } else { // 通过连接获取响应 resp, err = pconn.roundTrip(treq) } } ... } func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { ... // 写消息到连接的 writech 通道 pc.writech <- writeRequest{req, writeErrCh, continueCh} resc := make(chan responseAndError) // 写消息到连接的 reqch 通道 pc.reqch <- requestAndChan{ treq: req, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } // 闭包函数处理响应 handleResponse := func(re responseAndError) (*Response, error) { ... return re.res, nil } for { select { ... // 监听连接的 reqch.ch 通道 case re := <-resc: return handleResponse(re) } } } 在获取响应这里将 writeRequest 和 requestAndChan 写入通道 pc.writech 和 pc.reqch 中,接着监听 pc.reqch.ch 通道,如果通道中有数据,则调用 handleResponse 处理响应。 那么,是谁在消费 pc.writech 和 pc.reqch 呢?又是谁在往 pc.reqch.ch 写数据呢? 回答这个问题,需要看读写连接的伴生协程。 读写连接 写协程 func (pc *persistConn) writeLoop() { defer close(pc.writeLoopDone) for { select { // 监听 pc.writech 通道 case wr := <-pc.writech: startBytesWritten := pc.nwrite // 写请求到连接,服务端会从连接中接受到该请求并返回响应 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) ... pc.writeErrCh <- err // to the body reader, which might recycle us wr.ch <- err // to the roundTrip function if err != nil { pc.close(err) return } // 如果收到关闭请求,则退出写协程 case <-pc.closech: return } } } 可以看到,写协程主要做了两件事: 作为消费者监听 pc.writech 通道,将请求写入连接; 阻塞等待 pc.closech,退出协程; 读协程 读协程负责从连接中读取服务端的响应数据。 func (pc *persistConn) readLoop() { ... defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }() ... alive := true for alive { ... // 阻塞请求通道 rc := <-pc.reqch trace := rc.treq.trace var resp *Response if err == nil { // 从连接中读响应数据 resp, err = pc.readResponse(rc, trace) } else { err = transportReadFromServerError{err} closeErr = err } ... waitForBodyRead := make(chan bool, 2) // 构造 body body := &bodyEOFSignal{ body: resp.Body, ... fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } // 将构造的 body 赋值到 resp.Body resp.Body = body ... select { // 将响应 resp 传递给 rc.ch 通道 case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } select { // 阻塞等待 case bodyEOF := <-waitForBodyRead: alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(rc.treq) if bodyEOF { eofc <- struct{}{} } case <-rc.treq.ctx.Done(): alive = false pc.cancelRequest(context.Cause(rc.treq.ctx)) // 退出协程 case <-pc.closech: alive = false } rc.treq.cancel(errRequestDone) testHookReadLoopBeforeNextRead() } } 读协程作为消费者读取 pc.reqch 通道,接着调用 persistConn.readResponse 读取响应。然后,构造响应 body,最后阻塞等待。 至此,基本过完了创建连接,读写连接的逻辑。下面介绍如何复用连接。 复用连接 将视线移到 persistConn.readLoop 构造 body 的逻辑: // 构造 body body := &bodyEOFSignal{ body: resp.Body, ... fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } body.fn 函数会写 isEOF 到 waitForBodyRead 通道,这个通道是读协程在消费: func (pc *persistConn) readLoop() { ... for alive { select { // 接收 `waitForBodyRead` case bodyEOF := <-waitForBodyRead: alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && // 调用 tryPutIdleConn 将连接加入到空闲队列 tryPutIdleConn(rc.treq) if bodyEOF { eofc <- struct{}{} } ... } ... } 读协程在收到 waitForBodyRead 通道数据后,会根据一系列判断调用 tryPutIdleConn 将连接加入到 Transport 的空闲队列中。 func (pc *persistConn) readLoop() { closeErr := errReadLoopExiting // default value, if not changed below defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }() tryPutIdleConn := func(treq *transportRequest) bool { trace := treq.trace // 调用 pc.t.tryPutIdleConn 添加连接 if err := pc.t.tryPutIdleConn(pc); err != nil { ... } ... return true } ... } func (t *Transport) tryPutIdleConn(pconn *persistConn) error { if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 { return errKeepAlivesDisabled } // 如果连接已经 broken 了,则返回 error if pconn.isBroken() { return errConnBroken } // 标记连接是可复用的 pconn.markReused() // 加锁 t.idleMu.Lock() defer t.idleMu.Unlock() // 获取连接的 key key := pconn.cacheKey // 判断连接的 key 是否在 Transport.idleConnWait 等待队列 // 如果在等待队列中,则 remove 等待队列中的 wantConn if q, ok := t.idleConnWait[key]; ok { // 如果连接在 Transport.idleConnWait 等待队列 done := false if pconn.alt == nil { // HTTP/1. // Loop over the waiting list until we find a w that isn't done already, and hand it pconn. for q.len() > 0 { w := q.popFront() if w.tryDeliver(pconn, nil, time.Time{}) { done = true break } } } else { ... } } if q.len() == 0 { delete(t.idleConnWait, key) } else { t.idleConnWait[key] = q } if done { return nil } } // 如果空闲队列已经 close 了,退出 if t.closeIdle { return errCloseIdle } // 如果空闲队列为 nil,初始化空闲队列 if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) } // 建立请求和连接的映射存到空闲队列中 idles := t.idleConn[key] if len(idles) >= t.maxIdleConnsPerHost() { return errTooManyIdleHost } for _, exist := range idles { if exist == pconn { log.Fatalf("dup idle pconn %p in freelist", pconn) } } // 将连接加入到空闲队列中 t.idleConn[key] = append(idles, pconn) // 将连接加入到 lru 缓存,lru 缓存可以用来管理连接 t.idleLRU.add(pconn) if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns { oldest := t.idleLRU.removeOldest() oldest.close(errTooManyIdle) t.removeIdleConnLocked(oldest) } ... return nil } 在 Transport.tryPutIdleConn 中将连接加入到空闲队列和 LRU 缓存,后续的请求可以从空闲队列中复用连接。过期的连接会从空闲队列和 LRU 缓存移除。 基准测试 我们构造了三种场景用于测试性能: 连接池复用连接; 连接池不复用连接; 客户端复用连接,而服务端关闭连接; 具体代码实现在 这里 测试结果如下: go test -bench=. -benchmem -run=^$ goos: darwin goarch: arm64 pkg: client cpu: Apple M3 BenchmarkServerClosesConnection-8 21 52007139 ns/op 21505 B/op 139 allocs/op BenchmarkWithConnectionPool-8 51 21748487 ns/op 18985 B/op 127 allocs/op BenchmarkWithoutConnectionPool-8 10 102032821 ns/op 23189 B/op 140 allocs/op PASS ok client 3.702s 可以看到,有复用的情况性能最好,无复用的情况性能最差,而客户端复用,服务端关闭连接的情况介于二者之间。个人猜测,虽然连接已经关闭了,但是还是有部分资源是可复用的,相比于无复用性能会好点。 小结 本文介绍了 net/http 标准库的服务端和客户端流程,相比于服务端,客户端要更复杂,大致画出客户端处理流程如下: 参考资料 Golang http 标准库底层原理解析 构建高性能HTTP客户端的7个关键步骤 Golang HTTP 标准库实现原理