diff --git a/http2/server.go b/http2/server.go index d2e52f3d..4042d545 100644 --- a/http2/server.go +++ b/http2/server.go @@ -869,9 +869,7 @@ func (sc *serverConn) serve() { // Each connection starts with initialWindowSize inflow tokens. // If a higher value is configured, we add more tokens. - if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 { - sc.sendWindowUpdate(nil, int(diff)) - } + sc.sendWindowUpdate(nil) if err := sc.readPreface(); err != nil { sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) @@ -1588,7 +1586,7 @@ func (sc *serverConn) closeStream(st *stream, err error) { if p := st.body; p != nil { // Return any buffered unread bytes worth of conn-level flow control. // See golang.org/issue/16481 - sc.sendWindowUpdate(nil, p.Len()) + sc.sendWindowUpdate(nil) p.CloseWithError(err) } @@ -1736,7 +1734,7 @@ func (sc *serverConn) processData(f *DataFrame) error { // sendWindowUpdate, which also schedules sending the // frames. sc.inflow.take(int32(f.Length)) - sc.sendWindowUpdate(nil, int(f.Length)) // conn-level + sc.sendWindowUpdate(nil) // conn-level if st != nil && st.resetQueued { // Already have a stream error in flight. Don't send another. @@ -1754,7 +1752,7 @@ func (sc *serverConn) processData(f *DataFrame) error { return sc.countError("data_flow", streamError(id, ErrCodeFlowControl)) } sc.inflow.take(int32(f.Length)) - sc.sendWindowUpdate(nil, int(f.Length)) // conn-level + sc.sendWindowUpdate(nil) // conn-level st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the @@ -1772,7 +1770,7 @@ func (sc *serverConn) processData(f *DataFrame) error { if len(data) > 0 { wrote, err := st.body.Write(data) if err != nil { - sc.sendWindowUpdate(nil, int(f.Length)-wrote) + sc.sendWindowUpdate32(nil, int32(f.Length)-int32(wrote)) return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed)) } if wrote != len(data) { @@ -2324,17 +2322,32 @@ func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) { func (sc *serverConn) noteBodyRead(st *stream, n int) { sc.serveG.check() - sc.sendWindowUpdate(nil, n) // conn-level + sc.sendWindowUpdate(nil) // conn-level if st.state != stateHalfClosedRemote && st.state != stateClosed { // Don't send this WINDOW_UPDATE if the stream is closed // remotely. - sc.sendWindowUpdate(st, n) + sc.sendWindowUpdate(st) } } // st may be nil for conn-level -func (sc *serverConn) sendWindowUpdate(st *stream, n int) { +func (sc *serverConn) sendWindowUpdate(st *stream) { sc.serveG.check() + + var n int32 + if st == nil { + if avail, windowSize := sc.inflow.available(), sc.srv.initialConnRecvWindowSize(); avail > windowSize/2 { + return + } else { + n = windowSize - avail + } + } else { + if avail, windowSize := st.inflow.available(), sc.srv.initialStreamRecvWindowSize(); avail > windowSize/2 { + return + } else { + n = windowSize - avail + } + } // "The legal range for the increment to the flow control // window is 1 to 2^31-1 (2,147,483,647) octets." // A Go Read call on 64-bit machines could in theory read diff --git a/http2/server_test.go b/http2/server_test.go index b77372ce..ba8cea3f 100644 --- a/http2/server_test.go +++ b/http2/server_test.go @@ -809,9 +809,6 @@ func TestServer_Request_Post_Body_ContentLength_TooSmall(t *testing.T) { EndHeaders: true, }) st.writeData(1, true, []byte("12345")) - // Return flow control bytes back, since the data handler closed - // the stream. - st.wantWindowUpdate(0, 5) }) } @@ -1247,6 +1244,41 @@ func TestServer_Handler_Sends_WindowUpdate(t *testing.T) { st.greet() + st.writeHeaders(HeadersFrameParam{ + StreamID: 1, // clients send odd numbers + BlockFragment: st.encodeHeader(":method", "POST"), + EndStream: false, // data coming + EndHeaders: true, + }) + updateSize := 1 << 20 / 2 // the conn & stream size before a WindowUpdate + st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10)) + st.writeData(1, false, bytes.Repeat([]byte("b"), 10)) + puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10))) + puppet.do(readBodyHandler(t, strings.Repeat("b", 10))) + + st.wantWindowUpdate(0, uint32(updateSize)) + st.wantWindowUpdate(1, uint32(updateSize)) + + st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10)) + st.writeData(1, true, bytes.Repeat([]byte("c"), 15)) // END_STREAM here + puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10))) + puppet.do(readBodyHandler(t, strings.Repeat("c", 15))) + + st.wantWindowUpdate(0, uint32(updateSize+5)) +} + +func TestServer_Handler_Sends_WindowUpdate_SmallStream(t *testing.T) { + puppet := newHandlerPuppet() + st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) { + puppet.act(w, r) + }, func(s *Server) { + s.MaxUploadBufferPerStream = 6 + }) + defer st.Close() + defer puppet.done() + + st.greet() + st.writeHeaders(HeadersFrameParam{ StreamID: 1, // clients send odd numbers BlockFragment: st.encodeHeader(":method", "POST"), @@ -1255,18 +1287,14 @@ func TestServer_Handler_Sends_WindowUpdate(t *testing.T) { }) st.writeData(1, false, []byte("abcdef")) puppet.do(readBodyHandler(t, "abc")) - st.wantWindowUpdate(0, 3) - st.wantWindowUpdate(1, 3) + puppet.do(readBodyHandler(t, "d")) + puppet.do(readBodyHandler(t, "ef")) - puppet.do(readBodyHandler(t, "def")) - st.wantWindowUpdate(0, 3) - st.wantWindowUpdate(1, 3) + st.wantWindowUpdate(1, 6) st.writeData(1, true, []byte("ghijkl")) // END_STREAM here puppet.do(readBodyHandler(t, "ghi")) puppet.do(readBodyHandler(t, "jkl")) - st.wantWindowUpdate(0, 3) - st.wantWindowUpdate(0, 3) // no more stream-level, since END_STREAM } // the version of the TestServer_Handler_Sends_WindowUpdate with padding. @@ -1295,12 +1323,7 @@ func TestServer_Handler_Sends_WindowUpdate_Padding(t *testing.T) { st.wantWindowUpdate(1, 5) puppet.do(readBodyHandler(t, "abc")) - st.wantWindowUpdate(0, 3) - st.wantWindowUpdate(1, 3) - puppet.do(readBodyHandler(t, "def")) - st.wantWindowUpdate(0, 3) - st.wantWindowUpdate(1, 3) } func TestServer_Send_GoAway_After_Bogus_WindowUpdate(t *testing.T) { @@ -2296,8 +2319,6 @@ func TestServer_Response_Automatic100Continue(t *testing.T) { // gigantic and/or sensitive "foo" payload now. st.writeData(1, true, []byte(msg)) - st.wantWindowUpdate(0, uint32(len(msg))) - hf = st.wantHeaders() if hf.StreamEnded() { t.Fatal("expected data to follow") @@ -2485,9 +2506,6 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) { // it did before. st.writeData(1, true, []byte("foo")) - // Get our flow control bytes back, since the handler didn't get them. - st.wantWindowUpdate(0, uint32(len("foo"))) - // Sent after a peer sends data anyway (admittedly the // previous RST_STREAM might've still been in-flight), // but they'll get the more friendly 'cancel' code @@ -3906,7 +3924,6 @@ func TestServer_Rejects_TooSmall(t *testing.T) { EndHeaders: true, }) st.writeData(1, true, []byte("12345")) - st.wantWindowUpdate(0, 5) st.wantRSTStream(1, ErrCodeProtocol) }) } @@ -4199,7 +4216,6 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) { st.writeData(1, false, []byte(content[5:])) blockCh <- true - increments := len(content) for { f, err := st.readFrame() if err == io.EOF { @@ -4208,10 +4224,12 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) { if err != nil { t.Fatal(err) } + if rs, ok := f.(*RSTStreamFrame); ok && rs.StreamID == 1 { + break + } if wu, ok := f.(*WindowUpdateFrame); ok && wu.StreamID == 0 { - increments -= int(wu.Increment) - if increments == 0 { - break + if e, a := uint32(3), wu.Increment; e != a { + t.Errorf("Increment=%d, want %d", a, e) } } }