mirror of
https://github.com/golang/net.git
synced 2026-03-31 10:27:08 +09:00
http2: support Request.Cancel in Transport
Tests are in a separate change, part of the net/http package in the main go repo. Updates golang/go#13159 Change-Id: I236dea7cd076910e908df7e7160d490da56014c8 Reviewed-on: https://go-review.googlesource.com/17757 Reviewed-by: Ian Lance Taylor <iant@golang.org>
This commit is contained in:
@@ -14,10 +14,11 @@ import (
|
||||
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
|
||||
// underlying buffer is an interface. (io.Pipe is always unbuffered)
|
||||
type pipe struct {
|
||||
mu sync.Mutex
|
||||
c sync.Cond // c.L must point to
|
||||
b pipeBuffer
|
||||
err error // read error once empty. non-nil means closed.
|
||||
mu sync.Mutex
|
||||
c sync.Cond // c.L must point to
|
||||
b pipeBuffer
|
||||
err error // read error once empty. non-nil means closed.
|
||||
donec chan struct{} // closed on error
|
||||
}
|
||||
|
||||
type pipeBuffer interface {
|
||||
@@ -78,6 +79,9 @@ func (p *pipe) CloseWithError(err error) {
|
||||
defer p.c.Signal()
|
||||
if p.err == nil {
|
||||
p.err = err
|
||||
if p.donec != nil {
|
||||
close(p.donec)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,3 +92,18 @@ func (p *pipe) Err() error {
|
||||
defer p.mu.Unlock()
|
||||
return p.err
|
||||
}
|
||||
|
||||
// Done returns a channel which is closed if and when this pipe is closed
|
||||
// with CloseWithError.
|
||||
func (p *pipe) Done() <-chan struct{} {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if p.donec == nil {
|
||||
p.donec = make(chan struct{})
|
||||
if p.err != nil {
|
||||
// Already hit an error.
|
||||
close(p.donec)
|
||||
}
|
||||
}
|
||||
return p.donec
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ package http2
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -22,3 +23,30 @@ func TestPipeClose(t *testing.T) {
|
||||
t.Errorf("err = %v want %v", err, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipeDoneChan(t *testing.T) {
|
||||
var p pipe
|
||||
done := p.Done()
|
||||
select {
|
||||
case <-done:
|
||||
t.Fatal("done too soon")
|
||||
default:
|
||||
}
|
||||
p.CloseWithError(io.EOF)
|
||||
select {
|
||||
case <-done:
|
||||
default:
|
||||
t.Fatal("should be done")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipeDoneChan_ErrFirst(t *testing.T) {
|
||||
var p pipe
|
||||
p.CloseWithError(io.EOF)
|
||||
done := p.Done()
|
||||
select {
|
||||
case <-done:
|
||||
default:
|
||||
t.Fatal("should be done")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,6 +161,18 @@ type clientStream struct {
|
||||
resetErr error // populated before peerReset is closed
|
||||
}
|
||||
|
||||
// awaitRequestCancel runs in its own goroutine and waits for the user's
|
||||
func (cs *clientStream) awaitRequestCancel(cancel <-chan struct{}) {
|
||||
if cancel == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-cancel:
|
||||
cs.bufPipe.CloseWithError(errRequestCanceled)
|
||||
case <-cs.bufPipe.Done():
|
||||
}
|
||||
}
|
||||
|
||||
// checkReset reports any error sent in a RST_STREAM frame by the
|
||||
// server.
|
||||
func (cs *clientStream) checkReset() error {
|
||||
@@ -465,6 +477,10 @@ func (cc *ClientConn) putFrameScratchBuffer(buf []byte) {
|
||||
// forget about it.
|
||||
}
|
||||
|
||||
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
|
||||
// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
|
||||
var errRequestCanceled = errors.New("net/http: request canceled")
|
||||
|
||||
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
cc.mu.Lock()
|
||||
|
||||
@@ -522,6 +538,10 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
cc.fr.WriteContinuation(cs.ID, endHeaders, chunk)
|
||||
}
|
||||
}
|
||||
// TODO(bradfitz): this Flush could potentially block (as
|
||||
// could the WriteHeaders call(s) above), which means they
|
||||
// wouldn't respond to Request.Cancel being readable. That's
|
||||
// rare, but this should probably be in a goroutine.
|
||||
cc.bw.Flush()
|
||||
werr := cc.werr
|
||||
cc.wmu.Unlock()
|
||||
@@ -561,6 +581,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
res.Request = req
|
||||
res.TLS = cc.tlsState
|
||||
return res, nil
|
||||
case <-req.Cancel:
|
||||
cs.abortRequestBodyWrite()
|
||||
return nil, errRequestCanceled
|
||||
case err := <-bodyCopyErrc:
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -935,6 +958,7 @@ func (rl *clientConnReadLoop) processHeaderBlockFragment(frag []byte, streamID u
|
||||
cs.bufPipe = pipe{b: buf}
|
||||
cs.bytesRemain = res.ContentLength
|
||||
res.Body = transportResponseBody{cs}
|
||||
go cs.awaitRequestCancel(cs.req.Cancel)
|
||||
|
||||
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
|
||||
res.Header.Del("Content-Encoding")
|
||||
|
||||
Reference in New Issue
Block a user