Go が HTTP2.0 リクエスト プロセス分析を開始 (パート 2) - データ フレームとフロー制御

リリース: 2023-07-24 15:28:35
転載
1251 人が閲覧しました

データ フレーム

HTTP2 通信の最小単位はデータ フレームであり、各フレームには 2 つの部分が含まれます: Frameヘッダー ペイロード 。異なるデータ ストリームからのフレームはインターリーブして送信でき (同じデータ ストリームからのフレームは連続して送信する必要があります)、各フレーム ヘッダーのデータ ストリーム識別子に基づいて再組み立てされます。

ペイロードには有効なデータが含まれているため、フレームヘッダーのみを解析して記述します。

フレーム ヘッダー

フレーム ヘッダーの全長は 9 バイトで、次の 4 つの部分が含まれます。

  1. #ペイロードの長さは 3 バイト必要です。

  2. データ フレーム タイプ。1 バイトを占有します。

  3. データ フレーム識別子。1 バイトを占めます。

  4. データ ストリーム ID。4 バイトを占めます。 ############回路図は以下の通りです:###

    Go が HTTP2.0 リクエスト プロセス分析を開始 (パート 2) - データ フレームとフロー制御

    データ フレームの形式と各部分の意味はすでに明らかなので、コード内のフレーム ヘッダーを読み取る方法を見てみましょう。上記のコード

    http2frameHeaderLen

    は、値 9 の定数です。 io.Reader から 9 バイトを読み取った後、最初の 3 バイトと最後の 4 バイトを

    uint32

    型に変換して、ペイロード長とデータ ストリーム ID を取得します。もう 1 つ理解する必要があるのは、フレーム ヘッダーの最初の 3 バイトと最後の 4 バイトがビッグ エンディアン形式で格納されていることです (ビッグ エンディアンとスモール エンディアンについてはここでは説明しません。理解できない読者は Baidu 自身に問い合わせてください) )。 データ フレーム タイプ

    http://http2.github.io/http2-spec/#rfc.section.11.2 の説明によると、合計 10 種類のデータ フレーム タイプがあります。 。これは go のソース コードに反映されています:
    func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {
    	_, err := io.ReadFull(r, buf[:http2frameHeaderLen])
    if err != nil {
    return http2FrameHeader{}, err
    	}
    return http2FrameHeader{
    		Length:   (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
    		Type:     http2FrameType(buf[3]),
    		Flags:    http2Flags(buf[4]),
    		StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
    		valid:    true,
    	}, nil
    }
    ログイン後にコピー

    http2FrameData

    : 主にリクエスト本文の送信とレスポンス データ フレームの受信に使用されます。 <p data-source-line="77" style="box-sizing: border-box;margin-bottom: 16px;caret-color: rgb(36, 41, 46);color: rgb(36, 41, 46);font-family: -apple-system, BlinkMacSystemFont, 微软雅黑, "PingFang SC", Helvetica, Arial, "Hiragino Sans GB", "Microsoft YaHei", SimSun, 宋体, Heiti, 黑体, sans-serif;font-size: 14px;text-align: start;white-space: normal;text-size-adjust: auto;"><code style="box-sizing: border-box;font-family: SFMono-Regular, Consolas, "Liberation Mono", Menlo, Courier, monospace;font-size: 11.9px;padding: 0.2em 0.4em;background-color: rgba(27, 31, 35, 0.05);border-radius: 3px;">http2FrameHeaders: 主にリクエスト ヘッダーの送信とレスポンス ヘッダーのデータ フレームの受信に使用されます。

    http2FrameSettings: 主にクライアントとサーバーの通信設定に関連するデータ フレームに使用されます。

    http2FrameWindowUpdate: 主にフロー制御に使用されるデータ フレーム。

    他のデータ フレーム タイプについてはこの記事では取り上げないため、説明を省略します。

    データ フレーム識別子

    データ フレーム識別子には多くの種類があるため、ここでは一部のみを紹介します。まずソース コードを見てみましょう:

    const (
    // Data Frame
    	http2FlagDataEndStream http2Flags = 0x1
    
    // Headers Frame
    	http2FlagHeadersEndStream  http2Flags = 0x1
    
    // Settings Frame
    	http2FlagSettingsAck http2Flags = 0x1
    // 此处省略定义其他数据帧标识符的代码
    )
    ログイン後にコピー

    http2FlagDataEndStream:在前篇中提到,调用(*http2ClientConn).newStream方法会创建一个数据流,那这个数据流什么时候结束呢,这就是http2FlagDataEndStream的作用。

    当client收到有响应body的响应时(HEAD请求无响应body,301,302等响应也无响应body),一直读到http2FrameData数据帧的标识符为http2FlagDataEndStream则意味着本次请求结束可以关闭当前数据流。

    http2FlagHeadersEndStream:如果读到的http2FrameHeaders数据帧有此标识符也意味着本次请求结束。

    http2FlagSettingsAck:该标示符意味着对方确认收到http2FrameSettings数据帧。

    流控制器

    流控制是一种阻止发送方向接收方发送大量数据的机制,以免超出后者的需求或处理能力。Go中HTTP2通过http2flow结构体进行流控制:

    type http2flow struct {
    // n is the number of DATA bytes we&#39;re allowed to send.
    // A flow is kept both on a conn and a per-stream.
    	n int32
    
    // conn points to the shared connection-level flow that is
    // shared by all streams on that conn. It is nil for the flow
    // that&#39;s on the conn directly.
    	conn *http2flow
    }
    ログイン後にコピー

    字段含义英文注释已经描述的很清楚了,所以笔者不再翻译。下面看一下和流控制有关的方法。

    (*http2flow).available

    此方法返回当前流控制可发送的最大字节数:

    func (f *http2flow) available() int32 {
    	n := f.n
    if f.conn != nil && f.conn.n < n {
    		n = f.conn.n
    	}
    return n
    }
    ログイン後にコピー
    • 如果f.conn为nil则意味着此控制器的控制级别为连接,那么可发送的最大字节数就是f.n

    • 如果f.conn不为nil则意味着此控制器的控制级别为数据流,且当前数据流可发送的最大字节数不能超过当前连接可发送的最大字节数。

    (*http2flow).take

    此方法用于消耗当前流控制器的可发送字节数:

    func (f *http2flow) take(n int32) {
    if n > f.available() {
    panic("internal error: took too much")
    	}
    	f.n -= n
    if f.conn != nil {
    		f.conn.n -= n
    	}
    }
    ログイン後にコピー

    通过实际需要传递一个参数,告知当前流控制器想要发送的数据大小。如果发送的大小超过流控制器允许的大小,则panic,如果未超过流控制器允许的大小,则将当前数据流和当前连接的可发送字节数-n

    (*http2flow).add

    有消耗就有新增,此方法用于增加流控制器可发送的最大字节数:

    func (f *http2flow) add(n int32) bool {
    	sum := f.n + n
    if (sum > n) == (f.n > 0) {
    		f.n = sum
    return true
    	}
    return false
    }
    ログイン後にコピー

    上面的代码唯一需要注意的地方是,当sum超过int32正数最大值(2^31-1)时会返回false。

    回顾:在前篇中提到的(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法均通过(*http2flow).add初始化可发送数据窗口大小。

    有了帧和流控制器的基本概念,下面我们结合源码来分析总结流控制的具体实现。

    (*http2ClientConn).readLoop

    前篇分析(*http2Transport).newClientConn时止步于读循环,那么今天我们就从(*http2ClientConn).readLoop开始。

    func (cc *http2ClientConn) readLoop() {
    	rl := &http2clientConnReadLoop{cc: cc}
    defer rl.cleanup()
    	cc.readerErr = rl.run()
    if ce, ok := cc.readerErr.(http2ConnectionError); ok {
    		cc.wmu.Lock()
    		cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)
    		cc.wmu.Unlock()
    	}
    }
    ログイン後にコピー

    由上可知,readLoop的逻辑比较简单,其核心逻辑在(*http2clientConnReadLoop).run方法里。

    func (rl *http2clientConnReadLoop) run() error {
    	cc := rl.cc
    	rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
    	gotReply := false // ever saw a HEADERS reply
    	gotSettings := false
    for {
    		f, err := cc.fr.ReadFrame()
    // 此处省略代码
    		maybeIdle := false // whether frame might transition us to idle
    
    switch f := f.(type) {
    case *http2MetaHeadersFrame:
    			err = rl.processHeaders(f)
    			maybeIdle = true
    			gotReply = true
    case *http2DataFrame:
    			err = rl.processData(f)
    			maybeIdle = true
    case *http2GoAwayFrame:
    			err = rl.processGoAway(f)
    			maybeIdle = true
    case *http2RSTStreamFrame:
    			err = rl.processResetStream(f)
    			maybeIdle = true
    case *http2SettingsFrame:
    			err = rl.processSettings(f)
    case *http2PushPromiseFrame:
    			err = rl.processPushPromise(f)
    case *http2WindowUpdateFrame:
    			err = rl.processWindowUpdate(f)
    case *http2PingFrame:
    			err = rl.processPing(f)
    default:
    			cc.logf("Transport: unhandled response frame type %T", f)
    		}
    if err != nil {
    if http2VerboseLogs {
    				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
    			}
    return err
    		}
    if rl.closeWhenIdle && gotReply && maybeIdle {
    			cc.closeIfIdle()
    		}
    	}
    }
    ログイン後にコピー

    由上可知,(*http2clientConnReadLoop).run的核心逻辑是读取数据帧然后对不同的数据帧进行不同的处理。

    cc.fr.ReadFrame() は、前に紹介したデータ フレーム形式に従ってデータ フレームを読み取ります。

    前の記事では、h2 プロトコルをサポートするイメージが分析に使用されたと述べましたが、この記事では引き続きイメージを再利用して (*http2clientConnReadLoop).run メソッドをデバッグします。

    受信した http2FrameSettings データ フレーム

    読み取りループは、最初に http2FrameSettings データ フレームを読み取ります。データ フレームを読み取った後、(*http2clientConnReadLoop).processSettings メソッドが呼び出されます。 (*http2clientConnReadLoop).processSettingsには主に 3 つのロジックが含まれています。

    1. http2FrameSettings の ack 情報であるかどうかを確認し、そうである場合はそのまま戻り、そうでない場合は次の手順に進みます。

    if f.IsAck() {
    if cc.wantSettingsAck {
        cc.wantSettingsAck = false
    return nil
      }
    return http2ConnectionError(http2ErrCodeProtocol)
    }
    ログイン後にコピー

    2、处理不同http2FrameSettings的数据帧,并根据server传递的信息,修改maxConcurrentStreams等的值。

    err := f.ForeachSetting(func(s http2Setting) error {
    switch s.ID {
    case http2SettingMaxFrameSize:
        cc.maxFrameSize = s.Val
    case http2SettingMaxConcurrentStreams:
        cc.maxConcurrentStreams = s.Val
    case http2SettingMaxHeaderListSize:
        cc.peerMaxHeaderListSize = uint64(s.Val)
    case http2SettingInitialWindowSize:
    if s.Val > math.MaxInt32 {
    return http2ConnectionError(http2ErrCodeFlowControl)
        }
        delta := int32(s.Val) - int32(cc.initialWindowSize)
    for _, cs := range cc.streams {
          cs.flow.add(delta)
        }
        cc.cond.Broadcast()
        cc.initialWindowSize = s.Val
    default:
    // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
        cc.vlogf("Unhandled Setting: %v", s)
      }
    return nil
    })
    ログイン後にコピー

    当收到ID为http2SettingInitialWindowSize的帧时,会调整当前连接中所有数据流的可发送数据窗口大小,并修改当前连接的initialWindowSize(每个新创建的数据流均会使用该值初始化可发送数据窗口大小)s.Val

    3、发送http2FrameSettings的ack信息给server。

    	cc.wmu.Lock()
    defer cc.wmu.Unlock()
    
    	cc.fr.WriteSettingsAck()
    	cc.bw.Flush()
    return cc.werr
    ログイン後にコピー

    收到http2WindowUpdateFrame数据帧

    在笔者debug的过程中,处理完http2FrameSettings数据帧后,紧接着就收到了http2WindowUpdateFrame数据帧。收到该数据帧后会调用(*http2clientConnReadLoop).processWindowUpdate方法:

    func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
    	cc := rl.cc
    	cs := cc.streamByID(f.StreamID, false)
    if f.StreamID != 0 && cs == nil {
    return nil
    	}
    
    	cc.mu.Lock()
    defer cc.mu.Unlock()
    
    	fl := &cc.flow
    if cs != nil {
    		fl = &cs.flow
    	}
    if !fl.add(int32(f.Increment)) {
    return http2ConnectionError(http2ErrCodeFlowControl)
    	}
    	cc.cond.Broadcast()
    return nil
    }
    ログイン後にコピー

    上面的逻辑主要用于更新当前连接和数据流的可发送数据窗口大小。如果http2WindowUpdateFrame帧中的StreamID为0,则更新当前连接的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。

    注意:在debug的过程,收到http2WindowUpdateFrame数据帧后,又收到一次http2FrameSettings,且该数据帧标识符为http2FlagSettingsAck

    笔者在这里特意提醒,这是因为前篇中提到的(*http2Transport).NewClientConn方法,也向server发送了http2FrameSettings数据帧和http2WindowUpdateFrame数据帧。

    さらに、http2FrameSettings および http2WindowUpdateFrame の処理中に、主にウェイクアップするために使用される cc.cond.Broadcast() 呼び出しが行われます。 Wait リクエストは次の 2 つの状況で発生するためです:

    1. 現在の接続によって処理されるデータ ストリームが maxConcurrentStreams の上限に達しました (「詳細は前回の記事 (*http2ClientConn).awaitOpenSlotForRequestメソッドの解析)。

    2. 送信データストリームが送信データウィンドウの上限に達したため、送信データウィンドウの更新要求を待っています(後述)。

    http2MetaHeadersFrame データ フレームの受信

    このデータ フレームの受信は、リクエストが応答データの受信を開始したことを意味します。このデータフレームに対応する処理関数は (*http2clientConnReadLoop).processHeaders:

    です。
    func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
    	cc := rl.cc
    	cs := cc.streamByID(f.StreamID, false)
    // 此处省略代码
    	res, err := rl.handleResponse(cs, f)
    if err != nil {
    // 此处省略代码
    		cs.resc <- http2resAndError{err: err}
    return nil // return nil from process* funcs to keep conn alive
    	}
    if res == nil {
    // (nil, nil) special case. See handleResponse docs.
    return nil
    	}
    	cs.resTrailer = &res.Trailer
    	cs.resc <- http2resAndError{res: res}
    return nil
    }
    ログイン後にコピー

    首先我们先看cs.resc <- http2resAndError{res: res}这一行代码,向数据流写入http2resAndError即本次请求的响应。在(*http2ClientConn).roundTrip方法中有这样一行代码readLoopResCh := cs.resc

    回顾:前篇(*http2ClientConn).roundTrip方法的第7点和本部分关联起来就可以形成一个完整的请求链。

    接下来我们对rl.handleResponse方法展开分析。

    (*http2clientConnReadLoop).handleResponse

    (*http2clientConnReadLoop).handleResponse的主要作用是构建一个Response变量,下面对该函数的关键步骤进行描述。

    1、构建一个Response变量。

    header := make(Header)
    res := &Response{
      Proto:      "HTTP/2.0",
      ProtoMajor: 2,
      Header:     header,
      StatusCode: statusCode,
      Status:     status + " " + StatusText(statusCode),
    }
    ログイン後にコピー

    2、构建header(本篇不对header进行展开分析)。

    for _, hf := range f.RegularFields() {
      key := CanonicalHeaderKey(hf.Name)
    if key == "Trailer" {
        t := res.Trailer
    if t == nil {
          t = make(Header)
          res.Trailer = t
        }
        http2foreachHeaderElement(hf.Value, func(v string) {
          t[CanonicalHeaderKey(v)] = nil
        })
      } else {
        header[key] = append(header[key], hf.Value)
      }
    }
    ログイン後にコピー

    3、处理响应body的ContentLength。

    streamEnded := f.StreamEnded()
    isHead := cs.req.Method == "HEAD"
    if !streamEnded || isHead {
      res.ContentLength = -1
    if clens := res.Header["Content-Length"]; len(clens) == 1 {
    if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
          res.ContentLength = clen64
        } else {
    // TODO: care? unlike http/1, it won&#39;t mess up our framing, so it&#39;s
    // more safe smuggling-wise to ignore.
        }
      } else if len(clens) > 1 {
    // TODO: care? unlike http/1, it won&#39;t mess up our framing, so it&#39;s
    // more safe smuggling-wise to ignore.
      }
    }
    ログイン後にコピー

    由上可知,当前数据流没有结束或者是HEAD请求才读取ContentLength。如果header中的ContentLength不合法则res.ContentLength的值为 -1

    4、构建res.Body

    cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
    cs.bytesRemain = res.ContentLength
    res.Body = http2transportResponseBody{cs}
    go cs.awaitRequestCancel(cs.req)
    
    if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
      res.Header.Del("Content-Encoding")
      res.Header.Del("Content-Length")
      res.ContentLength = -1
      res.Body = &http2gzipReader{body: res.Body}
      res.Uncompressed = true
    }
    ログイン後にコピー

    根据Content-Encoding的编码方式,会构建两种不同的Body:

    1. 非gzip编码时,构造的res.Body类型为http2transportResponseBody

    2. gzip编码时,构造的res.Body类型为http2gzipReader

    收到http2DataFrame数据帧

    收到此数据帧意味着我们开始接收真实的响应,即平常开发中需要处理的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData

    因为server无法及时知道数据流在client端的状态,所以server可能会向client中一个已经不存在的数据流发送数据:

    cc := rl.cc
    cs := cc.streamByID(f.StreamID, f.StreamEnded())
    data := f.Data()
    if cs == nil {
      cc.mu.Lock()
      neverSent := cc.nextStreamID
      cc.mu.Unlock()
    // 此处省略代码
    if f.Length > 0 {
        cc.mu.Lock()
        cc.inflow.add(int32(f.Length))
        cc.mu.Unlock()
    
        cc.wmu.Lock()
        cc.fr.WriteWindowUpdate(0, uint32(f.Length))
        cc.bw.Flush()
        cc.wmu.Unlock()
      }
    return nil
    }
    ログイン後にコピー

    接收到的数据帧在client没有对应的数据流处理时,通过流控制器为当前连接可读窗口大小增加f.Length,并且通过http2FrameWindowUpdate数据帧告知server将当前连接的可写窗口大小增加f.Length

    如果client有对应的数据流且f.Length大于0:

    1、如果是head请求结束当前数据流并返回。

    if cs.req.Method == "HEAD" && len(data) > 0 {
      cc.logf("protocol error: received DATA on a HEAD request")
      rl.endStreamError(cs, http2StreamError{
        StreamID: f.StreamID,
        Code:     http2ErrCodeProtocol,
      })
    return nil
    }
    ログイン後にコピー

    2、检查当前数据流能否处理f.Length长度的数据。

    cc.mu.Lock()
    if cs.inflow.available() >= int32(f.Length) {
      cs.inflow.take(int32(f.Length))
    } else {
      cc.mu.Unlock()
    return http2ConnectionError(http2ErrCodeFlowControl)
    }
    ログイン後にコピー

    由上可知当前数据流如果能够处理该数据,通过流控制器调用cs.inflow.take减小当前数据流可接受窗口大小。

    3、当前数据流被重置或者被关闭即cs.didReset为true时又或者数据帧有填充数据时需要调整流控制窗口。

    var refund int
    if pad := int(f.Length) - len(data); pad > 0 {
      refund += pad
    }
    // Return len(data) now if the stream is already closed,
    // since data will never be read.
    didReset := cs.didReset
    if didReset {
      refund += len(data)
    }
    if refund > 0 {
      cc.inflow.add(int32(refund))
      cc.wmu.Lock()
      cc.fr.WriteWindowUpdate(0, uint32(refund))
    if !didReset {
        cs.inflow.add(int32(refund))
        cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
      }
      cc.bw.Flush()
      cc.wmu.Unlock()
    }
    cc.mu.Unlock()
    ログイン後にコピー
    • 如果数据帧有填充数据则计算需要返还的填充数据长度。

    • 如果数据流无效该数据帧的长度需要全部返还。

    最后,根据计算的refund增加当前连接或者当前数据流的可接受窗口大小,并且同时告知server增加当前连接或者当前数据流的可写窗口大小。

    4、数据长度大于0且数据流正常则将数据写入数据流缓冲区。

    if len(data) > 0 && !didReset {
    if _, err := cs.bufPipe.Write(data); err != nil {
        rl.endStreamError(cs, err)
    return err
      }
    }
    ログイン後にコピー

    回顾:前面的(*http2clientConnReadLoop).handleResponse方法中有这样一行代码res.Body = http2transportResponseBody{cs},所以在业务开发时能够通过Response读取到数据流中的缓冲数据。

    (http2transportResponseBody).Read

    在前面的内容里,如果数据流状态正常且数据帧没有填充数据则数据流和连接的可接收窗口会一直变小,而这部分内容就是增加数据流的可接受窗口大小。

    因为篇幅和主旨的问题笔者仅分析描述该方法内和流控制有关的部分。

    1、读取响应数据后计算当前连接需要增加的可接受窗口大小。

    cc.mu.Lock()
    defer cc.mu.Unlock()
    var connAdd, streamAdd int32
    // Check the conn-level first, before the stream-level.
    if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
      connAdd = http2transportDefaultConnFlow - v
      cc.inflow.add(connAdd)
    }
    ログイン後にコピー

    如果当前连接可接受窗口的大小已经小于http2transportDefaultConnFlow(1G)的一半,则当前连接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()

    回顾http2transportDefaultConnFlow在前篇(*http2Transport).NewClientConn方法部分有提到,且连接刚建立时会通过http2WindowUpdateFrame数据帧告知server当前连接可发送窗口大小增加http2transportDefaultConnFlow

    2、读取响应数据后计算当前数据流需要增加的可接受窗口大小。

    if err == nil { // No need to refresh if the stream is over or failed.
    // Consider any buffered body data (read from the conn but not
    // consumed by the client) when computing flow control for this
    // stream.
      v := int(cs.inflow.available()) + cs.bufPipe.Len()
    if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {
        streamAdd = int32(http2transportDefaultStreamFlow - v)
        cs.inflow.add(streamAdd)
      }
    }
    ログイン後にコピー

    如果当前数据流可接受窗口大小加上当前数据流缓冲区剩余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh(4M-4KB),则当前数据流可接受窗口大小需要增加http2transportDefaultStreamFlow - v

    回顾http2transportDefaultStreamFlow在前篇(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法中均有提到。

    连接刚建立时,发送http2FrameSettings数据帧,告知server每个数据流的可发送窗口大小为http2transportDefaultStreamFlow

    newStream时,数据流默认的可接收窗口大小为http2transportDefaultStreamFlow

    3、将连接和数据流分别需要增加的窗口大小通过http2WindowUpdateFrame数据帧告知server。

    if connAdd != 0 || streamAdd != 0 {
      cc.wmu.Lock()
    defer cc.wmu.Unlock()
    if connAdd != 0 {
        cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))
      }
    if streamAdd != 0 {
        cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))
      }
      cc.bw.Flush()
    }
    ログイン後にコピー

    以上就是server向client发送数据的流控制逻辑。

    (*http2clientStream).writeRequestBody

    前篇中(*http2ClientConn).roundTrip未对(*http2clientStream).writeRequestBody进行分析,下面我们看看该方法的源码:

    func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
    	cc := cs.cc
    	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
    // 此处省略代码
    	req := cs.req
    	hasTrailers := req.Trailer != nil
    	remainLen := http2actualContentLength(req)
    	hasContentLen := remainLen != -1
    
    var sawEOF bool
    for !sawEOF {
    		n, err := body.Read(buf[:len(buf)-1])
    // 此处省略代码
    		remain := buf[:n]
    for len(remain) > 0 && err == nil {
    var allowed int32
    			allowed, err = cs.awaitFlowControl(len(remain))
    switch {
    case err == http2errStopReqBodyWrite:
    return err
    case err == http2errStopReqBodyWriteAndCancel:
    				cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
    return err
    case err != nil:
    return err
    			}
    			cc.wmu.Lock()
    			data := remain[:allowed]
    			remain = remain[allowed:]
    			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
    			err = cc.fr.WriteData(cs.ID, sentEnd, data)
    if err == nil {
    				err = cc.bw.Flush()
    			}
    			cc.wmu.Unlock()
    		}
    if err != nil {
    return err
    		}
    	}
    // 此处省略代码
    return err
    }
    ログイン後にコピー

    上面的逻辑可简单总结为:不停的读取请求body然后将读取的内容通过cc.fr.WriteData转为http2FrameData数据帧发送给server,直到请求body读完为止。其中和流控制有关的方法是awaitFlowControl,下面我们对该方法进行分析。

    (*http2clientStream).awaitFlowControl

    此方法的主要作用是等待当前数据流可写窗口有容量能够写入数据。

    func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
    	cc := cs.cc
    	cc.mu.Lock()
    defer cc.mu.Unlock()
    for {
    if cc.closed {
    return 0, http2errClientConnClosed
    		}
    if cs.stopReqBody != nil {
    return 0, cs.stopReqBody
    		}
    if err := cs.checkResetOrDone(); err != nil {
    return 0, err
    		}
    if a := cs.flow.available(); a > 0 {
    			take := a
    if int(take) > maxBytes {
    
    				take = int32(maxBytes) // can&#39;t truncate int; take is int32
    			}
    if take > int32(cc.maxFrameSize) {
    				take = int32(cc.maxFrameSize)
    			}
    			cs.flow.take(take)
    return take, nil
    		}
    		cc.cond.Wait()
    	}
    }
    ログイン後にコピー

    根据源码可以知道,数据流被关闭或者停止发送请求body,则当前数据流无法写入数据。当数据流状态正常时,又分为两种情况:

    1. 当前数据流可写窗口剩余可写数据大于0,则计算可写字节数,并将当前数据流可写窗口大小消耗take

    2. 当前数据流可写窗口剩余可写数据小于等于0,则会一直等待直到被唤醒并进入下一次检查。

    上面的第二种情况在收到http2WindowUpdateFrame数据帧这一节中提到过。

    サーバーは現在のデータ ストリームのデータを読み取った後、http2WindowUpdateFrame データ フレームをクライアントの対応するデータ ストリームに送信します。クライアントがデータ フレームを受信した後、書き込み可能なウィンドウを増やします。 cc.cond.Broadcast()送信データがフロー制御の上限に達したために待機しているデータストリームを起動し、データの送信を続行します。

    上記は、クライアントがサーバーにデータを送信するためのフロー制御ロジックです。

    概要

    1. フレーム ヘッダーの長さは 9 バイトで、ペイロード長、フレーム タイプ、フレーム識別子、データ ストリーム ID の 4 つの部分が含まれます。

    2. フロー制御は 2 つのステップに分けることができます。

    • 最初に、http2FrameSettings を通じて、データ フレームと http2WindowUpdateFrame データ フレームは、現在の接続の読み取り/書き込みウィンドウのサイズと、接続内のデータ ストリームの読み取り/書き込みウィンドウのサイズを相手に通知します。

    • データの読み取りおよび書き込みのプロセス中に、相手側の書き込みウィンドウ サイズは、http2WindowUpdateFrame データ フレームを送信することによって制御されます。

以上がGo が HTTP2.0 リクエスト プロセス分析を開始 (パート 2) - データ フレームとフロー制御の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:Go语言进阶学习
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート