首頁 > 後端開發 > Golang > 主體

Go發起HTTP2.0請求流程分析(中篇)-資料幀&流控制

發布: 2023-07-21 16:10:19
轉載
689 人瀏覽過
資料幀

HTTP2通訊的最小單位是資料幀,每個幀都包含兩個部分:幀頭Payload。不同資料流的幀可以交錯發送(同一個資料流的幀必須順序發送),然後再根據每個幀頭的資料流標識符重新組裝。

由於Payload中為有效數據,故僅對幀頭進行分析描述。

幀頭

幀頭總長度為9個位元組,並包含四個部分,分別是:

  1. Payload的長度,佔用三個位元組。

  2. 資料幀類型,佔用一個位元組。

  3. 資料幀標識符,佔用一個位元組。

  4. 資料流ID,佔用四個位元組。

用圖表示如下:

Go發起HTTP2.0請求流程分析(中篇)-資料幀&流控制

資料幀的格式和各部分的含義已經清楚了, 那麼我們看看程式碼中怎麼讀取一個幀頭:

func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {
	_, err := io.ReadFull(r, buf[:http2frameHeaderLen])
if err != nil {
return http2FrameHeader{}, err
	}
return http2FrameHeader{
		Length:   (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
		Type:     http2FrameType(buf[3]),
		Flags:    http2Flags(buf[4]),
		StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
		valid:    true,
	}, nil
}
登入後複製

在上面的程式碼中http2frameHeaderLen是一個常數,其值為9。

從io.Reader讀取9個位元組後,將前三個位元組和後四個位元組均轉為uint32的類型,從而得到Payload長度和數據流ID。另外要理解的是幀頭的前三個位元組和後四個位元組儲存格式為大端(大小端筆者就不在這裡解釋了,請尚不了解的讀者自行百度)。

資料幀類型

根據http://http2.github.io/http2-spec/#rfc.section.11.2描述,資料幀類型總共有10個。在go源碼中均有體現:

const (
	http2FrameData         http2FrameType = 0x0
	http2FrameHeaders      http2FrameType = 0x1
	http2FramePriority     http2FrameType = 0x2
	http2FrameRSTStream    http2FrameType = 0x3
	http2FrameSettings     http2FrameType = 0x4
	http2FramePushPromise  http2FrameType = 0x5
	http2FramePing         http2FrameType = 0x6
	http2FrameGoAway       http2FrameType = 0x7
	http2FrameWindowUpdate http2FrameType = 0x8
	http2FrameContinuation http2FrameType = 0x9
)
登入後複製

http2FrameData:主要用於傳送請求body和接收回應的資料幀。

http2FrameHeaders:主要用於傳送請求header和接收回應header的資料幀。

http2FrameSettings:主要用於client和server交流設定相關的資料幀。

http2FrameWindowUpdate:主要用於串流控制的資料幀。

其他資料幀類型因為本文不涉及,故不做描述。

資料幀標識符

由於資料幀標識符種類較多,筆者在這裡僅介紹其中部分標識符,先看源碼:

const (
// Data Frame
	http2FlagDataEndStream http2Flags = 0x1

// Headers Frame
	http2FlagHeadersEndStream  http2Flags = 0x1

// Settings Frame
	http2FlagSettingsAck http2Flags = 0x1
// 此处省略定义其他数据帧标识符的代码
)
登入後複製

http2FlagDataEndStream:在前篇中提到,调用(*http2ClientConn).newStream方法会创建一个数据流,那这个数据流什么时候结束呢,这就是http2FlagDataEndStream的作用。

当client收到有响应body的响应时(HEAD请求无响应body,301,302等响应也无响应body),一直读到http2FrameData数据帧的标识符为http2FlagDataEndStream则意味着本次请求结束可以关闭当前数据流。

http2FlagHeadersEndStream:如果读到的http2FrameHeaders数据帧有此标识符也意味着本次请求结束。

http2FlagSettingsAck:该标示符意味着对方确认收到http2FrameSettings数据帧。

流控制器

流控制是一种阻止发送方向接收方发送大量数据的机制,以免超出后者的需求或处理能力。Go中HTTP2通过http2flow结构体进行流控制:

type http2flow struct {
// n is the number of DATA bytes we&#39;re allowed to send.
// A flow is kept both on a conn and a per-stream.
	n int32

// conn points to the shared connection-level flow that is
// shared by all streams on that conn. It is nil for the flow
// that&#39;s on the conn directly.
	conn *http2flow
}
登入後複製

字段含义英文注释已经描述的很清楚了,所以笔者不再翻译。下面看一下和流控制有关的方法。

(*http2flow).available

此方法返回当前流控制可发送的最大字节数:

func (f *http2flow) available() int32 {
	n := f.n
if f.conn != nil && f.conn.n < n {
		n = f.conn.n
	}
return n
}
登入後複製
  • 如果f.conn为nil则意味着此控制器的控制级别为连接,那么可发送的最大字节数就是f.n

  • 如果f.conn不为nil则意味着此控制器的控制级别为数据流,且当前数据流可发送的最大字节数不能超过当前连接可发送的最大字节数。

(*http2flow).take

此方法用于消耗当前流控制器的可发送字节数:

func (f *http2flow) take(n int32) {
if n > f.available() {
panic("internal error: took too much")
	}
	f.n -= n
if f.conn != nil {
		f.conn.n -= n
	}
}
登入後複製

通过实际需要传递一个参数,告知当前流控制器想要发送的数据大小。如果发送的大小超过流控制器允许的大小,则panic,如果未超过流控制器允许的大小,则将当前数据流和当前连接的可发送字节数-n

(*http2flow).add

有消耗就有新增,此方法用于增加流控制器可发送的最大字节数:

func (f *http2flow) add(n int32) bool {
	sum := f.n + n
if (sum > n) == (f.n > 0) {
		f.n = sum
return true
	}
return false
}
登入後複製

上面的代码唯一需要注意的地方是,当sum超过int32正数最大值(2^31-1)时会返回false。

回顾:在前篇中提到的(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法均通过(*http2flow).add初始化可发送数据窗口大小。

有了帧和流控制器的基本概念,下面我们结合源码来分析总结流控制的具体实现。

(*http2ClientConn).readLoop

前篇分析(*http2Transport).newClientConn时止步于读循环,那么今天我们就从(*http2ClientConn).readLoop开始。

func (cc *http2ClientConn) readLoop() {
	rl := &http2clientConnReadLoop{cc: cc}
defer rl.cleanup()
	cc.readerErr = rl.run()
if ce, ok := cc.readerErr.(http2ConnectionError); ok {
		cc.wmu.Lock()
		cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)
		cc.wmu.Unlock()
	}
}
登入後複製

由上可知,readLoop的逻辑比较简单,其核心逻辑在(*http2clientConnReadLoop).run方法里。

func (rl *http2clientConnReadLoop) run() error {
	cc := rl.cc
	rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
	gotReply := false // ever saw a HEADERS reply
	gotSettings := false
for {
		f, err := cc.fr.ReadFrame()
// 此处省略代码
		maybeIdle := false // whether frame might transition us to idle

switch f := f.(type) {
case *http2MetaHeadersFrame:
			err = rl.processHeaders(f)
			maybeIdle = true
			gotReply = true
case *http2DataFrame:
			err = rl.processData(f)
			maybeIdle = true
case *http2GoAwayFrame:
			err = rl.processGoAway(f)
			maybeIdle = true
case *http2RSTStreamFrame:
			err = rl.processResetStream(f)
			maybeIdle = true
case *http2SettingsFrame:
			err = rl.processSettings(f)
case *http2PushPromiseFrame:
			err = rl.processPushPromise(f)
case *http2WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
case *http2PingFrame:
			err = rl.processPing(f)
default:
			cc.logf("Transport: unhandled response frame type %T", f)
		}
if err != nil {
if http2VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
			}
return err
		}
if rl.closeWhenIdle && gotReply && maybeIdle {
			cc.closeIfIdle()
		}
	}
}
登入後複製

