mirror of
https://github.com/golang/net.git
synced 2026-03-31 10:27:08 +09:00
internal/http3: add CloseIdleConnections support in transport
Currently, our HTTP/3 transport creates a QUIC endpoint that is never closed when dialing a connection for the first time. To prevent lingering resource that is never cleaned up until a program exits, we now close the QUIC endpoint when CloseIdleConnections is called and no connections need the endpoint anymore. For golang/go#70914 For golang/go#77440 Change-Id: I0e4cb131c161216dd8e6abacfc4e57e79be63d7c Reviewed-on: https://go-review.googlesource.com/c/net/+/754741 Reviewed-by: Nicholas Husin <husin@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Damien Neil <dneil@google.com>
This commit is contained in:
committed by
Nicholas Husin
parent
7b75446a98
commit
228a67a374
91
internal/http3/main_test.go
Normal file
91
internal/http3/main_test.go
Normal file
@@ -0,0 +1,91 @@
|
||||
// Copyright 2026 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package http3_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
v := m.Run()
|
||||
if v == 0 && goroutineLeaked() {
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(v)
|
||||
}
|
||||
|
||||
func runningBenchmarks() bool {
|
||||
for i, arg := range os.Args {
|
||||
if strings.HasPrefix(arg, "-test.bench=") && !strings.HasSuffix(arg, "=") {
|
||||
return true
|
||||
}
|
||||
if arg == "-test.bench" && i < len(os.Args)-1 && os.Args[i+1] != "" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func interestingGoroutines() (gs []string) {
|
||||
buf := make([]byte, 2<<20)
|
||||
buf = buf[:runtime.Stack(buf, true)]
|
||||
for g := range strings.SplitSeq(string(buf), "\n\n") {
|
||||
_, stack, _ := strings.Cut(g, "\n")
|
||||
stack = strings.TrimSpace(stack)
|
||||
if stack == "" ||
|
||||
strings.Contains(stack, "testing.(*M).before.func1") ||
|
||||
strings.Contains(stack, "os/signal.signal_recv") ||
|
||||
strings.Contains(stack, "created by net.startServer") ||
|
||||
strings.Contains(stack, "created by testing.RunTests") ||
|
||||
strings.Contains(stack, "closeWriteAndWait") ||
|
||||
strings.Contains(stack, "testing.Main(") ||
|
||||
// These only show up with GOTRACEBACK=2; Issue 5005 (comment 28)
|
||||
strings.Contains(stack, "runtime.goexit") ||
|
||||
strings.Contains(stack, "created by runtime.gc") ||
|
||||
strings.Contains(stack, "interestingGoroutines") ||
|
||||
strings.Contains(stack, "runtime.MHeap_Scavenger") {
|
||||
continue
|
||||
}
|
||||
gs = append(gs, stack)
|
||||
}
|
||||
slices.Sort(gs)
|
||||
return
|
||||
}
|
||||
|
||||
// Verify the other tests didn't leave any goroutines running.
|
||||
func goroutineLeaked() bool {
|
||||
if testing.Short() || runningBenchmarks() {
|
||||
// Don't worry about goroutine leaks in -short mode or in
|
||||
// benchmark mode. Too distracting when there are false positives.
|
||||
return false
|
||||
}
|
||||
|
||||
var stackCount map[string]int
|
||||
for range 5 {
|
||||
n := 0
|
||||
stackCount = make(map[string]int)
|
||||
gs := interestingGoroutines()
|
||||
for _, g := range gs {
|
||||
stackCount[g]++
|
||||
n++
|
||||
}
|
||||
if n == 0 {
|
||||
return false
|
||||
}
|
||||
// Wait for goroutines to schedule and die off:
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "Too many goroutines running after net/http test(s).\n")
|
||||
for stack, count := range stackCount {
|
||||
fmt.Fprintf(os.Stderr, "%d instances of:\n%s\n", count, stack)
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -81,20 +81,37 @@ func TestNetHTTPIntegration(t *testing.T) {
|
||||
Timeout: 5 * time.Second,
|
||||
}
|
||||
<-listenAddrSet
|
||||
req, err := http.NewRequest("GET", "https://"+listenAddr, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
for range 5 {
|
||||
req, err := http.NewRequest("GET", "https://"+listenAddr, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !slices.Equal(b, body) {
|
||||
t.Errorf("got %v, want %v", string(b), string(body))
|
||||
}
|
||||
// TestMain checks that there are no leaked goroutines after tests have
|
||||
// finished running.
|
||||
// Over here, we verify that closing the idle connections of a net/http
|
||||
// Transport will result in HTTP/3 transport closing any UDP sockets
|
||||
// after there are no longer any open connections.
|
||||
// We do this in a loop to verify that CloseIdleConnections will not
|
||||
// prevent transport from creating a new connection should a new dial
|
||||
// be started.
|
||||
tr.CloseIdleConnections()
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
// Similarly when a net/http Server shuts down, the HTTP/3 server should
|
||||
// also follow.
|
||||
if err := srv.Shutdown(t.Context()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !slices.Equal(b, body) {
|
||||
t.Errorf("got %v, want %v", string(b), string(body))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,19 +22,17 @@ import (
|
||||
// TODO: Provide a way to register an HTTP/3 transport with a net/http.transport's
|
||||
// connection pool.
|
||||
type transport struct {
|
||||
// endpoint is the QUIC endpoint used by connections created by the transport.
|
||||
// If unset, it is initialized by the first call to Dial.
|
||||
endpoint *quic.Endpoint
|
||||
|
||||
// config is the QUIC configuration used for client connections.
|
||||
// The config may be nil.
|
||||
//
|
||||
// Dial may clone and modify the config.
|
||||
// The config must not be modified after calling Dial.
|
||||
config *quic.Config
|
||||
|
||||
initOnce sync.Once
|
||||
initErr error
|
||||
mu sync.Mutex // Guards fields below.
|
||||
// endpoint is the QUIC endpoint used by connections created by the
|
||||
// transport. If CloseIdleConnections is called when activeConns is empty,
|
||||
// endpoint will be unset. If unset, endpoint will be initialized by any
|
||||
// call to dial.
|
||||
endpoint *quic.Endpoint
|
||||
activeConns map[*clientConn]struct{}
|
||||
inFlightDials int
|
||||
}
|
||||
|
||||
// netHTTPTransport implements the net/http.dialClientConner interface,
|
||||
@@ -60,33 +58,65 @@ func (t netHTTPTransport) DialClientConn(ctx context.Context, addr string, _ *ur
|
||||
// TODO: most likely, add another arg for transport configuration.
|
||||
func RegisterTransport(tr *http.Transport) {
|
||||
tr3 := &transport{
|
||||
config: &quic.Config{
|
||||
TLSConfig: tr.TLSClientConfig.Clone(),
|
||||
},
|
||||
// initConfig will clone the tr.TLSClientConfig.
|
||||
config: initConfig(&quic.Config{
|
||||
TLSConfig: tr.TLSClientConfig,
|
||||
}),
|
||||
activeConns: make(map[*clientConn]struct{}),
|
||||
}
|
||||
tr.RegisterProtocol("http/3", netHTTPTransport{tr3})
|
||||
}
|
||||
|
||||
func (tr *transport) init() error {
|
||||
tr.initOnce.Do(func() {
|
||||
tr.config = initConfig(tr.config)
|
||||
if tr.endpoint == nil {
|
||||
tr.endpoint, tr.initErr = quic.Listen("udp", ":0", nil)
|
||||
}
|
||||
})
|
||||
return tr.initErr
|
||||
func (tr *transport) incInFlightDials() {
|
||||
tr.mu.Lock()
|
||||
defer tr.mu.Unlock()
|
||||
tr.inFlightDials++
|
||||
}
|
||||
|
||||
func (tr *transport) decInFlightDials() {
|
||||
tr.mu.Lock()
|
||||
defer tr.mu.Unlock()
|
||||
tr.inFlightDials--
|
||||
}
|
||||
|
||||
func (tr *transport) initEndpoint() (err error) {
|
||||
tr.mu.Lock()
|
||||
defer tr.mu.Unlock()
|
||||
if tr.endpoint == nil {
|
||||
tr.endpoint, err = quic.Listen("udp", ":0", nil)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// dial creates a new HTTP/3 client connection.
|
||||
func (tr *transport) dial(ctx context.Context, target string) (*clientConn, error) {
|
||||
if err := tr.init(); err != nil {
|
||||
tr.incInFlightDials()
|
||||
defer tr.decInFlightDials()
|
||||
|
||||
if err := tr.initEndpoint(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
qconn, err := tr.endpoint.Dial(ctx, "udp", target, tr.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newClientConn(ctx, qconn)
|
||||
return tr.newClientConn(ctx, qconn)
|
||||
}
|
||||
|
||||
// CloseIdleConnections is called by net/http.Transport.CloseIdleConnections
|
||||
// after all existing idle connections are closed using http3.clientConn.Close.
|
||||
//
|
||||
// When the transport has no active connections anymore, calling this method
|
||||
// will make the transport clean up any shared resources that are no longer
|
||||
// required, such as its QUIC endpoint.
|
||||
func (tr *transport) CloseIdleConnections() {
|
||||
tr.mu.Lock()
|
||||
defer tr.mu.Unlock()
|
||||
if tr.endpoint == nil || len(tr.activeConns) > 0 || tr.inFlightDials > 0 {
|
||||
return
|
||||
}
|
||||
tr.endpoint.Close(canceledCtx)
|
||||
tr.endpoint = nil
|
||||
}
|
||||
|
||||
// A clientConn is a client HTTP/3 connection.
|
||||
@@ -100,33 +130,69 @@ type clientConn struct {
|
||||
dec qpackDecoder
|
||||
}
|
||||
|
||||
func newClientConn(ctx context.Context, qconn *quic.Conn) (*clientConn, error) {
|
||||
func (tr *transport) registerConn(cc *clientConn) {
|
||||
tr.mu.Lock()
|
||||
defer tr.mu.Unlock()
|
||||
tr.activeConns[cc] = struct{}{}
|
||||
}
|
||||
|
||||
func (tr *transport) unregisterConn(cc *clientConn) {
|
||||
tr.mu.Lock()
|
||||
defer tr.mu.Unlock()
|
||||
delete(tr.activeConns, cc)
|
||||
}
|
||||
|
||||
func (tr *transport) newClientConn(ctx context.Context, qconn *quic.Conn) (*clientConn, error) {
|
||||
cc := &clientConn{
|
||||
qconn: qconn,
|
||||
}
|
||||
tr.registerConn(cc)
|
||||
cc.enc.init()
|
||||
|
||||
// Create control stream and send SETTINGS frame.
|
||||
controlStream, err := newConnStream(ctx, cc.qconn, streamTypeControl)
|
||||
if err != nil {
|
||||
tr.unregisterConn(cc)
|
||||
return nil, fmt.Errorf("http3: cannot create control stream: %v", err)
|
||||
}
|
||||
controlStream.writeSettings()
|
||||
controlStream.Flush()
|
||||
|
||||
go cc.acceptStreams(qconn, cc)
|
||||
go func() {
|
||||
cc.acceptStreams(qconn, cc)
|
||||
tr.unregisterConn(cc)
|
||||
}()
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
// close closes the connection.
|
||||
// Any in-flight requests are canceled.
|
||||
// close does not wait for the peer to acknowledge the connection closing.
|
||||
func (cc *clientConn) close() error {
|
||||
// Close the QUIC connection immediately with a status of NO_ERROR.
|
||||
cc.qconn.Abort(nil)
|
||||
// TODO: implement the rest of net/http.ClientConn methods beyond Close.
|
||||
func (cc *clientConn) Close() error {
|
||||
// We need to use Close rather than Abort on the QUIC connection.
|
||||
// Otherwise, when a net/http.Transport.CloseIdleConnections is called, it
|
||||
// might call the http3.transport.CloseIdleConnections prior to all idle
|
||||
// connections being fully closed; this would make it unable to close its
|
||||
// QUIC endpoint, making http3.transport.CloseIdleConnections a no-op
|
||||
// unintentionally.
|
||||
return cc.qconn.Close()
|
||||
}
|
||||
|
||||
// Return any existing error from the peer, but don't wait for it.
|
||||
return cc.qconn.Wait(canceledCtx)
|
||||
func (cc *clientConn) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cc *clientConn) Reserve() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cc *clientConn) Release() {
|
||||
}
|
||||
|
||||
func (cc *clientConn) Available() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (cc *clientConn) InFlight() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (cc *clientConn) handleControlStream(st *stream) error {
|
||||
|
||||
@@ -417,6 +417,7 @@ func newTestClientConn(t testing.TB) *testClientConn {
|
||||
config: &quic.Config{
|
||||
TLSConfig: testTLSConfig,
|
||||
},
|
||||
activeConns: make(map[*clientConn]struct{}),
|
||||
}
|
||||
|
||||
cc, err := tr.dial(t.Context(), e2.LocalAddr().String())
|
||||
@@ -424,7 +425,7 @@ func newTestClientConn(t testing.TB) *testClientConn {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
cc.close()
|
||||
cc.Close()
|
||||
})
|
||||
srvConn, err := e2.Accept(t.Context())
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user