http2: surface errors occurring very early in a client conn's lifetime

When we create a new connection for a request, the request should
fail if the connection attempt fails.

There is a race condition which can cause this to not happen:

- net/http sends a request to a http2.Transport
- the http2.Transport returns ErrNoCachedConn
- net/http creates a new tls.Conn and passes it to the http2.Transport
- the http2.Transport adds the conn to its connection pool
- the connection immediately encounters an error
- the http2.Transport removes the conn from its connection pool
- net/http resends the request to the http2.Transport
- the http2.Transport returns ErrNoCachedConn, and the process repeats

If the request is sent to the http2.Transport before the connection
encounters an error, then the request fails. But otherwise, we get
stuck in an infinite loop of the http2.Transport asking for a new
connection, receiving one, and throwing it away.

To fix this, leave a dead connection in the pool for a short while
if it has never had a request sent to it. If a dead connection is
used for a new request, return an error and remove the connection
from the pool.

Change-Id: I64eb15a8f1512a6bda52db423072b945fab6f4b5
Reviewed-on: https://go-review.googlesource.com/c/net/+/625398
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
This commit is contained in:
Damien Neil
2024-11-05 11:26:18 -08:00
parent 0aa844c2c8
commit 858db1a8c8
4 changed files with 219 additions and 20 deletions

View File

@@ -10,6 +10,7 @@ package http2
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
@@ -112,27 +113,40 @@ func newTestClientConnFromClientConn(t *testing.T, cc *ClientConn) *testClientCo
cc: cc,
group: cc.t.transportTestHooks.group.(*synctestGroup),
}
cli, srv := synctestNetPipe(tc.group)
// srv is the side controlled by the test.
var srv *synctestNetConn
if cc.tconn == nil {
// If cc.tconn is nil, we're being called with a new conn created by the
// Transport's client pool. This path skips dialing the server, and we
// create a test connection pair here.
cc.tconn, srv = synctestNetPipe(tc.group)
} else {
// If cc.tconn is non-nil, we're in a test which provides a conn to the
// Transport via a TLSNextProto hook. Extract the test connection pair.
if tc, ok := cc.tconn.(*tls.Conn); ok {
// Unwrap any *tls.Conn to the underlying net.Conn,
// to avoid dealing with encryption in tests.
cc.tconn = tc.NetConn()
}
srv = cc.tconn.(*synctestNetConn).peer
}
srv.SetReadDeadline(tc.group.Now())
srv.autoWait = true
tc.netconn = srv
tc.enc = hpack.NewEncoder(&tc.encbuf)
// all writes and reads are finished.
//
// cli is the ClientConn's side, srv is the side controlled by the test.
cc.tconn = cli
tc.fr = NewFramer(srv, srv)
tc.testConnFramer = testConnFramer{
t: t,
fr: tc.fr,
dec: hpack.NewDecoder(initialHeaderTableSize, nil),
}
tc.fr.SetMaxReadFrameSize(10 << 20)
t.Cleanup(func() {
tc.closeWrite()
})
return tc
}
@@ -503,6 +517,8 @@ func newTestTransport(t *testing.T, opts ...any) *testTransport {
o(tr.t1)
case func(*Transport):
o(tr)
case *Transport:
tr = o
}
}
tt.tr = tr

View File

@@ -28,8 +28,11 @@ func synctestNetPipe(group *synctestGroup) (r, w *synctestNetConn) {
s2addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8001"))
s1 := newSynctestNetConnHalf(s1addr)
s2 := newSynctestNetConnHalf(s2addr)
return &synctestNetConn{group: group, loc: s1, rem: s2},
&synctestNetConn{group: group, loc: s2, rem: s1}
r = &synctestNetConn{group: group, loc: s1, rem: s2}
w = &synctestNetConn{group: group, loc: s2, rem: s1}
r.peer = w
w.peer = r
return r, w
}
// A synctestNetConn is one endpoint of the connection created by synctestNetPipe.
@@ -43,6 +46,9 @@ type synctestNetConn struct {
// When set, group.Wait is automatically called before reads and after writes.
autoWait bool
// peer is the other endpoint.
peer *synctestNetConn
}
// Read reads data from the connection.

View File