由上可知,(*http2clientConnReadLoop).run的核心逻辑是读取数据帧然后对不同的数据帧进行不同的处理。

cc.fr.ReadFrame()會根據前面介紹的資料幀格式讀出資料幀。

前篇提到使用了一個支援h2協定的圖片進行分析,本篇繼續重複使用該圖片對(*http2clientConnReadLoop).run方法進行debug。

收到http2FrameSettings資料幀

讀取循環會最先讀到http2FrameSettings資料幀。讀到該資料幀後會呼叫(*http2clientConnReadLoop).processSettings方法。 (*http2clientConnReadLoop).processSettings主要包含3個邏輯。

1、判斷是否為http2FrameSettings的ack訊息,如果是直接返回,否則繼續後面的步驟。

if f.IsAck() {
if cc.wantSettingsAck {
    cc.wantSettingsAck = false
return nil
  }
return http2ConnectionError(http2ErrCodeProtocol)
}
登入後複製

2、处理不同http2FrameSettings的数据帧,并根据server传递的信息,修改maxConcurrentStreams等的值。

err := f.ForeachSetting(func(s http2Setting) error {
switch s.ID {
case http2SettingMaxFrameSize:
    cc.maxFrameSize = s.Val
case http2SettingMaxConcurrentStreams:
    cc.maxConcurrentStreams = s.Val
case http2SettingMaxHeaderListSize:
    cc.peerMaxHeaderListSize = uint64(s.Val)
case http2SettingInitialWindowSize:
if s.Val > math.MaxInt32 {
return http2ConnectionError(http2ErrCodeFlowControl)
    }
    delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {
      cs.flow.add(delta)
    }
    cc.cond.Broadcast()
    cc.initialWindowSize = s.Val
default:
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
    cc.vlogf("Unhandled Setting: %v", s)
  }
return nil
})
登入後複製

