The smallest unit of HTTP2 communication is a data frame. Each frame contains two parts: Frame Header and Payload. Frames from different data streams can be sent interleaved (frames from the same data stream must be sent sequentially) and then reassembled based on the data stream identifier in each frame header.
Since the Payload contains valid data, only the frame header is analyzed and described.
The total length of the frame header is 9 bytes, and contains four parts, namely:
The length of Payload takes three bytes.
Data frame type, occupying one byte.
Data frame identifier, occupying one byte.
Data stream ID, occupying four bytes.
The diagram is as follows:
The format of the data frame and the meaning of each part are already clear, so let’s see how to read a frame header in the code:
func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) { _, err := io.ReadFull(r, buf[:http2frameHeaderLen]) if err != nil { return http2FrameHeader{}, err } return http2FrameHeader{ Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])), Type: http2FrameType(buf[3]), Flags: http2Flags(buf[4]), StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1), valid: true, }, nil }
In the above code http2frameHeaderLen
is a constant with a value of 9.
After reading 9 bytes from io.Reader, convert the first three bytes and the last four bytes into the uint32
type to get the Payload length and data Stream ID. Another thing that needs to be understood is that the first three bytes and last four bytes of the frame header are stored in big endian format (I will not explain the big and small endian here. Readers who do not understand it are asked to Baidu themselves).
According to the description at http://http2.github.io/http2-spec/#rfc.section.11.2, there are a total of 10 data frame types. It is reflected in the go source code:
const ( http2FrameData http2FrameType = 0x0 http2FrameHeaders http2FrameType = 0x1 http2FramePriority http2FrameType = 0x2 http2FrameRSTStream http2FrameType = 0x3 http2FrameSettings http2FrameType = 0x4 http2FramePushPromise http2FrameType = 0x5 http2FramePing http2FrameType = 0x6 http2FrameGoAway http2FrameType = 0x7 http2FrameWindowUpdate http2FrameType = 0x8 http2FrameContinuation http2FrameType = 0x9 )
http2FrameData
: mainly used to send the request body and receive the response data frame.
http2FrameHeaders
: Mainly used to send request headers and receive data frames of response headers.
http2FrameSettings
: Mainly used for data frames related to client and server communication settings.
http2FrameWindowUpdate
: Data frame mainly used for flow control.
Other data frame types are not covered in this article, so they will not be described.
Since there are many types of data frame identifiers, the author only introduces some of them here. Let’s look at the source code first:
const ( // Data Frame http2FlagDataEndStream http2Flags = 0x1 // Headers Frame http2FlagHeadersEndStream http2Flags = 0x1 // Settings Frame http2FlagSettingsAck http2Flags = 0x1 // 此处省略定义其他数据帧标识符的代码 )
http2FlagDataEndStream
:在前篇中提到,调用(*http2ClientConn).newStream
方法会创建一个数据流,那这个数据流什么时候结束呢,这就是http2FlagDataEndStream
的作用。
当client收到有响应body的响应时(HEAD请求无响应body,301,302等响应也无响应body),一直读到http2FrameData
数据帧的标识符为http2FlagDataEndStream
则意味着本次请求结束可以关闭当前数据流。
http2FlagHeadersEndStream
:如果读到的http2FrameHeaders
数据帧有此标识符也意味着本次请求结束。
http2FlagSettingsAck
:该标示符意味着对方确认收到http2FrameSettings
数据帧。
流控制是一种阻止发送方向接收方发送大量数据的机制,以免超出后者的需求或处理能力。Go中HTTP2通过http2flow
结构体进行流控制:
type http2flow struct { // n is the number of DATA bytes we're allowed to send. // A flow is kept both on a conn and a per-stream. n int32 // conn points to the shared connection-level flow that is // shared by all streams on that conn. It is nil for the flow // that's on the conn directly. conn *http2flow }
字段含义英文注释已经描述的很清楚了,所以笔者不再翻译。下面看一下和流控制有关的方法。
此方法返回当前流控制可发送的最大字节数:
func (f *http2flow) available() int32 { n := f.n if f.conn != nil && f.conn.n < n { n = f.conn.n } return n }
如果f.conn
为nil则意味着此控制器的控制级别为连接,那么可发送的最大字节数就是f.n
。
如果f.conn
不为nil则意味着此控制器的控制级别为数据流,且当前数据流可发送的最大字节数不能超过当前连接可发送的最大字节数。
此方法用于消耗当前流控制器的可发送字节数:
func (f *http2flow) take(n int32) { if n > f.available() { panic("internal error: took too much") } f.n -= n if f.conn != nil { f.conn.n -= n } }
通过实际需要传递一个参数,告知当前流控制器想要发送的数据大小。如果发送的大小超过流控制器允许的大小,则panic
,如果未超过流控制器允许的大小,则将当前数据流和当前连接的可发送字节数-n
。
有消耗就有新增,此方法用于增加流控制器可发送的最大字节数:
func (f *http2flow) add(n int32) bool { sum := f.n + n if (sum > n) == (f.n > 0) { f.n = sum return true } return false }
上面的代码唯一需要注意的地方是,当sum超过int32正数最大值(2^31-1)时会返回false。
回顾:在前篇中提到的(*http2Transport).NewClientConn
方法和(*http2ClientConn).newStream
方法均通过(*http2flow).add
初始化可发送数据窗口大小。
有了帧和流控制器的基本概念,下面我们结合源码来分析总结流控制的具体实现。
前篇分析(*http2Transport).newClientConn
时止步于读循环,那么今天我们就从(*http2ClientConn).readLoop
开始。
func (cc *http2ClientConn) readLoop() { rl := &http2clientConnReadLoop{cc: cc} defer rl.cleanup() cc.readerErr = rl.run() if ce, ok := cc.readerErr.(http2ConnectionError); ok { cc.wmu.Lock() cc.fr.WriteGoAway(0, http2ErrCode(ce), nil) cc.wmu.Unlock() } }
由上可知,readLoop的逻辑比较简单,其核心逻辑在(*http2clientConnReadLoop).run
方法里。
func (rl *http2clientConnReadLoop) run() error { cc := rl.cc rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse gotReply := false // ever saw a HEADERS reply gotSettings := false for { f, err := cc.fr.ReadFrame() // 此处省略代码 maybeIdle := false // whether frame might transition us to idle switch f := f.(type) { case *http2MetaHeadersFrame: err = rl.processHeaders(f) maybeIdle = true gotReply = true case *http2DataFrame: err = rl.processData(f) maybeIdle = true case *http2GoAwayFrame: err = rl.processGoAway(f) maybeIdle = true case *http2RSTStreamFrame: err = rl.processResetStream(f) maybeIdle = true case *http2SettingsFrame: err = rl.processSettings(f) case *http2PushPromiseFrame: err = rl.processPushPromise(f) case *http2WindowUpdateFrame: err = rl.processWindowUpdate(f) case *http2PingFrame: err = rl.processPing(f) default: cc.logf("Transport: unhandled response frame type %T", f) } if err != nil { if http2VerboseLogs { cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err) } return err } if rl.closeWhenIdle && gotReply && maybeIdle { cc.closeIfIdle() } } }
由上可知,(*http2clientConnReadLoop).run
的核心逻辑是读取数据帧然后对不同的数据帧进行不同的处理。
cc.fr.ReadFrame()
will read the data frame according to the data frame format introduced earlier.
In the previous article, it was mentioned that an image supporting the h2 protocol was used for analysis. This article continues to reuse the image to debug the (*http2clientConnReadLoop).run
method.
The reading loop will first read the http2FrameSettings
data frame. After reading the data frame, the (*http2clientConnReadLoop).processSettings
method will be called. (*http2clientConnReadLoop).processSettings
mainly contains 3 logics.
1. Determine whether it is the ack information of http2FrameSettings
. If so, return directly, otherwise continue with the following steps.
if f.IsAck() { if cc.wantSettingsAck { cc.wantSettingsAck = false return nil } return http2ConnectionError(http2ErrCodeProtocol) }
2、处理不同http2FrameSettings
的数据帧,并根据server传递的信息,修改maxConcurrentStreams
等的值。
err := f.ForeachSetting(func(s http2Setting) error { switch s.ID { case http2SettingMaxFrameSize: cc.maxFrameSize = s.Val case http2SettingMaxConcurrentStreams: cc.maxConcurrentStreams = s.Val case http2SettingMaxHeaderListSize: cc.peerMaxHeaderListSize = uint64(s.Val) case http2SettingInitialWindowSize: if s.Val > math.MaxInt32 { return http2ConnectionError(http2ErrCodeFlowControl) } delta := int32(s.Val) - int32(cc.initialWindowSize) for _, cs := range cc.streams { cs.flow.add(delta) } cc.cond.Broadcast() cc.initialWindowSize = s.Val default: // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. cc.vlogf("Unhandled Setting: %v", s) } return nil })
当收到ID为http2SettingInitialWindowSize
的帧时,会调整当前连接中所有数据流的可发送数据窗口大小,并修改当前连接的initialWindowSize
(每个新创建的数据流均会使用该值初始化可发送数据窗口大小)为s.Val
。
3、发送http2FrameSettings
的ack信息给server。
cc.wmu.Lock() defer cc.wmu.Unlock() cc.fr.WriteSettingsAck() cc.bw.Flush() return cc.werr
在笔者debug的过程中,处理完http2FrameSettings
数据帧后,紧接着就收到了http2WindowUpdateFrame
数据帧。收到该数据帧后会调用(*http2clientConnReadLoop).processWindowUpdate
方法:
func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error { cc := rl.cc cs := cc.streamByID(f.StreamID, false) if f.StreamID != 0 && cs == nil { return nil } cc.mu.Lock() defer cc.mu.Unlock() fl := &cc.flow if cs != nil { fl = &cs.flow } if !fl.add(int32(f.Increment)) { return http2ConnectionError(http2ErrCodeFlowControl) } cc.cond.Broadcast() return nil }
上面的逻辑主要用于更新当前连接和数据流的可发送数据窗口大小。如果http2WindowUpdateFrame帧中的StreamID为0,则更新当前连接的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。
注意:在debug的过程,收到http2WindowUpdateFrame
数据帧后,又收到一次http2FrameSettings
,且该数据帧标识符为http2FlagSettingsAck
。
笔者在这里特意提醒,这是因为前篇中提到的(*http2Transport).NewClientConn方法,也向server发送了http2FrameSettings数据帧和http2WindowUpdateFrame数据帧。
In addition, during the processing of http2FrameSettings
and http2WindowUpdateFrame
, there are cc.cond.Broadcast()
calls, which are mainly used to wake up because Wait
request in the following two situations:
The data stream processed by the current connection has reached the upper limit of maxConcurrentStreams
(see the previous article for details) Analysis of (*http2ClientConn).awaitOpenSlotForRequest
method).
Because the sending data stream has reached the upper limit of the sending data window, it is waiting for a request to update the sending data window (will be introduced later).
Receiving this data frame means that a request has started to receive response data. The processing function corresponding to this data frame is (*http2clientConnReadLoop).processHeaders
:
func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error { cc := rl.cc cs := cc.streamByID(f.StreamID, false) // 此处省略代码 res, err := rl.handleResponse(cs, f) if err != nil { // 此处省略代码 cs.resc <- http2resAndError{err: err} return nil // return nil from process* funcs to keep conn alive } if res == nil { // (nil, nil) special case. See handleResponse docs. return nil } cs.resTrailer = &res.Trailer cs.resc <- http2resAndError{res: res} return nil }
首先我们先看cs.resc <- http2resAndError{res: res}
这一行代码,向数据流写入http2resAndError
即本次请求的响应。在(*http2ClientConn).roundTrip
方法中有这样一行代码readLoopResCh := cs.resc
。
回顾:前篇(*http2ClientConn).roundTrip
方法的第7点和本部分关联起来就可以形成一个完整的请求链。
接下来我们对rl.handleResponse
方法展开分析。
(*http2clientConnReadLoop).handleResponse
的主要作用是构建一个Response
变量,下面对该函数的关键步骤进行描述。
1、构建一个Response
变量。
header := make(Header) res := &Response{ Proto: "HTTP/2.0", ProtoMajor: 2, Header: header, StatusCode: statusCode, Status: status + " " + StatusText(statusCode), }
2、构建header(本篇不对header进行展开分析)。
for _, hf := range f.RegularFields() { key := CanonicalHeaderKey(hf.Name) if key == "Trailer" { t := res.Trailer if t == nil { t = make(Header) res.Trailer = t } http2foreachHeaderElement(hf.Value, func(v string) { t[CanonicalHeaderKey(v)] = nil }) } else { header[key] = append(header[key], hf.Value) } }
3、处理响应body的ContentLength。
streamEnded := f.StreamEnded() isHead := cs.req.Method == "HEAD" if !streamEnded || isHead { res.ContentLength = -1 if clens := res.Header["Content-Length"]; len(clens) == 1 { if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { res.ContentLength = clen64 } else { // TODO: care? unlike http/1, it won't mess up our framing, so it's // more safe smuggling-wise to ignore. } } else if len(clens) > 1 { // TODO: care? unlike http/1, it won't mess up our framing, so it's // more safe smuggling-wise to ignore. } }
由上可知,当前数据流没有结束或者是HEAD请求才读取ContentLength。如果header中的ContentLength不合法则res.ContentLength的值为 -1。
4、构建res.Body
。
cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}} cs.bytesRemain = res.ContentLength res.Body = http2transportResponseBody{cs} go cs.awaitRequestCancel(cs.req) if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { res.Header.Del("Content-Encoding") res.Header.Del("Content-Length") res.ContentLength = -1 res.Body = &http2gzipReader{body: res.Body} res.Uncompressed = true }
根据Content-Encoding
的编码方式,会构建两种不同的Body:
非gzip编码时,构造的res.Body类型为http2transportResponseBody
。
gzip编码时,构造的res.Body类型为http2gzipReader
。
收到此数据帧意味着我们开始接收真实的响应,即平常开发中需要处理的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData
。
因为server无法及时知道数据流在client端的状态,所以server可能会向client中一个已经不存在的数据流发送数据:
cc := rl.cc cs := cc.streamByID(f.StreamID, f.StreamEnded()) data := f.Data() if cs == nil { cc.mu.Lock() neverSent := cc.nextStreamID cc.mu.Unlock() // 此处省略代码 if f.Length > 0 { cc.mu.Lock() cc.inflow.add(int32(f.Length)) cc.mu.Unlock() cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(f.Length)) cc.bw.Flush() cc.wmu.Unlock() } return nil }
接收到的数据帧在client没有对应的数据流处理时,通过流控制器为当前连接可读窗口大小增加f.Length
,并且通过http2FrameWindowUpdate
数据帧告知server将当前连接的可写窗口大小增加f.Length
。
如果client有对应的数据流且f.Length
大于0:
1、如果是head请求结束当前数据流并返回。
if cs.req.Method == "HEAD" && len(data) > 0 { cc.logf("protocol error: received DATA on a HEAD request") rl.endStreamError(cs, http2StreamError{ StreamID: f.StreamID, Code: http2ErrCodeProtocol, }) return nil }
2、检查当前数据流能否处理f.Length
长度的数据。
cc.mu.Lock() if cs.inflow.available() >= int32(f.Length) { cs.inflow.take(int32(f.Length)) } else { cc.mu.Unlock() return http2ConnectionError(http2ErrCodeFlowControl) }
由上可知当前数据流如果能够处理该数据,通过流控制器调用cs.inflow.take
减小当前数据流可接受窗口大小。
3、当前数据流被重置或者被关闭即cs.didReset
为true时又或者数据帧有填充数据时需要调整流控制窗口。
var refund int if pad := int(f.Length) - len(data); pad > 0 { refund += pad } // Return len(data) now if the stream is already closed, // since data will never be read. didReset := cs.didReset if didReset { refund += len(data) } if refund > 0 { cc.inflow.add(int32(refund)) cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(refund)) if !didReset { cs.inflow.add(int32(refund)) cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) } cc.bw.Flush() cc.wmu.Unlock() } cc.mu.Unlock()
如果数据帧有填充数据则计算需要返还的填充数据长度。
如果数据流无效该数据帧的长度需要全部返还。
最后,根据计算的refund增加当前连接或者当前数据流的可接受窗口大小,并且同时告知server增加当前连接或者当前数据流的可写窗口大小。
4、数据长度大于0且数据流正常则将数据写入数据流缓冲区。
if len(data) > 0 && !didReset { if _, err := cs.bufPipe.Write(data); err != nil { rl.endStreamError(cs, err) return err } }
回顾:前面的(*http2clientConnReadLoop).handleResponse
方法中有这样一行代码res.Body = http2transportResponseBody{cs}
,所以在业务开发时能够通过Response读取到数据流中的缓冲数据。
在前面的内容里,如果数据流状态正常且数据帧没有填充数据则数据流和连接的可接收窗口会一直变小,而这部分内容就是增加数据流的可接受窗口大小。
因为篇幅和主旨的问题笔者仅分析描述该方法内和流控制有关的部分。
1、读取响应数据后计算当前连接需要增加的可接受窗口大小。
cc.mu.Lock() defer cc.mu.Unlock() var connAdd, streamAdd int32 // Check the conn-level first, before the stream-level. if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 { connAdd = http2transportDefaultConnFlow - v cc.inflow.add(connAdd) }
如果当前连接可接受窗口的大小已经小于http2transportDefaultConnFlow
(1G)的一半,则当前连接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()
。
回顾:http2transportDefaultConnFlow
在前篇(*http2Transport).NewClientConn
方法部分有提到,且连接刚建立时会通过http2WindowUpdateFrame
数据帧告知server当前连接可发送窗口大小增加http2transportDefaultConnFlow
。
2、读取响应数据后计算当前数据流需要增加的可接受窗口大小。
if err == nil { // No need to refresh if the stream is over or failed. // Consider any buffered body data (read from the conn but not // consumed by the client) when computing flow control for this // stream. v := int(cs.inflow.available()) + cs.bufPipe.Len() if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh { streamAdd = int32(http2transportDefaultStreamFlow - v) cs.inflow.add(streamAdd) } }
如果当前数据流可接受窗口大小加上当前数据流缓冲区剩余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh
(4M-4KB),则当前数据流可接受窗口大小需要增加http2transportDefaultStreamFlow - v
。
回顾:http2transportDefaultStreamFlow
在前篇(*http2Transport).NewClientConn
方法和(*http2ClientConn).newStream
方法中均有提到。
连接刚建立时,发送http2FrameSettings
数据帧,告知server每个数据流的可发送窗口大小为http2transportDefaultStreamFlow
。
在newStream
时,数据流默认的可接收窗口大小为http2transportDefaultStreamFlow
。
3、将连接和数据流分别需要增加的窗口大小通过http2WindowUpdateFrame
数据帧告知server。
if connAdd != 0 || streamAdd != 0 { cc.wmu.Lock() defer cc.wmu.Unlock() if connAdd != 0 { cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd)) } if streamAdd != 0 { cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd)) } cc.bw.Flush() }
以上就是server向client发送数据的流控制逻辑。
前篇中(*http2ClientConn).roundTrip
未对(*http2clientStream).writeRequestBody
进行分析,下面我们看看该方法的源码:
func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { cc := cs.cc sentEnd := false // whether we sent the final DATA frame w/ END_STREAM // 此处省略代码 req := cs.req hasTrailers := req.Trailer != nil remainLen := http2actualContentLength(req) hasContentLen := remainLen != -1 var sawEOF bool for !sawEOF { n, err := body.Read(buf[:len(buf)-1]) // 此处省略代码 remain := buf[:n] for len(remain) > 0 && err == nil { var allowed int32 allowed, err = cs.awaitFlowControl(len(remain)) switch { case err == http2errStopReqBodyWrite: return err case err == http2errStopReqBodyWriteAndCancel: cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) return err case err != nil: return err } cc.wmu.Lock() data := remain[:allowed] remain = remain[allowed:] sentEnd = sawEOF && len(remain) == 0 && !hasTrailers err = cc.fr.WriteData(cs.ID, sentEnd, data) if err == nil { err = cc.bw.Flush() } cc.wmu.Unlock() } if err != nil { return err } } // 此处省略代码 return err }
上面的逻辑可简单总结为:不停的读取请求body然后将读取的内容通过cc.fr.WriteData
转为http2FrameData
数据帧发送给server,直到请求body读完为止。其中和流控制有关的方法是awaitFlowControl
,下面我们对该方法进行分析。
此方法的主要作用是等待当前数据流可写窗口有容量能够写入数据。
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { cc := cs.cc cc.mu.Lock() defer cc.mu.Unlock() for { if cc.closed { return 0, http2errClientConnClosed } if cs.stopReqBody != nil { return 0, cs.stopReqBody } if err := cs.checkResetOrDone(); err != nil { return 0, err } if a := cs.flow.available(); a > 0 { take := a if int(take) > maxBytes { take = int32(maxBytes) // can't truncate int; take is int32 } if take > int32(cc.maxFrameSize) { take = int32(cc.maxFrameSize) } cs.flow.take(take) return take, nil } cc.cond.Wait() } }
根据源码可以知道,数据流被关闭或者停止发送请求body,则当前数据流无法写入数据。当数据流状态正常时,又分为两种情况:
当前数据流可写窗口剩余可写数据大于0,则计算可写字节数,并将当前数据流可写窗口大小消耗take
。
当前数据流可写窗口剩余可写数据小于等于0,则会一直等待直到被唤醒并进入下一次检查。
上面的第二种情况在收到http2WindowUpdateFrame数据帧这一节中提到过。
After the server reads the data of the current data stream, it will send a http2WindowUpdateFrame
data frame to the client's corresponding data stream. After the client receives the data frame, it will increase the writable window of the corresponding data stream and execute cc.cond.Broadcast()
Wake up the data stream that is waiting because the data sent has reached the flow control upper limit and continue to send data.
The above is the flow control logic for client to send data to server.
The frame header is 9 bytes long and contains four parts: Payload length, frame type, frame identifier and data stream ID.
Flow control can be divided into two steps:
Initially, through http2FrameSettings
The data frame and http2WindowUpdateFrame
data frame inform the other party of the size of the current connection read-write window and the size of the read-write window of the data stream in the connection.
During the process of reading and writing data, the write window size of the other end is controlled by sending the http2WindowUpdateFrame
data frame.
The above is the detailed content of Go initiates HTTP2.0 request process analysis (Part 2)—Data frame & flow control. For more information, please follow other related articles on the PHP Chinese website!