@@ -202,6 +202,20 @@ func (t *Transport) markNewGoroutine() {
}
}
// now reports the current time, using the synthetic test clock when
// transport test hooks are installed and the real clock otherwise.
func (t *Transport) now() time.Time {
	if t == nil || t.transportTestHooks == nil {
		return time.Now()
	}
	return t.transportTestHooks.group.Now()
}
// timeSince reports the duration elapsed since when, consulting the
// synthetic test clock when transport test hooks are installed.
func (t *Transport) timeSince(when time.Time) time.Duration {
	if t == nil || t.transportTestHooks == nil {
		return time.Since(when)
	}
	return t.now().Sub(when)
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *Transport) newTimer(d time.Duration) timer {
if t.transportTestHooks != nil {
@@ -343,7 +357,7 @@ type ClientConn struct {
t *Transport
tconn net.Conn // usually *tls.Conn, except specialized impls
tlsState *tls.ConnectionState // nil only for specialized impls
reused uint32 // whether conn is being reused; atomic
atomicReused uint32 // whether conn is being reused; atomic
singleUse bool // whether being used for a single http.Request
getConnCalled bool // used by clientConnPool
@@ -609,7 +623,7 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
return nil, err
}
reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
traceGotConn(req, cc, reused)
res, err := cc.RoundTrip(req)
if err != nil && retry <= 6 {
@@ -634,6 +648,22 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
}
}
}
if err == errClientConnNotEstablished {
// This ClientConn was created recently,
// this is the first request to use it,
// and the connection is closed and not usable.
//
// In this state, cc.idleTimer will remove the conn from the pool
// when it fires. Stop the timer and remove it here so future requests
// won't try to use this connection.
//
// If the timer has already fired and we're racing it, the redundant
// call to MarkDead is harmless.
if cc.idleTimer != nil {
cc.idleTimer.Stop()
}
t.connPool().MarkDead(cc)
}
if err != nil {
t.vlogf("RoundTrip failure: %v", err)
return nil, err
@@ -652,9 +682,10 @@ func (t *Transport) CloseIdleConnections() {
}
var (
errClientConnClosed = errors.New("http2: client conn is closed")
errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
errClientConnClosed = errors.New("http2: client conn is closed")
errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnNotEstablished = errors.New("http2: client conn could not be established")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
// shouldRetryRequest is called by RoundTrip when a request fails to get
@@ -793,6 +824,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
pingTimeout: conf.PingTimeout,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
lastActive: t.now(),
}
var group synctestGroupInterface
if t.transportTestHooks != nil {
@@ -1041,6 +1073,16 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
!cc.doNotReuse &&
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
!cc.tooIdleLocked()
// If this connection has never been used for a request and is closed,
// then let it take a request (which will fail).
//
// This avoids a situation where an error early in a connection's lifetime
// goes unreported.
if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed {
st.canTakeNewRequest = true
}
return
}
@@ -1062,7 +1104,7 @@ func (cc *ClientConn) tooIdleLocked() bool {
// times are compared based on their wall time. We don't want
// to reuse a connection that's been sitting idle during
// VM/laptop suspend if monotonic time was also frozen.
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -1706,7 +1748,12 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
// Must hold cc.mu.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
for {
cc.lastActive = time.Now()
if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
// This is the very first request sent to this connection.
// Return a fatal error which aborts the retry loop.
return errClientConnNotEstablished
}
cc.lastActive = cc.t.now()
if cc.closed || !cc.canTakeNewRequestLocked() {
return errClientConnUnusable
}
@@ -2253,10 +2300,10 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
if len(cc.streams) != slen-1 {
panic("forgetting unknown stream id")
}
cc.lastActive = time.Now()
cc.lastActive = cc.t.now()
if len(cc.streams) == 0 && cc.idleTimer != nil {
cc.idleTimer.Reset(cc.idleTimeout)
cc.lastIdle = time.Now()
cc.lastIdle = cc.t.now()
}
// Wake up writeRequestBody via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request.
@@ -2316,7 +2363,6 @@ func isEOFOrNetReadError(err error) bool {
func (rl *clientConnReadLoop) cleanup() {
cc := rl.cc
cc.t.connPool().MarkDead(cc)
defer cc.closeConn()
defer close(cc.readerDone)
@@ -2340,6 +2386,24 @@ func (rl *clientConnReadLoop) cleanup() {
}
cc.closed = true
// If the connection has never been used, and has been open for only a short time,
// leave it in the connection pool for a little while.
//
// This avoids a situation where new connections are constantly created,
// added to the pool, fail, and are removed from the pool, without any error
// being surfaced to the user.
const unusedWaitTime = 5 * time.Second
idleTime := cc.t.now().Sub(cc.lastActive)
if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime {
cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
cc.t.connPool().MarkDead(cc)
})
} else {
cc.mu.Unlock() // avoid any deadlocks in MarkDead
cc.t.connPool().MarkDead(cc)
cc.mu.Lock()
}
for _, cs := range cc.streams {
select {
case <-cs.peerClosed:
@@ -3332,7 +3396,7 @@ func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
cc.mu.Lock()
ci.WasIdle = len(cc.streams) == 0 && reused
if ci.WasIdle && !cc.lastActive.IsZero() {
ci.IdleTime = time.Since(cc.lastActive)
ci.IdleTime = cc.t.timeSince(cc.lastActive)
}
cc.mu.Unlock()

View File

@@ -5638,3 +5638,116 @@ func TestTransportConnBecomesUnresponsive(t *testing.T) {
rt2.wantStatus(200)
rt2.response().Body.Close()
}
// Test that the Transport can use a conn provided to it by a TLSNextProto hook.
func TestTransportTLSNextProtoConnOK(t *testing.T) {
	h1 := &http.Transport{}
	h2Transport, _ := ConfigureTransports(h1)
	tt := newTestTransport(t, h2Transport)

	// Hand the Transport a new, fake connection through the TLSNextProto hook.
	clientConn, _ := synctestNetPipe(tt.group)
	tlsConn := tls.Client(clientConn, tlsConfigInsecure)
	go func() {
		tt.group.Join()
		h1.TLSNextProto["h2"]("dummy.tld", tlsConn)
	}()
	tt.sync()
	tc := tt.getConn()
	tc.greet()

	// A request sent through the Transport should travel over the conn we provided.
	rt := tt.roundTrip(must(http.NewRequest("GET", "https://dummy.tld/", nil)))
	tc.wantHeaders(wantHeader{
		streamID:  1,
		endStream: true,
		header: http.Header{
			":authority": []string{"dummy.tld"},
			":method":    []string{"GET"},
			":path":      []string{"/"},
		},
	})
	tc.writeHeaders(HeadersFrameParam{
		StreamID:      1,
		EndHeaders:    true,
		EndStream:     true,
		BlockFragment: tc.makeHeaderBlockFragment(":status", "200"),
	})
	rt.wantStatus(200)
	rt.wantBody(nil)
}
// Test the case where a conn provided via a TLSNextProto hook immediately encounters an error.
func TestTransportTLSNextProtoConnImmediateFailureUsed(t *testing.T) {
	h1 := &http.Transport{}
	h2Transport, _ := ConfigureTransports(h1)
	tt := newTestTransport(t, h2Transport)

	// Hand the Transport a new, fake connection through the TLSNextProto hook.
	clientConn, _ := synctestNetPipe(tt.group)
	tlsConn := tls.Client(clientConn, tlsConfigInsecure)
	go func() {
		tt.group.Join()
		h1.TLSNextProto["h2"]("dummy.tld", tlsConn)
	}()
	tt.sync()
	tc := tt.getConn()

	// Break the connection before any request has used it.
	tc.closeWrite()

	// The first request should fail, but with something more informative than
	// ErrNoCachedConn: the broken conn is still in the pool to surface its error.
	req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
	rt := tt.roundTrip(req)
	if err := rt.err(); err == nil || errors.Is(err, ErrNoCachedConn) {
		t.Fatalf("RoundTrip with broken conn: got %v, want an error other than ErrNoCachedConn", err)
	}

	// A second attempt fails with ErrNoCachedConn, because using the dead conn
	// evicted it from the pool.
	rt = tt.roundTrip(req)
	if err := rt.err(); !errors.Is(err, ErrNoCachedConn) {
		t.Fatalf("RoundTrip after broken conn is used: got %v, want ErrNoCachedConn", err)
	}
}
// Test the case where a conn provided via a TLSNextProto hook immediately encounters an error,
// but no requests are sent which would use the bad connection.
func TestTransportTLSNextProtoConnImmediateFailureUnused(t *testing.T) {
	h1 := &http.Transport{}
	h2Transport, _ := ConfigureTransports(h1)
	tt := newTestTransport(t, h2Transport)

	// Hand the Transport a new, fake connection through the TLSNextProto hook.
	clientConn, _ := synctestNetPipe(tt.group)
	tlsConn := tls.Client(clientConn, tlsConfigInsecure)
	go func() {
		tt.group.Join()
		h1.TLSNextProto["h2"]("dummy.tld", tlsConn)
	}()
	tt.sync()
	tc := tt.getConn()

	// Break the connection before any request has used it.
	tc.closeWrite()

	// After enough time passes, the dead conn is dropped from the pool.
	tc.advance(10 * time.Second)

	// With the pool empty again, a request fails with ErrNoCachedConn.
	req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
	rt := tt.roundTrip(req)
	if err := rt.err(); !errors.Is(err, ErrNoCachedConn) {
		t.Fatalf("RoundTrip after broken conn expires: got %v, want ErrNoCachedConn", err)
	}
}