当收到ID为http2SettingInitialWindowSize的帧时,会调整当前连接中所有数据流的可发送数据窗口大小,并修改当前连接的initialWindowSize(每个新创建的数据流均会使用该值初始化可发送数据窗口大小)s.Val

3、发送http2FrameSettings的ack信息给server。

	cc.wmu.Lock()
defer cc.wmu.Unlock()

	cc.fr.WriteSettingsAck()
	cc.bw.Flush()
return cc.werr
登入後複製

收到http2WindowUpdateFrame数据帧

在笔者debug的过程中,处理完http2FrameSettings数据帧后,紧接着就收到了http2WindowUpdateFrame数据帧。收到该数据帧后会调用(*http2clientConnReadLoop).processWindowUpdate方法:

func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
	cc := rl.cc
	cs := cc.streamByID(f.StreamID, false)
if f.StreamID != 0 && cs == nil {
return nil
	}

	cc.mu.Lock()
defer cc.mu.Unlock()

	fl := &cc.flow
if cs != nil {
		fl = &cs.flow
	}
if !fl.add(int32(f.Increment)) {
return http2ConnectionError(http2ErrCodeFlowControl)
	}
	cc.cond.Broadcast()
return nil
}
登入後複製

上面的逻辑主要用于更新当前连接和数据流的可发送数据窗口大小。如果http2WindowUpdateFrame帧中的StreamID为0,则更新当前连接的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。

注意:在debug的过程,收到http2WindowUpdateFrame数据帧后,又收到一次http2FrameSettings,且该数据帧标识符为http2FlagSettingsAck

笔者在这里特意提醒,这是因为前篇中提到的(*http2Transport).NewClientConn方法,也向server发送了http2FrameSettings数据帧和http2WindowUpdateFrame数据帧。

另外,在處理http2FrameSettingshttp2WindowUpdateFrame過程中,均出現了cc.cond.Broadcast()調用,該調用主要用於喚醒因為以下兩種情況而Wait的請求:

  1. 因目前連線處理的資料流已達到maxConcurrentStreams的上限(詳見前篇中(*http2ClientConn).awaitOpenSlotForRequest方法分析)。

  2. 因傳送資料流已達可傳送資料視窗上限而等待可傳送資料視窗更新的請求(後續會介紹)。

收到http2MetaHeadersFrame資料幀

