diff --git a/http2/http2.go b/http2/http2.go index 2e27b093..dd6824dd 100644 --- a/http2/http2.go +++ b/http2/http2.go @@ -36,6 +36,7 @@ var ( VerboseLogs bool logFrameWrites bool logFrameReads bool + inTests bool ) func init() { diff --git a/http2/http2_test.go b/http2/http2_test.go index 22c2ace8..52487764 100644 --- a/http2/http2_test.go +++ b/http2/http2_test.go @@ -27,6 +27,7 @@ func condSkipFailingTest(t *testing.T) { } func init() { + inTests = true DebugGoroutines = true flag.BoolVar(&VerboseLogs, "verboseh2", VerboseLogs, "Verbose HTTP/2 debug logging") } diff --git a/http2/server.go b/http2/server.go index 808b0111..11d3b65b 100644 --- a/http2/server.go +++ b/http2/server.go @@ -1858,16 +1858,19 @@ func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) { } } +// requestBody is the Handler's Request.Body type. +// Read and Close may be called concurrently. type requestBody struct { stream *stream conn *serverConn - closed bool + closed bool // for use by Close only + sawEOF bool // for use by Read only pipe *pipe // non-nil if we have a HTTP entity message body needsContinue bool // need to send a 100-continue } func (b *requestBody) Close() error { - if b.pipe != nil { + if b.pipe != nil && !b.closed { b.pipe.BreakWithError(errClosedBody) } b.closed = true @@ -1879,12 +1882,15 @@ func (b *requestBody) Read(p []byte) (n int, err error) { b.needsContinue = false b.conn.write100ContinueHeaders(b.stream) } - if b.pipe == nil { + if b.pipe == nil || b.sawEOF { return 0, io.EOF } n, err = b.pipe.Read(p) if err == io.EOF { - b.pipe = nil + b.sawEOF = true + } + if b.conn == nil && inTests { + return } b.conn.noteBodyReadFromHandler(b.stream, n, err) return diff --git a/http2/server_test.go b/http2/server_test.go index 1295b433..22aabf21 100644 --- a/http2/server_test.go +++ b/http2/server_test.go @@ -3372,3 +3372,27 @@ func TestUnreadFlowControlReturned_Server(t *testing.T) { } } + +// grpc-go closes the Request.Body currently with a Read. +// Verify that it doesn't race. +// See https://github.com/grpc/grpc-go/pull/938 +func TestRequestBodyReadCloseRace(t *testing.T) { + for i := 0; i < 100; i++ { + body := &requestBody{ + pipe: &pipe{ + b: new(bytes.Buffer), + }, + } + body.pipe.CloseWithError(io.EOF) + + done := make(chan bool, 1) + buf := make([]byte, 10) + go func() { + time.Sleep(1 * time.Millisecond) + body.Close() + done <- true + }() + body.Read(buf) + <-done + } +}