diff --git a/http2/pipe.go b/http2/pipe.go index 72a1fdc9..96a3eb8d 100644 --- a/http2/pipe.go +++ b/http2/pipe.go @@ -80,3 +80,11 @@ func (p *pipe) CloseWithError(err error) { p.err = err } } + +// Err returns the error (if any) first set with CloseWithError. +// This is the error which will be returned after the reader is exhausted. +func (p *pipe) Err() error { + p.mu.Lock() + defer p.mu.Unlock() + return p.err +} diff --git a/http2/transport.go b/http2/transport.go index 0968bf3c..1978eab2 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "net" "net/http" @@ -91,11 +92,13 @@ type clientConn struct { } type clientStream struct { - cc *clientConn - ID uint32 - resc chan resAndError - pw *io.PipeWriter - pr *io.PipeReader + cc *clientConn + ID uint32 + resc chan resAndError + bufPipe pipe + + // Owned by readLoop goroutine: + ended bool // on STREAM_ENDED from any of HEADERS/CONTINUATION/DATA flow flow // guarded by cc.mu inflow flow // guarded by cc.mu @@ -685,7 +688,7 @@ func (rl *clientConnReadLoop) cleanup() { err = io.ErrUnexpectedEOF } for _, cs := range rl.activeRes { - cs.pw.CloseWithError(err) + cs.bufPipe.CloseWithError(err) } cc.mu.Lock() @@ -779,7 +782,6 @@ func (rl *clientConnReadLoop) processHeaders(f *HeadersFrame, cs *clientStream) ProtoMajor: 2, Header: make(http.Header), } - cs.pr, cs.pw = io.Pipe() return rl.processHeaderBlockFragment(cs, f.HeaderBlockFragment(), f.HeadersEnded(), f.StreamEnded()) } @@ -809,23 +811,49 @@ func (rl *clientConnReadLoop) processHeaderBlockFragment(cs *clientStream, frag return nil } - // TODO: set the Body to one which notes the - // Close and also sends the server a - // RST_STREAM - rl.nextRes.Body = cs.pr res := rl.nextRes + if streamEnded { + res.Body = noBody + cs.ended = true + } else { + buf := new(bytes.Buffer) // TODO(bradfitz): recycle this garbage + cs.bufPipe = pipe{b: buf} + res.Body = transportResponseBody{cs} + } rl.activeRes[cs.ID] = cs cs.resc <- resAndError{res: res} rl.nextRes = nil // unused now; will be reset next HEADERS frame return nil } +// transportResponseBody is the concrete type of Transport.RoundTrip's +// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body. +// On Close it sends RST_STREAM if EOF wasn't already seen. +type transportResponseBody struct { + cs *clientStream +} + +func (b transportResponseBody) Read(p []byte) (n int, err error) { + return b.cs.bufPipe.Read(p) +} + +func (b transportResponseBody) Close() error { + if b.cs.bufPipe.Err() != io.EOF { + // TODO: write test for this + b.cs.cc.writeStreamReset(b.cs.ID, ErrCodeCancel, nil) + } + return nil +} + func (rl *clientConnReadLoop) processData(f *DataFrame, cs *clientStream) error { if cs == nil { return nil } + if cs.ended { + // TODO: add test for this (DATA frame after STREAM_ENDED cases) + return ConnectionError(ErrCodeProtocol) + } data := f.Data() - // TODO: decrement cs.inflow and cc.inflow, sending errors as appropriate. if VerboseLogs { rl.cc.logf("DATA: %q", data) } @@ -841,13 +869,14 @@ func (rl *clientConnReadLoop) processData(f *DataFrame, cs *clientStream) error } cc.mu.Unlock() - if _, err := cs.pw.Write(data); err != nil { + if _, err := cs.bufPipe.Write(data); err != nil { return err } // send WINDOW_UPDATE frames occasionally as per-stream flow control depletes if f.StreamEnded() { - cs.pw.Close() + cs.ended = true + cs.bufPipe.CloseWithError(io.EOF) delete(rl.activeRes, cs.ID) } return nil @@ -924,7 +953,7 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame, cs *clientSt err := StreamError{cs.ID, f.ErrCode} cs.resetErr = err close(cs.peerReset) - cs.pw.CloseWithError(err) + cs.bufPipe.CloseWithError(err) } delete(rl.activeRes, cs.ID) return nil @@ -988,3 +1017,5 @@ func (t *Transport) vlogf(format string, args ...interface{}) { func (t *Transport) logf(format string, args ...interface{}) { log.Printf(format, args...) } + +var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))