收到此資料幀表示某一個請求已經開始接收回應資料。此資料幀對應的處理函數為(*http2clientConnReadLoop).processHeaders

func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
	cc := rl.cc
	cs := cc.streamByID(f.StreamID, false)
// 此处省略代码
	res, err := rl.handleResponse(cs, f)
if err != nil {
// 此处省略代码
		cs.resc <- http2resAndError{err: err}
return nil // return nil from process* funcs to keep conn alive
	}
if res == nil {
// (nil, nil) special case. See handleResponse docs.
return nil
	}
	cs.resTrailer = &res.Trailer
	cs.resc <- http2resAndError{res: res}
return nil
}
登入後複製

首先我们先看cs.resc <- http2resAndError{res: res}这一行代码,向数据流写入http2resAndError即本次请求的响应。在(*http2ClientConn).roundTrip方法中有这样一行代码readLoopResCh := cs.resc

回顾:前篇(*http2ClientConn).roundTrip方法的第7点和本部分关联起来就可以形成一个完整的请求链。

接下来我们对rl.handleResponse方法展开分析。

(*http2clientConnReadLoop).handleResponse

(*http2clientConnReadLoop).handleResponse的主要作用是构建一个Response变量,下面对该函数的关键步骤进行描述。

1、构建一个Response变量。

header := make(Header)
res := &Response{
  Proto:      "HTTP/2.0",
  ProtoMajor: 2,
  Header:     header,
  StatusCode: statusCode,
  Status:     status + " " + StatusText(statusCode),
}
登入後複製

2、构建header(本篇不对header进行展开分析)。

for _, hf := range f.RegularFields() {
  key := CanonicalHeaderKey(hf.Name)
if key == "Trailer" {
    t := res.Trailer
if t == nil {
      t = make(Header)
      res.Trailer = t
    }
    http2foreachHeaderElement(hf.Value, func(v string) {
      t[CanonicalHeaderKey(v)] = nil
    })
  } else {
    header[key] = append(header[key], hf.Value)
  }
}
登入後複製

3、处理响应body的ContentLength。

streamEnded := f.StreamEnded()
isHead := cs.req.Method == "HEAD"
if !streamEnded || isHead {
  res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
      res.ContentLength = clen64
    } else {
// TODO: care? unlike http/1, it won&#39;t mess up our framing, so it&#39;s
// more safe smuggling-wise to ignore.
    }
  } else if len(clens) > 1 {
// TODO: care? unlike http/1, it won&#39;t mess up our framing, so it&#39;s
// more safe smuggling-wise to ignore.
  }
}
登入後複製

由上可知,当前数据流没有结束或者是HEAD请求才读取ContentLength。如果header中的ContentLength不合法则res.ContentLength的值为 -1

4、构建res.Body

cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
go cs.awaitRequestCancel(cs.req)

if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
  res.Header.Del("Content-Encoding")
  res.Header.Del("Content-Length")
  res.ContentLength = -1
  res.Body = &http2gzipReader{body: res.Body}
  res.Uncompressed = true
}
登入後複製

根据Content-Encoding的编码方式,会构建两种不同的Body:

  1. 非gzip编码时,构造的res.Body类型为http2transportResponseBody

  2. gzip编码时,构造的res.Body类型为http2gzipReader

收到http2DataFrame数据帧

收到此数据帧意味着我们开始接收真实的响应,即平常开发中需要处理的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData

因为server无法及时知道数据流在client端的状态,所以server可能会向client中一个已经不存在的数据流发送数据:

cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
data := f.Data()
if cs == nil {
  cc.mu.Lock()
  neverSent := cc.nextStreamID
  cc.mu.Unlock()
// 此处省略代码
if f.Length > 0 {
    cc.mu.Lock()
    cc.inflow.add(int32(f.Length))
    cc.mu.Unlock()

    cc.wmu.Lock()
    cc.fr.WriteWindowUpdate(0, uint32(f.Length))
    cc.bw.Flush()
    cc.wmu.Unlock()
  }
return nil
}
登入後複製

接收到的数据帧在client没有对应的数据流处理时,通过流控制器为当前连接可读窗口大小增加f.Length,并且通过http2FrameWindowUpdate数据帧告知server将当前连接的可写窗口大小增加f.Length

