From 08a7b454b02630022dc24413246370e2aeb5a799 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 14 Dec 2015 20:26:59 +0000 Subject: [PATCH] http2: support Request.Cancel in Transport Tests are in a separate change, part of the net/http package in the main go repo. Updates golang/go#13159 Change-Id: I236dea7cd076910e908df7e7160d490da56014c8 Reviewed-on: https://go-review.googlesource.com/17757 Reviewed-by: Ian Lance Taylor --- http2/pipe.go | 27 +++++++++++++++++++++++---- http2/pipe_test.go | 28 ++++++++++++++++++++++++++++ http2/transport.go | 24 ++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 4 deletions(-) diff --git a/http2/pipe.go b/http2/pipe.go index 96a3eb8d..e30661cb 100644 --- a/http2/pipe.go +++ b/http2/pipe.go @@ -14,10 +14,11 @@ import ( // io.Pipe except there are no PipeReader/PipeWriter halves, and the // underlying buffer is an interface. (io.Pipe is always unbuffered) type pipe struct { - mu sync.Mutex - c sync.Cond // c.L must point to - b pipeBuffer - err error // read error once empty. non-nil means closed. + mu sync.Mutex + c sync.Cond // c.L must point to + b pipeBuffer + err error // read error once empty. non-nil means closed. + donec chan struct{} // closed on error } type pipeBuffer interface { @@ -78,6 +79,9 @@ func (p *pipe) CloseWithError(err error) { defer p.c.Signal() if p.err == nil { p.err = err + if p.donec != nil { + close(p.donec) + } } } @@ -88,3 +92,18 @@ func (p *pipe) Err() error { defer p.mu.Unlock() return p.err } + +// Done returns a channel which is closed if and when this pipe is closed +// with CloseWithError. +func (p *pipe) Done() <-chan struct{} { + p.mu.Lock() + defer p.mu.Unlock() + if p.donec == nil { + p.donec = make(chan struct{}) + if p.err != nil { + // Already hit an error. + close(p.donec) + } + } + return p.donec +} diff --git a/http2/pipe_test.go b/http2/pipe_test.go index 002ce05e..b35b2df8 100644 --- a/http2/pipe_test.go +++ b/http2/pipe_test.go @@ -7,6 +7,7 @@ package http2 import ( "bytes" "errors" + "io" "testing" ) @@ -22,3 +23,30 @@ func TestPipeClose(t *testing.T) { t.Errorf("err = %v want %v", err, a) } } + +func TestPipeDoneChan(t *testing.T) { + var p pipe + done := p.Done() + select { + case <-done: + t.Fatal("done too soon") + default: + } + p.CloseWithError(io.EOF) + select { + case <-done: + default: + t.Fatal("should be done") + } +} + +func TestPipeDoneChan_ErrFirst(t *testing.T) { + var p pipe + p.CloseWithError(io.EOF) + done := p.Done() + select { + case <-done: + default: + t.Fatal("should be done") + } +} diff --git a/http2/transport.go b/http2/transport.go index 3327a6df..d9baa666 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -161,6 +161,18 @@ type clientStream struct { resetErr error // populated before peerReset is closed } +// awaitRequestCancel runs in its own goroutine and waits for the user's +func (cs *clientStream) awaitRequestCancel(cancel <-chan struct{}) { + if cancel == nil { + return + } + select { + case <-cancel: + cs.bufPipe.CloseWithError(errRequestCanceled) + case <-cs.bufPipe.Done(): + } +} + // checkReset reports any error sent in a RST_STREAM frame by the // server. func (cs *clientStream) checkReset() error { @@ -465,6 +477,10 @@ func (cc *ClientConn) putFrameScratchBuffer(buf []byte) { // forget about it. } +// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not +// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests. +var errRequestCanceled = errors.New("net/http: request canceled") + func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) { cc.mu.Lock() @@ -522,6 +538,10 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) { cc.fr.WriteContinuation(cs.ID, endHeaders, chunk) } } + // TODO(bradfitz): this Flush could potentially block (as + // could the WriteHeaders call(s) above), which means they + // wouldn't respond to Request.Cancel being readable. That's + // rare, but this should probably be in a goroutine. cc.bw.Flush() werr := cc.werr cc.wmu.Unlock() @@ -561,6 +581,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) { res.Request = req res.TLS = cc.tlsState return res, nil + case <-req.Cancel: + cs.abortRequestBodyWrite() + return nil, errRequestCanceled case err := <-bodyCopyErrc: if err != nil { return nil, err @@ -935,6 +958,7 @@ func (rl *clientConnReadLoop) processHeaderBlockFragment(frag []byte, streamID u cs.bufPipe = pipe{b: buf} cs.bytesRemain = res.ContentLength res.Body = transportResponseBody{cs} + go cs.awaitRequestCancel(cs.req.Cancel) if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { res.Header.Del("Content-Encoding")