From d4223d6710aad90f892de3ed4f1e7539ccc444a0 Mon Sep 17 00:00:00 2001 From: Tom Bergan Date: Fri, 23 Jun 2017 13:02:09 -0700 Subject: [PATCH] http2: refund connection flow control on DATA frames received after reset If the transport had previously sent a RST_STREAM but had not yet deleted the stream from its list of active streams, we should refund connection-level flow control for any DATA frame received as such DATA frames will never be read. We already refund connection-level flow control if a stream closes with unread data in bufPipe. However, when we receive a DATA frame after reset, we don't bother writing it to bufPipe, so we have to refund the flow control separately. Updates golang/go#20469 Change-Id: I5a9810a5d6b1bd7e291173af53646246545a6665 Reviewed-on: https://go-review.googlesource.com/46591 Run-TryBot: Brad Fitzpatrick TryBot-Result: Gobot Gobot Reviewed-by: Brad Fitzpatrick --- http2/transport.go | 23 +++++++++++++++------ http2/transport_test.go | 46 ++++++++++++++++++++++++++++++++--------- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/http2/transport.go b/http2/transport.go index 24d0af84..850d7ae0 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -1713,16 +1713,27 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error { } // Return any padded flow control now, since we won't // refund it later on body reads. - if pad := int32(f.Length) - int32(len(data)); pad > 0 { - cs.inflow.add(pad) - cc.inflow.add(pad) + var refund int + if pad := int(f.Length) - len(data); pad > 0 { + refund += pad + } + // Return len(data) now if the stream is already closed, + // since data will never be read. + didReset := cs.didReset + if didReset { + refund += len(data) + } + if refund > 0 { + cc.inflow.add(int32(refund)) cc.wmu.Lock() - cc.fr.WriteWindowUpdate(0, uint32(pad)) - cc.fr.WriteWindowUpdate(cs.ID, uint32(pad)) + cc.fr.WriteWindowUpdate(0, uint32(refund)) + if !didReset { + cs.inflow.add(int32(refund)) + cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) + } cc.bw.Flush() cc.wmu.Unlock() } - didReset := cs.didReset cc.mu.Unlock() if len(data) > 0 && !didReset { diff --git a/http2/transport_test.go b/http2/transport_test.go index bf34fc9d..15dfa073 100644 --- a/http2/transport_test.go +++ b/http2/transport_test.go @@ -2210,12 +2210,11 @@ func testTransportUsesGoAwayDebugError(t *testing.T, failMidBody bool) { ct.run() } -// See golang.org/issue/16481 -func TestTransportReturnsUnusedFlowControl(t *testing.T) { +func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) { ct := newClientTester(t) - clientClosed := make(chan bool, 1) - serverWroteBody := make(chan bool, 1) + clientClosed := make(chan struct{}) + serverWroteFirstByte := make(chan struct{}) ct.client = func() error { req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) @@ -2223,13 +2222,13 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { if err != nil { return err } - <-serverWroteBody + <-serverWroteFirstByte if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 { return fmt.Errorf("body read = %v, %v; want 1, nil", n, err) } res.Body.Close() // leaving 4999 bytes unread - clientClosed <- true + close(clientClosed) return nil } @@ -2264,10 +2263,27 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { EndStream: false, BlockFragment: buf.Bytes(), }) - ct.fr.WriteData(hf.StreamID, false, make([]byte, 5000)) // without ending stream - serverWroteBody <- true - <-clientClosed + // Two cases: + // - Send one DATA frame with 5000 bytes. + // - Send two DATA frames with 1 and 4999 bytes each. + // + // In both cases, the client should consume one byte of data, + // refund that byte, then refund the following 4999 bytes. + // + // In the second case, the server waits for the client connection to + // close before seconding the second DATA frame. This tests the case + // where the client receives a DATA frame after it has reset the stream. + if oneDataFrame { + ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 5000)) + close(serverWroteFirstByte) + <-clientClosed + } else { + ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 1)) + close(serverWroteFirstByte) + <-clientClosed + ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 4999)) + } waitingFor := "RSTStreamFrame" for { @@ -2281,7 +2297,7 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { switch waitingFor { case "RSTStreamFrame": if rf, ok := f.(*RSTStreamFrame); !ok || rf.ErrCode != ErrCodeCancel { - return fmt.Errorf("Expected a WindowUpdateFrame with code cancel; got %v", summarizeFrame(f)) + return fmt.Errorf("Expected a RSTStreamFrame with code cancel; got %v", summarizeFrame(f)) } waitingFor = "WindowUpdateFrame" case "WindowUpdateFrame": @@ -2295,6 +2311,16 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { ct.run() } +// See golang.org/issue/16481 +func TestTransportReturnsUnusedFlowControlSingleWrite(t *testing.T) { + testTransportReturnsUnusedFlowControl(t, true) +} + +// See golang.org/issue/20469 +func TestTransportReturnsUnusedFlowControlMultipleWrites(t *testing.T) { + testTransportReturnsUnusedFlowControl(t, false) +} + // Issue 16612: adjust flow control on open streams when transport // receives SETTINGS with INITIAL_WINDOW_SIZE from server. func TestTransportAdjustsFlowControl(t *testing.T) {