如果client有对应的数据流且f.Length大于0:

1、如果是head请求结束当前数据流并返回。

if cs.req.Method == "HEAD" && len(data) > 0 {
  cc.logf("protocol error: received DATA on a HEAD request")
  rl.endStreamError(cs, http2StreamError{
    StreamID: f.StreamID,
    Code:     http2ErrCodeProtocol,
  })
return nil
}
登入後複製

2、检查当前数据流能否处理f.Length长度的数据。

cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
  cs.inflow.take(int32(f.Length))
} else {
  cc.mu.Unlock()
return http2ConnectionError(http2ErrCodeFlowControl)
}
登入後複製

由上可知当前数据流如果能够处理该数据,通过流控制器调用cs.inflow.take减小当前数据流可接受窗口大小。

3、当前数据流被重置或者被关闭即cs.didReset为true时又或者数据帧有填充数据时需要调整流控制窗口。

var refund int
if pad := int(f.Length) - len(data); pad > 0 {
  refund += pad
}
// Return len(data) now if the stream is already closed,
// since data will never be read.
didReset := cs.didReset
if didReset {
  refund += len(data)
}
if refund > 0 {
  cc.inflow.add(int32(refund))
  cc.wmu.Lock()
  cc.fr.WriteWindowUpdate(0, uint32(refund))
if !didReset {
    cs.inflow.add(int32(refund))
    cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
  }
  cc.bw.Flush()
  cc.wmu.Unlock()
}
cc.mu.Unlock()
登入後複製
  • 如果数据帧有填充数据则计算需要返还的填充数据长度。

  • 如果数据流无效该数据帧的长度需要全部返还。

最后,根据计算的refund增加当前连接或者当前数据流的可接受窗口大小,并且同时告知server增加当前连接或者当前数据流的可写窗口大小。

4、数据长度大于0且数据流正常则将数据写入数据流缓冲区。

if len(data) > 0 && !didReset {
if _, err := cs.bufPipe.Write(data); err != nil {
    rl.endStreamError(cs, err)
return err
  }
}
登入後複製

回顾:前面的(*http2clientConnReadLoop).handleResponse方法中有这样一行代码res.Body = http2transportResponseBody{cs},所以在业务开发时能够通过Response读取到数据流中的缓冲数据。

(http2transportResponseBody).Read

在前面的内容里,如果数据流状态正常且数据帧没有填充数据则数据流和连接的可接收窗口会一直变小,而这部分内容就是增加数据流的可接受窗口大小。

因为篇幅和主旨的问题笔者仅分析描述该方法内和流控制有关的部分。

1、读取响应数据后计算当前连接需要增加的可接受窗口大小。

cc.mu.Lock()
defer cc.mu.Unlock()
var connAdd, streamAdd int32
// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
  connAdd = http2transportDefaultConnFlow - v
  cc.inflow.add(connAdd)
}
登入後複製

如果当前连接可接受窗口的大小已经小于http2transportDefaultConnFlow(1G)的一半,则当前连接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()

回顾http2transportDefaultConnFlow在前篇(*http2Transport).NewClientConn方法部分有提到,且连接刚建立时会通过http2WindowUpdateFrame数据帧告知server当前连接可发送窗口大小增加http2transportDefaultConnFlow

2、读取响应数据后计算当前数据流需要增加的可接受窗口大小。

if err == nil { // No need to refresh if the stream is over or failed.
// Consider any buffered body data (read from the conn but not
// consumed by the client) when computing flow control for this
// stream.
  v := int(cs.inflow.available()) + cs.bufPipe.Len()
if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {
    streamAdd = int32(http2transportDefaultStreamFlow - v)
    cs.inflow.add(streamAdd)
  }
}
登入後複製

如果当前数据流可接受窗口大小加上当前数据流缓冲区剩余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh(4M-4KB),则当前数据流可接受窗口大小需要增加http2transportDefaultStreamFlow - v

