diff --git a/http2/buffer.go b/http2/fixed_buffer.go similarity index 50% rename from http2/buffer.go rename to http2/fixed_buffer.go index 907e2984..47da0f0b 100644 --- a/http2/buffer.go +++ b/http2/fixed_buffer.go @@ -8,46 +8,41 @@ import ( "errors" ) -// buffer is an io.ReadWriteCloser backed by a fixed size buffer. +// fixedBuffer is an io.ReadWriter backed by a fixed size buffer. // It never allocates, but moves old data as new data is written. -type buffer struct { - buf []byte - r, w int - closed bool - err error // err to return to reader +type fixedBuffer struct { + buf []byte + r, w int } var ( - errReadEmpty = errors.New("read from empty buffer") - errWriteClosed = errors.New("write on closed buffer") - errWriteFull = errors.New("write on full buffer") + errReadEmpty = errors.New("read from empty fixedBuffer") + errWriteFull = errors.New("write on full fixedBuffer") ) // Read copies bytes from the buffer into p. // It is an error to read when no data is available. -func (b *buffer) Read(p []byte) (n int, err error) { +func (b *fixedBuffer) Read(p []byte) (n int, err error) { + if b.r == b.w { + return 0, errReadEmpty + } n = copy(p, b.buf[b.r:b.w]) b.r += n - if b.closed && b.r == b.w { - err = b.err - } else if b.r == b.w && n == 0 { - err = errReadEmpty + if b.r == b.w { + b.r = 0 + b.w = 0 } - return n, err + return n, nil } // Len returns the number of bytes of the unread portion of the buffer. -func (b *buffer) Len() int { +func (b *fixedBuffer) Len() int { return b.w - b.r } // Write copies bytes from p into the buffer. // It is an error to write more data than the buffer can hold. -func (b *buffer) Write(p []byte) (n int, err error) { - if b.closed { - return 0, errWriteClosed - } - +func (b *fixedBuffer) Write(p []byte) (n int, err error) { // Slide existing data to beginning. if b.r > 0 && len(p) > len(b.buf)-b.w { copy(b.buf, b.buf[b.r:b.w]) @@ -63,13 +58,3 @@ func (b *buffer) Write(p []byte) (n int, err error) { } return n, err } - -// Close marks the buffer as closed. Future calls to Write will -// return an error. Future calls to Read, once the buffer is -// empty, will return err. -func (b *buffer) Close(err error) { - if !b.closed { - b.closed = true - b.err = err - } -} diff --git a/http2/buffer_test.go b/http2/fixed_buffer_test.go similarity index 64% rename from http2/buffer_test.go rename to http2/fixed_buffer_test.go index 9d7d98b5..f5432f8d 100644 --- a/http2/buffer_test.go +++ b/http2/fixed_buffer_test.go @@ -5,47 +5,36 @@ package http2 import ( - "io" "reflect" "testing" ) var bufferReadTests = []struct { - buf buffer + buf fixedBuffer read, wn int werr error wp []byte - wbuf buffer + wbuf fixedBuffer }{ { - buffer{[]byte{'a', 0}, 0, 1, false, nil}, + fixedBuffer{[]byte{'a', 0}, 0, 1}, 5, 1, nil, []byte{'a'}, - buffer{[]byte{'a', 0}, 1, 1, false, nil}, + fixedBuffer{[]byte{'a', 0}, 0, 0}, }, { - buffer{[]byte{'a', 0}, 0, 1, true, io.EOF}, - 5, 1, io.EOF, []byte{'a'}, - buffer{[]byte{'a', 0}, 1, 1, true, io.EOF}, - }, - { - buffer{[]byte{0, 'a'}, 1, 2, false, nil}, + fixedBuffer{[]byte{0, 'a'}, 1, 2}, 5, 1, nil, []byte{'a'}, - buffer{[]byte{0, 'a'}, 2, 2, false, nil}, + fixedBuffer{[]byte{0, 'a'}, 0, 0}, }, { - buffer{[]byte{0, 'a'}, 1, 2, true, io.EOF}, - 5, 1, io.EOF, []byte{'a'}, - buffer{[]byte{0, 'a'}, 2, 2, true, io.EOF}, + fixedBuffer{[]byte{'a', 'b'}, 0, 2}, + 1, 1, nil, []byte{'a'}, + fixedBuffer{[]byte{'a', 'b'}, 1, 2}, }, { - buffer{[]byte{}, 0, 0, false, nil}, + fixedBuffer{[]byte{}, 0, 0}, 5, 0, errReadEmpty, []byte{}, - buffer{[]byte{}, 0, 0, false, nil}, - }, - { - buffer{[]byte{}, 0, 0, true, io.EOF}, - 5, 0, io.EOF, []byte{}, - buffer{[]byte{}, 0, 0, true, io.EOF}, + fixedBuffer{[]byte{}, 0, 0}, }, } @@ -72,64 +61,50 @@ func TestBufferRead(t *testing.T) { } var bufferWriteTests = []struct { - buf buffer + buf fixedBuffer write, wn int werr error - wbuf buffer + wbuf fixedBuffer }{ { - buf: buffer{ + buf: fixedBuffer{ buf: []byte{}, }, - wbuf: buffer{ + wbuf: fixedBuffer{ buf: []byte{}, }, }, { - buf: buffer{ + buf: fixedBuffer{ buf: []byte{1, 'a'}, }, write: 1, wn: 1, - wbuf: buffer{ + wbuf: fixedBuffer{ buf: []byte{0, 'a'}, w: 1, }, }, { - buf: buffer{ + buf: fixedBuffer{ buf: []byte{'a', 1}, r: 1, w: 1, }, write: 2, wn: 2, - wbuf: buffer{ + wbuf: fixedBuffer{ buf: []byte{0, 0}, w: 2, }, }, { - buf: buffer{ - buf: []byte{}, - r: 1, - closed: true, - }, - write: 5, - werr: errWriteClosed, - wbuf: buffer{ - buf: []byte{}, - r: 1, - closed: true, - }, - }, - { - buf: buffer{ + buf: fixedBuffer{ buf: []byte{}, }, write: 5, werr: errWriteFull, - wbuf: buffer{ + wbuf: fixedBuffer{ buf: []byte{}, }, }, diff --git a/http2/pipe.go b/http2/pipe.go index 51699dcc..72a1fdc9 100644 --- a/http2/pipe.go +++ b/http2/pipe.go @@ -5,38 +5,78 @@ package http2 import ( + "errors" + "io" "sync" ) +// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like +// io.Pipe except there are no PipeReader/PipeWriter halves, and the +// underlying buffer is an interface. (io.Pipe is always unbuffered) type pipe struct { - b buffer - c sync.Cond - m sync.Mutex + mu sync.Mutex + c sync.Cond // c.L must point to + b pipeBuffer + err error // read error once empty. non-nil means closed. +} + +type pipeBuffer interface { + Len() int + io.Writer + io.Reader } // Read waits until data is available and copies bytes // from the buffer into p. -func (r *pipe) Read(p []byte) (n int, err error) { - r.c.L.Lock() - defer r.c.L.Unlock() - for r.b.Len() == 0 && !r.b.closed { - r.c.Wait() +func (p *pipe) Read(d []byte) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.c.L == nil { + p.c.L = &p.mu + } + for { + if p.b.Len() > 0 { + return p.b.Read(d) + } + if p.err != nil { + return 0, p.err + } + p.c.Wait() } - return r.b.Read(p) } +var errClosedPipeWrite = errors.New("write on closed buffer") + // Write copies bytes from p into the buffer and wakes a reader. // It is an error to write more data than the buffer can hold. -func (w *pipe) Write(p []byte) (n int, err error) { - w.c.L.Lock() - defer w.c.L.Unlock() - defer w.c.Signal() - return w.b.Write(p) +func (p *pipe) Write(d []byte) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.c.L == nil { + p.c.L = &p.mu + } + defer p.c.Signal() + if p.err != nil { + return 0, errClosedPipeWrite + } + return p.b.Write(d) } -func (c *pipe) Close(err error) { - c.c.L.Lock() - defer c.c.L.Unlock() - defer c.c.Signal() - c.b.Close(err) +// CloseWithError causes Reads to wake up and return the +// provided err after all data has been read. +// +// The error must be non-nil. +func (p *pipe) CloseWithError(err error) { + if err == nil { + panic("CloseWithError must be non-nil") + } + p.mu.Lock() + defer p.mu.Unlock() + if p.c.L == nil { + p.c.L = &p.mu + } + defer p.c.Signal() + if p.err == nil { + p.err = err + } } diff --git a/http2/pipe_test.go b/http2/pipe_test.go index 5283b66c..002ce05e 100644 --- a/http2/pipe_test.go +++ b/http2/pipe_test.go @@ -5,17 +5,18 @@ package http2 import ( + "bytes" "errors" "testing" ) func TestPipeClose(t *testing.T) { var p pipe - p.c.L = &p.m + p.b = new(bytes.Buffer) a := errors.New("a") b := errors.New("b") - p.Close(a) - p.Close(b) + p.CloseWithError(a) + p.CloseWithError(b) _, err := p.Read(make([]byte, 1)) if err != a { t.Errorf("err = %v want %v", err, a) diff --git a/http2/server.go b/http2/server.go index ba408fe7..5fb92cde 100644 --- a/http2/server.go +++ b/http2/server.go @@ -65,6 +65,7 @@ const ( var ( errClientDisconnected = errors.New("client disconnected") errClosedBody = errors.New("body closed by handler") + errHandlerComplete = errors.New("http2: request body closed due to handler exiting") errStreamClosed = errors.New("http2: stream closed") ) @@ -872,7 +873,7 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) { errCancel := StreamError{st.id, ErrCodeCancel} sc.resetStream(errCancel) case stateHalfClosedRemote: - sc.closeStream(st, nil) + sc.closeStream(st, errHandlerComplete) } } @@ -1142,7 +1143,7 @@ func (sc *serverConn) closeStream(st *stream, err error) { } delete(sc.streams, st.id) if p := st.body; p != nil { - p.Close(err) + p.CloseWithError(err) } st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc sc.writeSched.forgetStream(st.id) @@ -1246,7 +1247,7 @@ func (sc *serverConn) processData(f *DataFrame) error { // Sender sending more than they'd declared? if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { - st.body.Close(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) + st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) return StreamError{id, ErrCodeStreamClosed} } if len(data) > 0 { @@ -1266,10 +1267,10 @@ func (sc *serverConn) processData(f *DataFrame) error { } if f.StreamEnded() { if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes { - st.body.Close(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes", + st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes", st.declBodyBytes, st.bodyBytes)) } else { - st.body.Close(io.EOF) + st.body.CloseWithError(io.EOF) } st.state = stateHalfClosedRemote } @@ -1493,9 +1494,8 @@ func (sc *serverConn) newWriterAndRequest() (*responseWriter, *http.Request, err } if bodyOpen { body.pipe = &pipe{ - b: buffer{buf: make([]byte, initialWindowSize)}, // TODO: share/remove XXX + b: &fixedBuffer{buf: make([]byte, initialWindowSize)}, // TODO: share/remove XXX } - body.pipe.c.L = &body.pipe.m if vv, ok := rp.header["Content-Length"]; ok { req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) @@ -1655,7 +1655,7 @@ type requestBody struct { func (b *requestBody) Close() error { if b.pipe != nil { - b.pipe.Close(errClosedBody) + b.pipe.CloseWithError(errClosedBody) } b.closed = true return nil