diff --git a/http2/writesched_random.go b/http2/writesched_random.go index 36d7919f..9a7b9e58 100644 --- a/http2/writesched_random.go +++ b/http2/writesched_random.go @@ -19,7 +19,8 @@ type randomWriteScheduler struct { zero writeQueue // sq contains the stream-specific queues, keyed by stream ID. - // When a stream is idle or closed, it's deleted from the map. + // When a stream is idle, closed, or emptied, it's deleted + // from the map. sq map[uint32]*writeQueue // pool of empty queues for reuse. @@ -63,8 +64,12 @@ func (ws *randomWriteScheduler) Pop() (FrameWriteRequest, bool) { return ws.zero.shift(), true } // Iterate over all non-idle streams until finding one that can be consumed. - for _, q := range ws.sq { + for streamID, q := range ws.sq { if wr, ok := q.consume(math.MaxInt32); ok { + if q.empty() { + delete(ws.sq, streamID) + ws.queuePool.put(q) + } return wr, true } } diff --git a/http2/writesched_random_test.go b/http2/writesched_random_test.go index 3bf4aa36..1f501b4b 100644 --- a/http2/writesched_random_test.go +++ b/http2/writesched_random_test.go @@ -41,4 +41,20 @@ func TestRandomScheduler(t *testing.T) { t.Errorf("frame not found for stream %d", id) } } + + // Verify that we clean up maps for empty queues in all cases (golang.org/issue/33812) + const arbitraryStreamID = 123 + ws.Push(makeHandlerPanicRST(arbitraryStreamID)) + rws := ws.(*randomWriteScheduler) + if got, want := len(rws.sq), 1; got != want { + t.Fatalf("len of 123 stream = %v; want %v", got, want) + } + _, ok := ws.Pop() + if !ok { + t.Fatal("expected to be able to Pop") + } + if got, want := len(rws.sq), 0; got != want { + t.Fatalf("len of 123 stream = %v; want %v", got, want) + } + } diff --git a/http2/writesched_test.go b/http2/writesched_test.go index 0807056b..99be5a77 100644 --- a/http2/writesched_test.go +++ b/http2/writesched_test.go @@ -20,6 +20,11 @@ func makeWriteHeadersRequest(streamID uint32) FrameWriteRequest { return FrameWriteRequest{&writeResHeaders{streamID: streamID, httpResCode: 200}, st, nil} } +func makeHandlerPanicRST(streamID uint32) FrameWriteRequest { + st := &stream{id: streamID} + return FrameWriteRequest{&handlerPanicRST{StreamID: streamID}, st, nil} +} + func checkConsume(wr FrameWriteRequest, nbytes int32, want []FrameWriteRequest) error { consumed, rest, n := wr.Consume(nbytes) var wantConsumed, wantRest FrameWriteRequest