回顾http2transportDefaultStreamFlow在前篇(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法中均有提到。

连接刚建立时,发送http2FrameSettings数据帧,告知server每个数据流的可发送窗口大小为http2transportDefaultStreamFlow

newStream时,数据流默认的可接收窗口大小为http2transportDefaultStreamFlow

3、将连接和数据流分别需要增加的窗口大小通过http2WindowUpdateFrame数据帧告知server。

if connAdd != 0 || streamAdd != 0 {
  cc.wmu.Lock()
defer cc.wmu.Unlock()
if connAdd != 0 {
    cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))
  }
if streamAdd != 0 {
    cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))
  }
  cc.bw.Flush()
}
登入後複製

以上就是server向client发送数据的流控制逻辑。

(*http2clientStream).writeRequestBody

前篇中(*http2ClientConn).roundTrip未对(*http2clientStream).writeRequestBody进行分析,下面我们看看该方法的源码:

func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
	cc := cs.cc
	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
// 此处省略代码
	req := cs.req
	hasTrailers := req.Trailer != nil
	remainLen := http2actualContentLength(req)
	hasContentLen := remainLen != -1

var sawEOF bool
for !sawEOF {
		n, err := body.Read(buf[:len(buf)-1])
// 此处省略代码
		remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
			allowed, err = cs.awaitFlowControl(len(remain))
switch {
case err == http2errStopReqBodyWrite:
return err
case err == http2errStopReqBodyWriteAndCancel:
				cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
return err
case err != nil:
return err
			}
			cc.wmu.Lock()
			data := remain[:allowed]
			remain = remain[allowed:]
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
				err = cc.bw.Flush()
			}
			cc.wmu.Unlock()
		}
if err != nil {
return err
		}
	}
// 此处省略代码
return err
}
登入後複製

上面的逻辑可简单总结为:不停的读取请求body然后将读取的内容通过cc.fr.WriteData转为http2FrameData数据帧发送给server,直到请求body读完为止。其中和流控制有关的方法是awaitFlowControl,下面我们对该方法进行分析。

(*http2clientStream).awaitFlowControl

此方法的主要作用是等待当前数据流可写窗口有容量能够写入数据。

func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
	cc := cs.cc
	cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, http2errClientConnClosed
		}
if cs.stopReqBody != nil {
return 0, cs.stopReqBody
		}
if err := cs.checkResetOrDone(); err != nil {
return 0, err
		}
if a := cs.flow.available(); a > 0 {
			take := a
if int(take) > maxBytes {

				take = int32(maxBytes) // can&#39;t truncate int; take is int32
			}
if take > int32(cc.maxFrameSize) {
				take = int32(cc.maxFrameSize)
			}
			cs.flow.take(take)
return take, nil
		}
		cc.cond.Wait()
	}
}
登入後複製

根据源码可以知道,数据流被关闭或者停止发送请求body,则当前数据流无法写入数据。当数据流状态正常时,又分为两种情况:

  1. 当前数据流可写窗口剩余可写数据大于0,则计算可写字节数,并将当前数据流可写窗口大小消耗take

  2. 当前数据流可写窗口剩余可写数据小于等于0,则会一直等待直到被唤醒并进入下一次检查。

上面的第二种情况在收到http2WindowUpdateFrame数据帧这一节中提到过。

server讀取目前資料流的資料後會向client對應資料流發送http2WindowUpdateFrame資料幀,client收到該資料幀後會增大對應資料流可寫窗口,並執行cc.cond.Broadcast()喚醒因傳送資料已達流控制上限而等待的資料流繼續傳送資料。

以上就是client傳送資料給server的流控制邏輯。

總結

  1. 幀頭長度為9個位元組,並包含四個部分:Payload的長度、幀類型、幀標識符和資料流ID。

  2. 流控制可分為兩個步驟:

  • #初始時,透過http2FrameSettings資料幀和http2WindowUpdateFrame資料幀告知對方目前連接讀寫視窗大小以及連接中資料流讀寫視窗大小。

  • 在讀取和寫入資料過程中,透過傳送http2WindowUpdateFrame資料幀控制另一端的寫入視窗大小。

以上是Go發起HTTP2.0請求流程分析(中篇)-資料幀&流控制的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:Go语言进阶学习
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板