mirror of
https://github.com/golang/net.git
synced 2026-03-31 18:37:08 +09:00
In dialCall.dial, close the done channel to mark dial completion after adding the new connection to the clientConnPool. Fixes a race condition in TestTransportBodyReadError, where the client side of the test could observe the clientConnPool as unexpectedly containing no conns, because the new conn had not yet been added to the pool. Fixes golang/go#44304. Change-Id: I121200f9aa664fae29d0532e7fa2da47de3fe6a8 Reviewed-on: https://go-review.googlesource.com/c/net/+/410934 TryBot-Result: Gopher Robot <gobot@golang.org> Run-TryBot: Damien Neil <dneil@google.com> Reviewed-by: Ian Lance Taylor <iant@google.com>
312 lines
8.4 KiB
Go
312 lines
8.4 KiB
Go
// Copyright 2015 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// Transport code's client connection pooling.
|
|
|
|
package http2
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"net/http"
|
|
"sync"
|
|
)
|
|
|
|
// ClientConnPool manages a pool of HTTP/2 client connections.
|
|
type ClientConnPool interface {
|
|
// GetClientConn returns a specific HTTP/2 connection (usually
|
|
// a TLS-TCP connection) to an HTTP/2 server. On success, the
|
|
// returned ClientConn accounts for the upcoming RoundTrip
|
|
// call, so the caller should not omit it. If the caller needs
|
|
// to, ClientConn.RoundTrip can be called with a bogus
|
|
// new(http.Request) to release the stream reservation.
|
|
GetClientConn(req *http.Request, addr string) (*ClientConn, error)
|
|
MarkDead(*ClientConn)
|
|
}
|
|
|
|
// clientConnPoolIdleCloser is the interface implemented by ClientConnPool
|
|
// implementations which can close their idle connections.
|
|
type clientConnPoolIdleCloser interface {
|
|
ClientConnPool
|
|
closeIdleConnections()
|
|
}
|
|
|
|
var (
|
|
_ clientConnPoolIdleCloser = (*clientConnPool)(nil)
|
|
_ clientConnPoolIdleCloser = noDialClientConnPool{}
|
|
)
|
|
|
|
// TODO: use singleflight for dialing and addConnCalls?
|
|
type clientConnPool struct {
|
|
t *Transport
|
|
|
|
mu sync.Mutex // TODO: maybe switch to RWMutex
|
|
// TODO: add support for sharing conns based on cert names
|
|
// (e.g. share conn for googleapis.com and appspot.com)
|
|
conns map[string][]*ClientConn // key is host:port
|
|
dialing map[string]*dialCall // currently in-flight dials
|
|
keys map[*ClientConn][]string
|
|
addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls
|
|
}
|
|
|
|
func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
|
|
return p.getClientConn(req, addr, dialOnMiss)
|
|
}
|
|
|
|
const (
|
|
dialOnMiss = true
|
|
noDialOnMiss = false
|
|
)
|
|
|
|
func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
|
|
// TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
|
|
if isConnectionCloseRequest(req) && dialOnMiss {
|
|
// It gets its own connection.
|
|
traceGetConn(req, addr)
|
|
const singleUse = true
|
|
cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cc, nil
|
|
}
|
|
for {
|
|
p.mu.Lock()
|
|
for _, cc := range p.conns[addr] {
|
|
if cc.ReserveNewRequest() {
|
|
// When a connection is presented to us by the net/http package,
|
|
// the GetConn hook has already been called.
|
|
// Don't call it a second time here.
|
|
if !cc.getConnCalled {
|
|
traceGetConn(req, addr)
|
|
}
|
|
cc.getConnCalled = false
|
|
p.mu.Unlock()
|
|
return cc, nil
|
|
}
|
|
}
|
|
if !dialOnMiss {
|
|
p.mu.Unlock()
|
|
return nil, ErrNoCachedConn
|
|
}
|
|
traceGetConn(req, addr)
|
|
call := p.getStartDialLocked(req.Context(), addr)
|
|
p.mu.Unlock()
|
|
<-call.done
|
|
if shouldRetryDial(call, req) {
|
|
continue
|
|
}
|
|
cc, err := call.res, call.err
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if cc.ReserveNewRequest() {
|
|
return cc, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// dialCall is an in-flight Transport dial call to a host.
|
|
type dialCall struct {
|
|
_ incomparable
|
|
p *clientConnPool
|
|
// the context associated with the request
|
|
// that created this dialCall
|
|
ctx context.Context
|
|
done chan struct{} // closed when done
|
|
res *ClientConn // valid after done is closed
|
|
err error // valid after done is closed
|
|
}
|
|
|
|
// requires p.mu is held.
|
|
func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
|
|
if call, ok := p.dialing[addr]; ok {
|
|
// A dial is already in-flight. Don't start another.
|
|
return call
|
|
}
|
|
call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
|
|
if p.dialing == nil {
|
|
p.dialing = make(map[string]*dialCall)
|
|
}
|
|
p.dialing[addr] = call
|
|
go call.dial(call.ctx, addr)
|
|
return call
|
|
}
|
|
|
|
// run in its own goroutine.
|
|
func (c *dialCall) dial(ctx context.Context, addr string) {
|
|
const singleUse = false // shared conn
|
|
c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
|
|
|
|
c.p.mu.Lock()
|
|
delete(c.p.dialing, addr)
|
|
if c.err == nil {
|
|
c.p.addConnLocked(addr, c.res)
|
|
}
|
|
c.p.mu.Unlock()
|
|
|
|
close(c.done)
|
|
}
|
|
|
|
// addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
|
|
// already exist. It coalesces concurrent calls with the same key.
|
|
// This is used by the http1 Transport code when it creates a new connection. Because
|
|
// the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
|
|
// the protocol), it can get into a situation where it has multiple TLS connections.
|
|
// This code decides which ones live or die.
|
|
// The return value used is whether c was used.
|
|
// c is never closed.
|
|
func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
|
|
p.mu.Lock()
|
|
for _, cc := range p.conns[key] {
|
|
if cc.CanTakeNewRequest() {
|
|
p.mu.Unlock()
|
|
return false, nil
|
|
}
|
|
}
|
|
call, dup := p.addConnCalls[key]
|
|
if !dup {
|
|
if p.addConnCalls == nil {
|
|
p.addConnCalls = make(map[string]*addConnCall)
|
|
}
|
|
call = &addConnCall{
|
|
p: p,
|
|
done: make(chan struct{}),
|
|
}
|
|
p.addConnCalls[key] = call
|
|
go call.run(t, key, c)
|
|
}
|
|
p.mu.Unlock()
|
|
|
|
<-call.done
|
|
if call.err != nil {
|
|
return false, call.err
|
|
}
|
|
return !dup, nil
|
|
}
|
|
|
|
type addConnCall struct {
|
|
_ incomparable
|
|
p *clientConnPool
|
|
done chan struct{} // closed when done
|
|
err error
|
|
}
|
|
|
|
func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
|
|
cc, err := t.NewClientConn(tc)
|
|
|
|
p := c.p
|
|
p.mu.Lock()
|
|
if err != nil {
|
|
c.err = err
|
|
} else {
|
|
cc.getConnCalled = true // already called by the net/http package
|
|
p.addConnLocked(key, cc)
|
|
}
|
|
delete(p.addConnCalls, key)
|
|
p.mu.Unlock()
|
|
close(c.done)
|
|
}
|
|
|
|
// p.mu must be held
|
|
func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
|
|
for _, v := range p.conns[key] {
|
|
if v == cc {
|
|
return
|
|
}
|
|
}
|
|
if p.conns == nil {
|
|
p.conns = make(map[string][]*ClientConn)
|
|
}
|
|
if p.keys == nil {
|
|
p.keys = make(map[*ClientConn][]string)
|
|
}
|
|
p.conns[key] = append(p.conns[key], cc)
|
|
p.keys[cc] = append(p.keys[cc], key)
|
|
}
|
|
|
|
func (p *clientConnPool) MarkDead(cc *ClientConn) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
for _, key := range p.keys[cc] {
|
|
vv, ok := p.conns[key]
|
|
if !ok {
|
|
continue
|
|
}
|
|
newList := filterOutClientConn(vv, cc)
|
|
if len(newList) > 0 {
|
|
p.conns[key] = newList
|
|
} else {
|
|
delete(p.conns, key)
|
|
}
|
|
}
|
|
delete(p.keys, cc)
|
|
}
|
|
|
|
func (p *clientConnPool) closeIdleConnections() {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
// TODO: don't close a cc if it was just added to the pool
|
|
// milliseconds ago and has never been used. There's currently
|
|
// a small race window with the HTTP/1 Transport's integration
|
|
// where it can add an idle conn just before using it, and
|
|
// somebody else can concurrently call CloseIdleConns and
|
|
// break some caller's RoundTrip.
|
|
for _, vv := range p.conns {
|
|
for _, cc := range vv {
|
|
cc.closeIfIdle()
|
|
}
|
|
}
|
|
}
|
|
|
|
func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
|
|
out := in[:0]
|
|
for _, v := range in {
|
|
if v != exclude {
|
|
out = append(out, v)
|
|
}
|
|
}
|
|
// If we filtered it out, zero out the last item to prevent
|
|
// the GC from seeing it.
|
|
if len(in) != len(out) {
|
|
in[len(in)-1] = nil
|
|
}
|
|
return out
|
|
}
|
|
|
|
// noDialClientConnPool is an implementation of http2.ClientConnPool
|
|
// which never dials. We let the HTTP/1.1 client dial and use its TLS
|
|
// connection instead.
|
|
type noDialClientConnPool struct{ *clientConnPool }
|
|
|
|
func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
|
|
return p.getClientConn(req, addr, noDialOnMiss)
|
|
}
|
|
|
|
// shouldRetryDial reports whether the current request should
|
|
// retry dialing after the call finished unsuccessfully, for example
|
|
// if the dial was canceled because of a context cancellation or
|
|
// deadline expiry.
|
|
func shouldRetryDial(call *dialCall, req *http.Request) bool {
|
|
if call.err == nil {
|
|
// No error, no need to retry
|
|
return false
|
|
}
|
|
if call.ctx == req.Context() {
|
|
// If the call has the same context as the request, the dial
|
|
// should not be retried, since any cancellation will have come
|
|
// from this request.
|
|
return false
|
|
}
|
|
if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
|
|
// If the call error is not because of a context cancellation or a deadline expiry,
|
|
// the dial should not be retried.
|
|
return false
|
|
}
|
|
// Only retry if the error is a context cancellation error or deadline expiry
|
|
// and the context associated with the call was canceled or expired.
|
|
return call.ctx.Err() != nil
|
|
}
|