http2: use testing/synctest

Replace ad-hoc pre-synctest synchronization with testing/synctest.

Many of the http2 package tests use a custom-built fake time
implementation and an idleness-detection mechanism based on parsing
goroutine stack dumps. Experience with this approach to testing
eventually led to the development of the testing/synctest package.
Switch over to testing/synctest.

The synctest package became available as an experiment in Go 1.24
(only when GOEXPERIMENT=synctest is set), and was fully released
with some API changes in Go 1.25.

- Use the released synctest API on Go 1.25.

- Use the experimental API (synctest.Run) on Go 1.24 when
  GOEXPERIMENT=synctest is set. (Note that we set this on trybots.)

- Skip tests on Go 1.24 when GOEXPERIMENT=synctest is not set.

The x/net module requires go1.24, so older versions
can be disregarded.

Change-Id: Ifc13bfdd9bdada6c016730a78bd5972a5193ee30
Reviewed-on: https://go-review.googlesource.com/c/net/+/700996
Reviewed-by: Nicholas Husin <husin@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Damien Neil <dneil@google.com>
Reviewed-by: Nicholas Husin <nsh@golang.org>
This commit is contained in:
Damien Neil
2025-09-02 10:21:42 -07:00
committed by Gopher Robot
parent b800b2045a
commit 7c51e1fbce
16 changed files with 785 additions and 798 deletions

View File

@@ -5,6 +5,8 @@
// Infrastructure for testing ClientConn.RoundTrip.
// Put actual tests in transport_test.go.
//go:build go1.25 || goexperiment.synctest
package http2
import (
@@ -17,6 +19,7 @@ import (
"reflect"
"sync/atomic"
"testing"
"testing/synctest"
"time"
"golang.org/x/net/http2/hpack"
@@ -24,7 +27,8 @@ import (
)
// TestTestClientConn demonstrates usage of testClientConn.
func TestTestClientConn(t *testing.T) {
func TestTestClientConn(t *testing.T) { synctestTest(t, testTestClientConn) }
func testTestClientConn(t testing.TB) {
// newTestClientConn creates a *ClientConn and surrounding test infrastructure.
tc := newTestClientConn(t)
@@ -91,12 +95,11 @@ func TestTestClientConn(t *testing.T) {
// testClientConn manages synchronization, so tests can generally be written as
// a linear sequence of actions and validations without additional synchronization.
type testClientConn struct {
t *testing.T
t testing.TB
tr *Transport
fr *Framer
cc *ClientConn
group *synctestGroup
tr *Transport
fr *Framer
cc *ClientConn
testConnFramer
encbuf bytes.Buffer
@@ -107,12 +110,11 @@ type testClientConn struct {
netconn *synctestNetConn
}
func newTestClientConnFromClientConn(t *testing.T, cc *ClientConn) *testClientConn {
func newTestClientConnFromClientConn(t testing.TB, cc *ClientConn) *testClientConn {
tc := &testClientConn{
t: t,
tr: cc.t,
cc: cc,
group: cc.t.transportTestHooks.group.(*synctestGroup),
t: t,
tr: cc.t,
cc: cc,
}
// srv is the side controlled by the test.
@@ -121,7 +123,7 @@ func newTestClientConnFromClientConn(t *testing.T, cc *ClientConn) *testClientCo
// If cc.tconn is nil, we're being called with a new conn created by the
// Transport's client pool. This path skips dialing the server, and we
// create a test connection pair here.
cc.tconn, srv = synctestNetPipe(tc.group)
cc.tconn, srv = synctestNetPipe()
} else {
// If cc.tconn is non-nil, we're in a test which provides a conn to the
// Transport via a TLSNextProto hook. Extract the test connection pair.
@@ -133,7 +135,7 @@ func newTestClientConnFromClientConn(t *testing.T, cc *ClientConn) *testClientCo
srv = cc.tconn.(*synctestNetConn).peer
}
srv.SetReadDeadline(tc.group.Now())
srv.SetReadDeadline(time.Now())
srv.autoWait = true
tc.netconn = srv
tc.enc = hpack.NewEncoder(&tc.encbuf)
@@ -163,7 +165,7 @@ func (tc *testClientConn) readClientPreface() {
}
}
func newTestClientConn(t *testing.T, opts ...any) *testClientConn {
func newTestClientConn(t testing.TB, opts ...any) *testClientConn {
t.Helper()
tt := newTestTransport(t, opts...)
@@ -176,18 +178,6 @@ func newTestClientConn(t *testing.T, opts ...any) *testClientConn {
return tt.getConn()
}
// sync waits for the ClientConn under test to reach a stable state,
// with all goroutines blocked on some input.
func (tc *testClientConn) sync() {
tc.group.Wait()
}
// advance advances synthetic time by a duration.
func (tc *testClientConn) advance(d time.Duration) {
tc.group.AdvanceTime(d)
tc.sync()
}
// hasFrame reports whether a frame is available to be read.
func (tc *testClientConn) hasFrame() bool {
return len(tc.netconn.Peek()) > 0
@@ -258,17 +248,17 @@ func (b *testRequestBody) Close() error {
// writeBytes adds n arbitrary bytes to the body.
func (b *testRequestBody) writeBytes(n int) {
defer b.tc.sync()
defer synctest.Wait()
b.gate.Lock()
defer b.unlock()
b.bytes += n
b.checkWrite()
b.tc.sync()
synctest.Wait()
}
// Write adds bytes to the body.
func (b *testRequestBody) Write(p []byte) (int, error) {
defer b.tc.sync()
defer synctest.Wait()
b.gate.Lock()
defer b.unlock()
n, err := b.buf.Write(p)
@@ -287,7 +277,7 @@ func (b *testRequestBody) checkWrite() {
// closeWithError sets an error which will be returned by Read.
func (b *testRequestBody) closeWithError(err error) {
defer b.tc.sync()
defer synctest.Wait()
b.gate.Lock()
defer b.unlock()
b.err = err
@@ -304,13 +294,12 @@ func (tc *testClientConn) roundTrip(req *http.Request) *testRoundTrip {
}
tc.roundtrips = append(tc.roundtrips, rt)
go func() {
tc.group.Join()
defer close(rt.donec)
rt.resp, rt.respErr = tc.cc.roundTrip(req, func(cs *clientStream) {
rt.id.Store(cs.ID)
})
}()
tc.sync()
synctest.Wait()
tc.t.Cleanup(func() {
if !rt.done() {
@@ -366,7 +355,7 @@ func (tc *testClientConn) inflowWindow(streamID uint32) int32 {
// testRoundTrip manages a RoundTrip in progress.
type testRoundTrip struct {
t *testing.T
t testing.TB
resp *http.Response
respErr error
donec chan struct{}
@@ -396,6 +385,7 @@ func (rt *testRoundTrip) done() bool {
func (rt *testRoundTrip) result() (*http.Response, error) {
t := rt.t
t.Helper()
synctest.Wait()
select {
case <-rt.donec:
default:
@@ -494,19 +484,16 @@ func diffHeaders(got, want http.Header) string {
// Tests that aren't specifically exercising RoundTrip's retry loop or connection pooling
// should use testClientConn instead.
type testTransport struct {
t *testing.T
tr *Transport
group *synctestGroup
t testing.TB
tr *Transport
ccs []*testClientConn
}
func newTestTransport(t *testing.T, opts ...any) *testTransport {
func newTestTransport(t testing.TB, opts ...any) *testTransport {
tt := &testTransport{
t: t,
group: newSynctest(time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)),
t: t,
}
tt.group.Join()
tr := &Transport{}
for _, o := range opts {
@@ -525,7 +512,6 @@ func newTestTransport(t *testing.T, opts ...any) *testTransport {
tt.tr = tr
tr.transportTestHooks = &transportTestHooks{
group: tt.group,
newclientconn: func(cc *ClientConn) {
tc := newTestClientConnFromClientConn(t, cc)
tt.ccs = append(tt.ccs, tc)
@@ -533,25 +519,15 @@ func newTestTransport(t *testing.T, opts ...any) *testTransport {
}
t.Cleanup(func() {
tt.sync()
synctest.Wait()
if len(tt.ccs) > 0 {
t.Fatalf("%v test ClientConns created, but not examined by test", len(tt.ccs))
}
tt.group.Close(t)
})
return tt
}
func (tt *testTransport) sync() {
tt.group.Wait()
}
func (tt *testTransport) advance(d time.Duration) {
tt.group.AdvanceTime(d)
tt.sync()
}
func (tt *testTransport) hasConn() bool {
return len(tt.ccs) > 0
}
@@ -563,9 +539,9 @@ func (tt *testTransport) getConn() *testClientConn {
}
tc := tt.ccs[0]
tt.ccs = tt.ccs[1:]
tc.sync()
synctest.Wait()
tc.readClientPreface()
tc.sync()
synctest.Wait()
return tc
}
@@ -575,11 +551,10 @@ func (tt *testTransport) roundTrip(req *http.Request) *testRoundTrip {
donec: make(chan struct{}),
}
go func() {
tt.group.Join()
defer close(rt.donec)
rt.resp, rt.respErr = tt.tr.RoundTrip(req)
}()
tt.sync()
synctest.Wait()
tt.t.Cleanup(func() {
if !rt.done() {

View File

@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.24
//go:build go1.25 || goexperiment.synctest
package http2
@@ -12,7 +12,8 @@ import (
"time"
)
func TestConfigServerSettings(t *testing.T) {
func TestConfigServerSettings(t *testing.T) { synctestTest(t, testConfigServerSettings) }
func testConfigServerSettings(t testing.TB) {
config := &http.HTTP2Config{
MaxConcurrentStreams: 1,
MaxDecoderHeaderTableSize: 1<<20 + 2,
@@ -37,7 +38,8 @@ func TestConfigServerSettings(t *testing.T) {
})
}
func TestConfigTransportSettings(t *testing.T) {
func TestConfigTransportSettings(t *testing.T) { synctestTest(t, testConfigTransportSettings) }
func testConfigTransportSettings(t testing.TB) {
config := &http.HTTP2Config{
MaxConcurrentStreams: 1, // ignored by Transport
MaxDecoderHeaderTableSize: 1<<20 + 2,
@@ -60,7 +62,8 @@ func TestConfigTransportSettings(t *testing.T) {
tc.wantWindowUpdate(0, uint32(config.MaxReceiveBufferPerConnection))
}
func TestConfigPingTimeoutServer(t *testing.T) {
func TestConfigPingTimeoutServer(t *testing.T) { synctestTest(t, testConfigPingTimeoutServer) }
func testConfigPingTimeoutServer(t testing.TB) {
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
}, func(s *Server) {
s.ReadIdleTimeout = 2 * time.Second
@@ -68,13 +71,14 @@ func TestConfigPingTimeoutServer(t *testing.T) {
})
st.greet()
st.advance(2 * time.Second)
time.Sleep(2 * time.Second)
_ = readFrame[*PingFrame](t, st)
st.advance(3 * time.Second)
time.Sleep(3 * time.Second)
st.wantClosed()
}
func TestConfigPingTimeoutTransport(t *testing.T) {
func TestConfigPingTimeoutTransport(t *testing.T) { synctestTest(t, testConfigPingTimeoutTransport) }
func testConfigPingTimeoutTransport(t testing.TB) {
tc := newTestClientConn(t, func(tr *Transport) {
tr.ReadIdleTimeout = 2 * time.Second
tr.PingTimeout = 3 * time.Second
@@ -85,9 +89,9 @@ func TestConfigPingTimeoutTransport(t *testing.T) {
rt := tc.roundTrip(req)
tc.wantFrameType(FrameHeaders)
tc.advance(2 * time.Second)
time.Sleep(2 * time.Second)
tc.wantFrameType(FramePing)
tc.advance(3 * time.Second)
time.Sleep(3 * time.Second)
err := rt.err()
if err == nil {
t.Fatalf("expected connection to close")

View File

@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.25 || goexperiment.synctest
package http2
import (

View File

@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.25 || goexperiment.synctest
package http2
import (

View File

@@ -15,7 +15,6 @@ package http2 // import "golang.org/x/net/http2"
import (
"bufio"
"context"
"crypto/tls"
"errors"
"fmt"
@@ -255,15 +254,13 @@ func (cw closeWaiter) Wait() {
// idle memory usage with many connections.
type bufferedWriter struct {
_ incomparable
group synctestGroupInterface // immutable
conn net.Conn // immutable
bw *bufio.Writer // non-nil when data is buffered
byteTimeout time.Duration // immutable, WriteByteTimeout
conn net.Conn // immutable
bw *bufio.Writer // non-nil when data is buffered
byteTimeout time.Duration // immutable, WriteByteTimeout
}
func newBufferedWriter(group synctestGroupInterface, conn net.Conn, timeout time.Duration) *bufferedWriter {
func newBufferedWriter(conn net.Conn, timeout time.Duration) *bufferedWriter {
return &bufferedWriter{
group: group,
conn: conn,
byteTimeout: timeout,
}
@@ -314,24 +311,18 @@ func (w *bufferedWriter) Flush() error {
type bufferedWriterTimeoutWriter bufferedWriter
func (w *bufferedWriterTimeoutWriter) Write(p []byte) (n int, err error) {
return writeWithByteTimeout(w.group, w.conn, w.byteTimeout, p)
return writeWithByteTimeout(w.conn, w.byteTimeout, p)
}
// writeWithByteTimeout writes to conn.
// If more than timeout passes without any bytes being written to the connection,
// the write fails.
func writeWithByteTimeout(group synctestGroupInterface, conn net.Conn, timeout time.Duration, p []byte) (n int, err error) {
func writeWithByteTimeout(conn net.Conn, timeout time.Duration, p []byte) (n int, err error) {
if timeout <= 0 {
return conn.Write(p)
}
for {
var now time.Time
if group == nil {
now = time.Now()
} else {
now = group.Now()
}
conn.SetWriteDeadline(now.Add(timeout))
conn.SetWriteDeadline(time.Now().Add(timeout))
nn, err := conn.Write(p[n:])
n += nn
if n == len(p) || nn == 0 || !errors.Is(err, os.ErrDeadlineExceeded) {
@@ -417,14 +408,3 @@ func (s *sorter) SortStrings(ss []string) {
// makes that struct also non-comparable, and generally doesn't add
// any size (as long as it's first).
type incomparable [0]func()
// synctestGroupInterface is the methods of synctestGroup used by Server and Transport.
// It's defined as an interface here to let us keep synctestGroup entirely test-only
// and not a part of non-test builds.
type synctestGroupInterface interface {
	// Join adds the calling goroutine to the group.
	Join()
	// Now reports the group's current (synthetic, in tests) time.
	Now() time.Time
	// NewTimer and AfterFunc mirror time.NewTimer/time.AfterFunc,
	// returning the package's timer abstraction.
	NewTimer(d time.Duration) timer
	AfterFunc(d time.Duration, f func()) timer
	// ContextWithTimeout mirrors context.WithTimeout using the group's clock.
	ContextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc)
}

View File

@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.25 || goexperiment.synctest
package http2
import (
@@ -68,7 +70,7 @@ func (w twriter) Write(p []byte) (n int, err error) {
}
// like encodeHeader, but don't add implicit pseudo headers.
func encodeHeaderNoImplicit(t *testing.T, headers ...string) []byte {
func encodeHeaderNoImplicit(t testing.TB, headers ...string) []byte {
var buf bytes.Buffer
enc := hpack.NewEncoder(&buf)
for len(headers) > 0 {
@@ -300,3 +302,11 @@ func must[T any](v T, err error) T {
}
return v
}
// synctestSubtest runs f as a named subtest of t, executing it inside
// its own synctest bubble via synctestTest.
func synctestSubtest(t *testing.T, name string, f func(testing.TB)) {
	t.Helper()
	run := func(st *testing.T) {
		synctestTest(st, f)
	}
	t.Run(name, run)
}

View File

@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.25 || goexperiment.synctest
package http2
import (
@@ -14,6 +16,7 @@ import (
"net/netip"
"os"
"sync"
"testing/synctest"
"time"
)
@@ -23,13 +26,13 @@ import (
// Unlike net.Pipe, the connection is not synchronous.
// Writes are made to a buffer, and return immediately.
// By default, the buffer size is unlimited.
func synctestNetPipe(group *synctestGroup) (r, w *synctestNetConn) {
func synctestNetPipe() (r, w *synctestNetConn) {
s1addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8000"))
s2addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8001"))
s1 := newSynctestNetConnHalf(s1addr)
s2 := newSynctestNetConnHalf(s2addr)
r = &synctestNetConn{group: group, loc: s1, rem: s2}
w = &synctestNetConn{group: group, loc: s2, rem: s1}
r = &synctestNetConn{loc: s1, rem: s2}
w = &synctestNetConn{loc: s2, rem: s1}
r.peer = w
w.peer = r
return r, w
@@ -37,8 +40,6 @@ func synctestNetPipe(group *synctestGroup) (r, w *synctestNetConn) {
// A synctestNetConn is one endpoint of the connection created by synctestNetPipe.
type synctestNetConn struct {
group *synctestGroup
// local and remote connection halves.
// Each half contains a buffer.
// Reads pull from the local buffer, and writes push to the remote buffer.
@@ -54,7 +55,7 @@ type synctestNetConn struct {
// Read reads data from the connection.
func (c *synctestNetConn) Read(b []byte) (n int, err error) {
if c.autoWait {
c.group.Wait()
synctest.Wait()
}
return c.loc.read(b)
}
@@ -63,7 +64,7 @@ func (c *synctestNetConn) Read(b []byte) (n int, err error) {
// without consuming its contents.
func (c *synctestNetConn) Peek() []byte {
if c.autoWait {
c.group.Wait()
synctest.Wait()
}
return c.loc.peek()
}
@@ -71,7 +72,7 @@ func (c *synctestNetConn) Peek() []byte {
// Write writes data to the connection.
func (c *synctestNetConn) Write(b []byte) (n int, err error) {
if c.autoWait {
defer c.group.Wait()
defer synctest.Wait()
}
return c.rem.write(b)
}
@@ -79,7 +80,7 @@ func (c *synctestNetConn) Write(b []byte) (n int, err error) {
// IsClosedByPeer reports whether the peer has closed its end of the connection.
func (c *synctestNetConn) IsClosedByPeer() bool {
if c.autoWait {
c.group.Wait()
synctest.Wait()
}
return c.loc.isClosedByPeer()
}
@@ -89,7 +90,7 @@ func (c *synctestNetConn) Close() error {
c.loc.setWriteError(errors.New("connection closed by peer"))
c.rem.setReadError(io.EOF)
if c.autoWait {
c.group.Wait()
synctest.Wait()
}
return nil
}
@@ -113,13 +114,13 @@ func (c *synctestNetConn) SetDeadline(t time.Time) error {
// SetReadDeadline sets the read deadline for the connection.
func (c *synctestNetConn) SetReadDeadline(t time.Time) error {
c.loc.rctx.setDeadline(c.group, t)
c.loc.rctx.setDeadline(t)
return nil
}
// SetWriteDeadline sets the write deadline for the connection.
func (c *synctestNetConn) SetWriteDeadline(t time.Time) error {
c.rem.wctx.setDeadline(c.group, t)
c.rem.wctx.setDeadline(t)
return nil
}
@@ -305,7 +306,7 @@ type deadlineContext struct {
mu sync.Mutex
ctx context.Context
cancel context.CancelCauseFunc
timer timer
timer *time.Timer
}
// context returns a Context which expires when the deadline does.
@@ -319,7 +320,7 @@ func (t *deadlineContext) context() context.Context {
}
// setDeadline sets the current deadline.
func (t *deadlineContext) setDeadline(group *synctestGroup, deadline time.Time) {
func (t *deadlineContext) setDeadline(deadline time.Time) {
t.mu.Lock()
defer t.mu.Unlock()
// If t.ctx is non-nil and t.cancel is nil, then t.ctx was canceled
@@ -335,7 +336,7 @@ func (t *deadlineContext) setDeadline(group *synctestGroup, deadline time.Time)
// No deadline.
return
}
if !deadline.After(group.Now()) {
if !deadline.After(time.Now()) {
// Deadline has already expired.
t.cancel(os.ErrDeadlineExceeded)
t.cancel = nil
@@ -343,11 +344,11 @@ func (t *deadlineContext) setDeadline(group *synctestGroup, deadline time.Time)
}
if t.timer != nil {
// Reuse existing deadline timer.
t.timer.Reset(deadline.Sub(group.Now()))
t.timer.Reset(deadline.Sub(time.Now()))
return
}
// Create a new timer to cancel the context at the deadline.
t.timer = group.AfterFunc(deadline.Sub(group.Now()), func() {
t.timer = time.AfterFunc(deadline.Sub(time.Now()), func() {
t.mu.Lock()
defer t.mu.Unlock()
t.cancel(os.ErrDeadlineExceeded)

View File

@@ -176,39 +176,6 @@ type Server struct {
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *serverInternalState
// Synchronization group used for testing.
// Outside of tests, this is nil.
group synctestGroupInterface
}
func (s *Server) markNewGoroutine() {
if s.group != nil {
s.group.Join()
}
}
func (s *Server) now() time.Time {
if s.group != nil {
return s.group.Now()
}
return time.Now()
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (s *Server) newTimer(d time.Duration) timer {
if s.group != nil {
return s.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (s *Server) afterFunc(d time.Duration, f func()) timer {
if s.group != nil {
return s.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
}
type serverInternalState struct {
@@ -438,7 +405,7 @@ func (s *Server) serveConn(c net.Conn, opts *ServeConnOpts, newf func(*serverCon
conn: c,
baseCtx: baseCtx,
remoteAddrStr: c.RemoteAddr().String(),
bw: newBufferedWriter(s.group, c, conf.WriteByteTimeout),
bw: newBufferedWriter(c, conf.WriteByteTimeout),
handler: opts.handler(),
streams: make(map[uint32]*stream),
readFrameCh: make(chan readFrameResult),
@@ -638,11 +605,11 @@ type serverConn struct {
pingSent bool
sentPingData [8]byte
goAwayCode ErrCode
shutdownTimer timer // nil until used
idleTimer timer // nil if unused
shutdownTimer *time.Timer // nil until used
idleTimer *time.Timer // nil if unused
readIdleTimeout time.Duration
pingTimeout time.Duration
readIdleTimer timer // nil if unused
readIdleTimer *time.Timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -687,12 +654,12 @@ type stream struct {
flow outflow // limits writing from Handler to client
inflow inflow // what the client is allowed to POST/etc to us
state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline timer // nil if unused
writeDeadline timer // nil if unused
closeErr error // set before cw is closed
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline *time.Timer // nil if unused
writeDeadline *time.Timer // nil if unused
closeErr error // set before cw is closed
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
@@ -848,7 +815,6 @@ type readFrameResult struct {
// consumer is done with the frame.
// It's run on its own goroutine.
func (sc *serverConn) readFrames() {
sc.srv.markNewGoroutine()
gate := make(chan struct{})
gateDone := func() { gate <- struct{}{} }
for {
@@ -881,7 +847,6 @@ type frameWriteResult struct {
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
sc.srv.markNewGoroutine()
var err error
if wd == nil {
err = wr.write.writeFrame(sc)
@@ -965,22 +930,22 @@ func (sc *serverConn) serve(conf http2Config) {
sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout > 0 {
sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop()
}
if conf.SendPingTimeout > 0 {
sc.readIdleTimeout = conf.SendPingTimeout
sc.readIdleTimer = sc.srv.afterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
sc.readIdleTimer = time.AfterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
defer sc.readIdleTimer.Stop()
}
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := sc.srv.afterFunc(firstSettingsTimeout, sc.onSettingsTimer)
settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
lastFrameTime := sc.srv.now()
lastFrameTime := time.Now()
loopNum := 0
for {
loopNum++
@@ -994,7 +959,7 @@ func (sc *serverConn) serve(conf http2Config) {
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
lastFrameTime = sc.srv.now()
lastFrameTime = time.Now()
// Process any written frames before reading new frames from the client since a
// written frame could have triggered a new stream to be started.
if sc.writingFrameAsync {
@@ -1077,7 +1042,7 @@ func (sc *serverConn) handlePingTimer(lastFrameReadTime time.Time) {
}
pingAt := lastFrameReadTime.Add(sc.readIdleTimeout)
now := sc.srv.now()
now := time.Now()
if pingAt.After(now) {
// We received frames since arming the ping timer.
// Reset it for the next possible timeout.
@@ -1141,10 +1106,10 @@ func (sc *serverConn) readPreface() error {
errc <- nil
}
}()
timer := sc.srv.newTimer(prefaceTimeout) // TODO: configurable on *Server?
timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
case <-timer.C():
case <-timer.C:
return errPrefaceTimeout
case err := <-errc:
if err == nil {
@@ -1160,6 +1125,21 @@ var errChanPool = sync.Pool{
New: func() interface{} { return make(chan error, 1) },
}
// getErrChan returns a buffered error channel for handing off a single
// write result. Outside of tests the channel comes from errChanPool to
// reduce allocations; in tests a fresh channel is always allocated,
// because channels cannot be reused across synctest test bubbles.
func getErrChan() chan error {
	if inTests {
		// Channels cannot be reused across synctest tests.
		return make(chan error, 1)
	}
	return errChanPool.Get().(chan error)
}
// putErrChan returns ch to errChanPool for reuse.
// In tests the channel is dropped instead: synctest bubbles do not
// allow channels to be shared across tests.
func putErrChan(ch chan error) {
	if inTests {
		return
	}
	errChanPool.Put(ch)
}
var writeDataPool = sync.Pool{
New: func() interface{} { return new(writeData) },
}
@@ -1167,7 +1147,7 @@ var writeDataPool = sync.Pool{
// writeDataFromHandler writes DATA response frames from a handler on
// the given stream.
func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
ch := errChanPool.Get().(chan error)
ch := getErrChan()
writeArg := writeDataPool.Get().(*writeData)
*writeArg = writeData{stream.id, data, endStream}
err := sc.writeFrameFromHandler(FrameWriteRequest{
@@ -1199,7 +1179,7 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea
return errStreamClosed
}
}
errChanPool.Put(ch)
putErrChan(ch)
if frameWriteDone {
writeDataPool.Put(writeArg)
}
@@ -1513,7 +1493,7 @@ func (sc *serverConn) goAway(code ErrCode) {
func (sc *serverConn) shutDownIn(d time.Duration) {
sc.serveG.check()
sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer)
sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
}
func (sc *serverConn) resetStream(se StreamError) {
@@ -2118,7 +2098,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout > 0 {
sc.conn.SetReadDeadline(time.Time{})
st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
}
return sc.scheduleHandler(id, rw, req, handler)
@@ -2216,7 +2196,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.init(sc.initialStreamRecvWindowSize)
if sc.hs.WriteTimeout > 0 {
st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
sc.streams[id] = st
@@ -2405,7 +2385,6 @@ func (sc *serverConn) handlerDone() {
// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
sc.srv.markNewGoroutine()
defer sc.sendServeMsg(handlerDoneMsg)
didPanic := true
defer func() {
@@ -2454,7 +2433,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
errc = errChanPool.Get().(chan error)
errc = getErrChan()
}
if err := sc.writeFrameFromHandler(FrameWriteRequest{
write: headerData,
@@ -2466,7 +2445,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
if errc != nil {
select {
case err := <-errc:
errChanPool.Put(errc)
putErrChan(errc)
return err
case <-sc.doneServing:
return errClientDisconnected
@@ -2702,7 +2681,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
var date string
if _, ok := rws.snapHeader["Date"]; !ok {
// TODO(bradfitz): be faster here, like net/http? measure.
date = rws.conn.srv.now().UTC().Format(http.TimeFormat)
date = time.Now().UTC().Format(http.TimeFormat)
}
for _, v := range rws.snapHeader["Trailer"] {
@@ -2824,7 +2803,7 @@ func (rws *responseWriterState) promoteUndeclaredTrailers() {
func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
st := w.rws.stream
if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
if !deadline.IsZero() && deadline.Before(time.Now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onReadTimeout()
@@ -2840,9 +2819,9 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.readDeadline = nil
} else if st.readDeadline == nil {
st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout)
st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
} else {
st.readDeadline.Reset(deadline.Sub(sc.srv.now()))
st.readDeadline.Reset(deadline.Sub(time.Now()))
}
})
return nil
@@ -2850,7 +2829,7 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
st := w.rws.stream
if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
if !deadline.IsZero() && deadline.Before(time.Now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onWriteTimeout()
@@ -2866,9 +2845,9 @@ func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.writeDeadline = nil
} else if st.writeDeadline == nil {
st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout)
st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
} else {
st.writeDeadline.Reset(deadline.Sub(sc.srv.now()))
st.writeDeadline.Reset(deadline.Sub(time.Now()))
}
})
return nil
@@ -3147,7 +3126,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
method: opts.Method,
url: u,
header: cloneHeader(opts.Header),
done: errChanPool.Get().(chan error),
done: getErrChan(),
}
select {
@@ -3164,7 +3143,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
case <-st.cw:
return errStreamClosed
case err := <-msg.done:
errChanPool.Put(msg.done)
putErrChan(msg.done)
return err
}
}

View File

@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.25 || goexperiment.synctest
package http2
import (
@@ -17,7 +19,8 @@ import (
"time"
)
func TestServer_Push_Success(t *testing.T) {
func TestServer_Push_Success(t *testing.T) { synctestTest(t, testServer_Push_Success) }
func testServer_Push_Success(t testing.TB) {
const (
mainBody = "<html>index page</html>"
pushedBody = "<html>pushed page</html>"
@@ -242,7 +245,8 @@ func TestServer_Push_Success(t *testing.T) {
}
}
func TestServer_Push_SuccessNoRace(t *testing.T) {
func TestServer_Push_SuccessNoRace(t *testing.T) { synctestTest(t, testServer_Push_SuccessNoRace) }
func testServer_Push_SuccessNoRace(t testing.TB) {
// Regression test for issue #18326. Ensure the request handler can mutate
// pushed request headers without racing with the PUSH_PROMISE write.
errc := make(chan error, 2)
@@ -287,6 +291,9 @@ func TestServer_Push_SuccessNoRace(t *testing.T) {
}
func TestServer_Push_RejectRecursivePush(t *testing.T) {
synctestTest(t, testServer_Push_RejectRecursivePush)
}
func testServer_Push_RejectRecursivePush(t testing.TB) {
// Expect two requests, but might get three if there's a bug and the second push succeeds.
errc := make(chan error, 3)
handler := func(w http.ResponseWriter, r *http.Request) error {
@@ -323,6 +330,11 @@ func TestServer_Push_RejectRecursivePush(t *testing.T) {
}
func testServer_Push_RejectSingleRequest(t *testing.T, doPush func(http.Pusher, *http.Request) error, settings ...Setting) {
synctestTest(t, func(t testing.TB) {
testServer_Push_RejectSingleRequest_Bubble(t, doPush, settings...)
})
}
func testServer_Push_RejectSingleRequest_Bubble(t testing.TB, doPush func(http.Pusher, *http.Request) error, settings ...Setting) {
// Expect one request, but might get two if there's a bug and the push succeeds.
errc := make(chan error, 2)
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
@@ -426,6 +438,9 @@ func TestServer_Push_RejectForbiddenHeader(t *testing.T) {
}
func TestServer_Push_StateTransitions(t *testing.T) {
synctestTest(t, testServer_Push_StateTransitions)
}
func testServer_Push_StateTransitions(t testing.TB) {
const body = "foo"
gotPromise := make(chan bool)
@@ -479,6 +494,9 @@ func TestServer_Push_StateTransitions(t *testing.T) {
}
func TestServer_Push_RejectAfterGoAway(t *testing.T) {
synctestTest(t, testServer_Push_RejectAfterGoAway)
}
func testServer_Push_RejectAfterGoAway(t testing.TB) {
var readyOnce sync.Once
ready := make(chan struct{})
errc := make(chan error, 2)
@@ -518,7 +536,8 @@ func TestServer_Push_RejectAfterGoAway(t *testing.T) {
}
}
func TestServer_Push_Underflow(t *testing.T) {
func TestServer_Push_Underflow(t *testing.T) { synctestTest(t, testServer_Push_Underflow) }
func testServer_Push_Underflow(t testing.TB) {
// Test for #63511: Send several requests which generate PUSH_PROMISE responses,
// verify they all complete successfully.
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {

File diff suppressed because it is too large Load Diff

View File

@@ -1,329 +0,0 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
)
// A synctestGroup synchronizes between a set of cooperating goroutines.
// It provides a synthetic clock (now, timers) and tracks group membership
// by goroutine ID so that Wait can detect when every member is idle.
type synctestGroup struct {
	mu     sync.Mutex
	gids   map[int]bool // IDs of goroutines that have joined the group
	now    time.Time    // current synthetic time; guarded by mu
	timers map[*fakeTimer]struct{} // pending synthetic timers; guarded by mu
}

// goroutine describes one goroutine parsed from a runtime.Stack dump.
type goroutine struct {
	id      int    // goroutine ID from the dump header
	parent  int    // ID of the creating goroutine ("created by ... in goroutine N"), or 0
	state   string // scheduler wait state, e.g. "chan receive"
	syscall bool   // true if the stack contains a frame in package syscall
}
// newSynctest creates a new group with the synthetic clock set to the
// provided time. The calling goroutine is the group's first member.
func newSynctest(now time.Time) *synctestGroup {
	g := &synctestGroup{now: now}
	g.gids = map[int]bool{currentGoroutine(): true}
	return g
}
// Join adds the current goroutine to the group.
func (g *synctestGroup) Join() {
	id := currentGoroutine()
	g.mu.Lock()
	g.gids[id] = true
	g.mu.Unlock()
}
// Count returns the number of goroutines in the group: joined goroutines
// plus their direct children, as observed in a full stack dump.
func (g *synctestGroup) Count() int {
	var count int
	for _, gr := range stacks(true) {
		if g.gids[gr.id] || g.gids[gr.parent] {
			count++
		}
	}
	return count
}
// Close calls t.Fatal if the group contains any running goroutines
// other than the caller, dumping all stacks to the test log first.
func (g *synctestGroup) Close(t testing.TB) {
	count := g.Count()
	if count == 1 {
		return
	}
	buf := make([]byte, 16*1024)
	n := runtime.Stack(buf, true)
	t.Logf("stacks:\n%s", buf[:n])
	t.Fatalf("%v goroutines still running after test completed, expect 1", count)
}
// Wait blocks until every goroutine in the group and their direct children
// are idle. It busy-waits: it repeatedly yields the scheduler and
// re-inspects goroutine stacks (see idle) until no member is runnable.
func (g *synctestGroup) Wait() {
	for i := 0; ; i++ {
		if g.idle() {
			return
		}
		runtime.Gosched()
		if runtime.GOOS == "js" {
			// When GOOS=js, we appear to need to time.Sleep to make progress
			// on some syscalls. In particular, without this sleep
			// writing to stdout (including via t.Log) can block forever.
			for range 10 {
				time.Sleep(1)
			}
		}
	}
}
// idle reports whether every group member (and direct child of a member)
// is blocked in one of a known set of idle wait states. A goroutine in a
// syscall, or in any state not listed below (e.g. "running"), counts as busy.
func (g *synctestGroup) idle() bool {
	gs := stacks(true)
	g.mu.Lock()
	defer g.mu.Unlock()
	// gs[0] is the current goroutine (the one calling idle via Wait);
	// runtime.Stack reports the caller first, so skip it.
	for _, gr := range gs[1:] {
		if !g.gids[gr.id] && !g.gids[gr.parent] {
			continue // not a group member or a direct child of one
		}
		if gr.syscall {
			return false
		}
		// From runtime/runtime2.go.
		switch gr.state {
		case "IO wait":
		case "chan receive (nil chan)":
		case "chan send (nil chan)":
		case "select":
		case "select (no cases)":
		case "chan receive":
		case "chan send":
		case "sync.Cond.Wait":
		default:
			return false
		}
	}
	return true
}
// currentGoroutine returns the goroutine ID of the caller.
func currentGoroutine() int {
	return stacks(false)[0].id
}
// stacks captures a runtime.Stack dump and parses it into a []goroutine.
// If all is true the dump covers every goroutine; otherwise only the
// caller's, which is always first in the result. It panics on any dump
// it cannot parse (numbered messages identify the failing step).
func stacks(all bool) []goroutine {
	// Grow the buffer until the entire dump fits.
	buf := make([]byte, 16*1024)
	for {
		n := runtime.Stack(buf, all)
		if n < len(buf) {
			buf = buf[:n]
			break
		}
		buf = make([]byte, len(buf)*2)
	}
	// Goroutine dumps are separated by blank lines; each starts with a
	// header of the form "goroutine <id> [<state>...]:".
	var goroutines []goroutine
	for _, gs := range strings.Split(string(buf), "\n\n") {
		skip, rest, ok := strings.Cut(gs, "goroutine ")
		if skip != "" || !ok {
			panic(fmt.Errorf("1 unparsable goroutine stack:\n%s", gs))
		}
		ids, rest, ok := strings.Cut(rest, " [")
		if !ok {
			panic(fmt.Errorf("2 unparsable goroutine stack:\n%s", gs))
		}
		id, err := strconv.Atoi(ids)
		if err != nil {
			panic(fmt.Errorf("3 unparsable goroutine stack:\n%s", gs))
		}
		state, rest, ok := strings.Cut(rest, "]")
		// A frame in package syscall marks the goroutine as in a syscall.
		isSyscall := false
		if strings.Contains(rest, "\nsyscall.") {
			isSyscall = true
		}
		// Parse the creating goroutine's ID from the trailing
		// "created by <fn> in goroutine <id>" line, when present.
		var parent int
		_, rest, ok = strings.Cut(rest, "\ncreated by ")
		if ok && strings.Contains(rest, " in goroutine ") {
			_, rest, ok := strings.Cut(rest, " in goroutine ")
			if !ok {
				panic(fmt.Errorf("4 unparsable goroutine stack:\n%s", gs))
			}
			parents, rest, ok := strings.Cut(rest, "\n")
			if !ok {
				panic(fmt.Errorf("5 unparsable goroutine stack:\n%s", gs))
			}
			parent, err = strconv.Atoi(parents)
			if err != nil {
				panic(fmt.Errorf("6 unparsable goroutine stack:\n%s", gs))
			}
		}
		goroutines = append(goroutines, goroutine{
			id:      id,
			parent:  parent,
			state:   state,
			syscall: isSyscall,
		})
	}
	return goroutines
}
// AdvanceTime advances the synthetic clock by d, firing (and removing)
// every timer that becomes due, then waits for the group to go idle again.
func (g *synctestGroup) AdvanceTime(d time.Duration) {
	defer g.Wait() // runs after mu is released below
	g.mu.Lock()
	defer g.mu.Unlock()
	g.now = g.now.Add(d)
	for tm := range g.timers {
		if tm.when.After(g.now) {
			continue // not due yet
		}
		tm.run()
		delete(g.timers, tm)
	}
}
// Now returns the current synthetic time.
func (g *synctestGroup) Now() time.Time {
	g.mu.Lock()
	now := g.now
	g.mu.Unlock()
	return now
}
// TimeUntilEvent returns the amount of time until the next scheduled timer,
// and whether any timer is scheduled at all.
func (g *synctestGroup) TimeUntilEvent() (d time.Duration, scheduled bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for tm := range g.timers {
		until := tm.when.Sub(g.now)
		if !scheduled || until < d {
			d = until
			scheduled = true
		}
	}
	return d, scheduled
}
// Sleep is time.Sleep, but using synthetic time: it blocks until the
// group's clock has been advanced by at least d.
func (g *synctestGroup) Sleep(d time.Duration) {
	<-g.NewTimer(d).C()
}
// NewTimer is time.NewTimer, but using synthetic time. When the timer
// fires it delivers the current synthetic time on the returned channel.
func (g *synctestGroup) NewTimer(d time.Duration) Timer {
	return g.addTimer(d, &fakeTimer{
		ch: make(chan time.Time), // unbuffered, like time.Timer.C in this fake
	})
}
// AfterFunc is time.AfterFunc, but using synthetic time. When the timer
// fires, f runs on a new goroutine that joins the group (see fakeTimer.run).
func (g *synctestGroup) AfterFunc(d time.Duration, f func()) Timer {
	return g.addTimer(d, &fakeTimer{
		f: f,
	})
}
// ContextWithTimeout is context.WithTimeout, but using synthetic time:
// the context is cancelled when the group's clock advances past d.
// The returned CancelFunc also stops the underlying timer.
func (g *synctestGroup) ContextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(ctx)
	tm := g.AfterFunc(d, cancel)
	stop := func() {
		tm.Stop()
		cancel()
	}
	return ctx, stop
}
// addTimer registers tm to fire at now+d and returns it.
// A timer whose deadline is not strictly in the future fires immediately,
// while g.mu is still held (see fakeTimer.run for the implications).
func (g *synctestGroup) addTimer(d time.Duration, tm *fakeTimer) *fakeTimer {
	g.mu.Lock()
	defer g.mu.Unlock()
	tm.g = g
	tm.when = g.now.Add(d)
	if g.timers == nil {
		// Lazily allocate the timer set on first use.
		g.timers = make(map[*fakeTimer]struct{})
	}
	if tm.when.After(g.now) {
		g.timers[tm] = struct{}{}
	} else {
		tm.run()
	}
	return tm
}
// Timer is an alias for the time.Timer-like interface used in tests;
// fakeTimer implements it over the group's synthetic clock.
type Timer = interface {
	C() <-chan time.Time
	Reset(d time.Duration) bool
	Stop() bool
}
// fakeTimer is a timer on a synctestGroup's synthetic clock.
// Exactly one of ch and f is set: ch-based timers (NewTimer) deliver the
// time on ch, f-based timers (AfterFunc) invoke f when they fire.
type fakeTimer struct {
	g    *synctestGroup
	when time.Time // synthetic time at which the timer fires
	ch   chan time.Time
	f    func()
}
// run fires the timer: it either delivers the current synthetic time on ch
// (blocking until a receiver is ready) or invokes f on a new goroutine
// that joins the group so Wait can track it.
func (tm *fakeTimer) run() {
	if tm.ch != nil {
		tm.ch <- tm.g.now
	} else {
		go func() {
			tm.g.Join()
			tm.f()
		}()
	}
}
// C returns the delivery channel for a NewTimer-created timer; it is nil
// for AfterFunc timers.
func (tm *fakeTimer) C() <-chan time.Time { return tm.ch }
// Reset reschedules the timer to fire after d, reporting whether the timer
// was still pending. A non-positive d fires the timer immediately, while
// the group mutex is held (mirrors addTimer's immediate-fire path).
func (tm *fakeTimer) Reset(d time.Duration) bool {
	tm.g.mu.Lock()
	defer tm.g.mu.Unlock()
	_, stopped := tm.g.timers[tm]
	if d <= 0 {
		delete(tm.g.timers, tm)
		tm.run()
	} else {
		tm.when = tm.g.now.Add(d)
		tm.g.timers[tm] = struct{}{}
	}
	return stopped
}
// Stop removes the timer from the pending set, reporting whether it was
// still pending (i.e. had not yet fired or been stopped).
func (tm *fakeTimer) Stop() bool {
	g := tm.g
	g.mu.Lock()
	defer g.mu.Unlock()
	_, pending := g.timers[tm]
	delete(g.timers, tm)
	return pending
}
// TestSynctestLogs verifies that t.Log works,
// in particular that the GOOS=js workaround in synctestGroup.Wait is working.
// (When GOOS=js, writing to stdout can hang indefinitely if some goroutine loops
// calling runtime.Gosched; see Wait for the workaround.)
func TestSynctestLogs(t *testing.T) {
	g := newSynctest(time.Now())
	donec := make(chan struct{})
	go func() {
		g.Join()
		for range 100 {
			t.Logf("logging a long line")
		}
		close(donec)
	}()
	// Wait returns only once the logging goroutine is idle or finished;
	// if the workaround failed, Wait would spin while the logger hangs.
	g.Wait()
	select {
	case <-donec:
	default:
		// The goroutine should have completed before Wait returned.
		panic("done")
	}
}

View File

@@ -0,0 +1,42 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.25 && goexperiment.synctest
package http2
import (
"slices"
"testing"
"testing/synctest"
)
// synctestTest emulates the Go 1.25 synctest.Test function on Go 1.24.
//
// It runs f inside a synctest.Run bubble, wrapping t in a cleanupT so that
// Cleanup functions registered by f execute inside the bubble before it
// exits (T.Cleanup would defer them until after the bubble is gone).
func synctestTest(t *testing.T, f func(t testing.TB)) {
	t.Helper()
	synctest.Run(func() {
		t.Helper()
		ct := &cleanupT{T: t}
		defer ct.done()
		f(ct)
	})
}
// cleanupT wraps a testing.T and adds its own Cleanup method.
// Used to execute cleanup functions within a synctest bubble.
type cleanupT struct {
	*testing.T
	cleanups []func() // registered in order; run in reverse by done
}
// Cleanup replaces T.Cleanup, recording f to be run by done (inside the
// synctest bubble) rather than deferring it to the testing package.
func (t *cleanupT) Cleanup(f func()) {
	t.cleanups = append(t.cleanups, f)
}
// done runs the registered cleanup functions in last-in, first-out order,
// matching T.Cleanup semantics.
func (t *cleanupT) done() {
	for i := len(t.cleanups) - 1; i >= 0; i-- {
		t.cleanups[i]()
	}
}

View File

@@ -0,0 +1,20 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.25
package http2
import (
"testing"
"testing/synctest"
)
// synctestTest runs f inside a testing/synctest bubble.
// On Go 1.25 this is a thin wrapper around synctest.Test; the f parameter
// takes a testing.TB so tests are shared with the Go 1.24 emulation.
func synctestTest(t *testing.T, f func(t testing.TB)) {
	t.Helper()
	synctest.Test(t, func(t *testing.T) {
		t.Helper()
		f(t)
	})
}

View File

@@ -1,20 +0,0 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import "time"
// A timer is a time.Timer, as an interface which can be replaced in tests.
// Note the interface exposes C as a method, unlike time.Timer's C field;
// timeTimer adapts the real type.
type timer = interface {
	C() <-chan time.Time
	Reset(d time.Duration) bool
	Stop() bool
}
// timeTimer adapts a time.Timer to the timer interface.
// Reset and Stop are promoted from the embedded *time.Timer.
type timeTimer struct {
	*time.Timer
}

// C exposes the embedded timer's channel as a method.
func (t timeTimer) C() <-chan time.Time { return t.Timer.C }

View File

@@ -193,50 +193,6 @@ type Transport struct {
type transportTestHooks struct {
newclientconn func(*ClientConn)
group synctestGroupInterface
}
func (t *Transport) markNewGoroutine() {
if t != nil && t.transportTestHooks != nil {
t.transportTestHooks.group.Join()
}
}
func (t *Transport) now() time.Time {
if t != nil && t.transportTestHooks != nil {
return t.transportTestHooks.group.Now()
}
return time.Now()
}
func (t *Transport) timeSince(when time.Time) time.Duration {
if t != nil && t.transportTestHooks != nil {
return t.now().Sub(when)
}
return time.Since(when)
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *Transport) newTimer(d time.Duration) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (t *Transport) afterFunc(d time.Duration, f func()) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
}
func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.ContextWithTimeout(ctx, d)
}
return context.WithTimeout(ctx, d)
}
func (t *Transport) maxHeaderListSize() uint32 {
@@ -366,7 +322,7 @@ type ClientConn struct {
readerErr error // set before readerDone is closed
idleTimeout time.Duration // or 0 for never
idleTimer timer
idleTimer *time.Timer
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
@@ -534,14 +490,12 @@ func (cs *clientStream) closeReqBodyLocked() {
cs.reqBodyClosed = make(chan struct{})
reqBodyClosed := cs.reqBodyClosed
go func() {
cs.cc.t.markNewGoroutine()
cs.reqBody.Close()
close(reqBodyClosed)
}()
}
type stickyErrWriter struct {
group synctestGroupInterface
conn net.Conn
timeout time.Duration
err *error
@@ -551,7 +505,7 @@ func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
n, err = writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p)
n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
*sew.err = err
return n, err
}
@@ -650,9 +604,9 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
backoff := float64(uint(1) << (uint(retry) - 1))
backoff += backoff * (0.1 * mathrand.Float64())
d := time.Second * time.Duration(backoff)
tm := t.newTimer(d)
tm := time.NewTimer(d)
select {
case <-tm.C():
case <-tm.C:
t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
continue
case <-req.Context().Done():
@@ -838,14 +792,11 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
pingTimeout: conf.PingTimeout,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
lastActive: t.now(),
lastActive: time.Now(),
}
var group synctestGroupInterface
if t.transportTestHooks != nil {
t.markNewGoroutine()
t.transportTestHooks.newclientconn(cc)
c = cc.tconn
group = t.group
}
if VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
@@ -857,7 +808,6 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(stickyErrWriter{
group: group,
conn: c,
timeout: conf.WriteByteTimeout,
err: &cc.werr,
@@ -906,7 +856,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// Start the idle timer after the connection is fully initialized.
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
}
go cc.readLoop()
@@ -917,7 +867,7 @@ func (cc *ClientConn) healthCheck() {
pingTimeout := cc.pingTimeout
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received.
ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
defer cancel()
cc.vlogf("http2: Transport sending health check")
err := cc.Ping(ctx)
@@ -1120,7 +1070,7 @@ func (cc *ClientConn) tooIdleLocked() bool {
// times are compared based on their wall time. We don't want
// to reuse a connection that's been sitting idle during
// VM/laptop suspend if monotonic time was also frozen.
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -1186,7 +1136,6 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
done := make(chan struct{})
cancelled := false // guarded by cc.mu
go func() {
cc.t.markNewGoroutine()
cc.mu.Lock()
defer cc.mu.Unlock()
for {
@@ -1427,7 +1376,6 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream))
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
cs.cc.t.markNewGoroutine()
err := cs.writeRequest(req, streamf)
cs.cleanupWriteRequest(err)
}
@@ -1558,9 +1506,9 @@ func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStre
var respHeaderTimer <-chan time.Time
var respHeaderRecv chan struct{}
if d := cc.responseHeaderTimeout(); d != 0 {
timer := cc.t.newTimer(d)
timer := time.NewTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C()
respHeaderTimer = timer.C
respHeaderRecv = cs.respHeaderRecv
}
// Wait until the peer half-closes its end of the stream,
@@ -1753,7 +1701,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
// Return a fatal error which aborts the retry loop.
return errClientConnNotEstablished
}
cc.lastActive = cc.t.now()
cc.lastActive = time.Now()
if cc.closed || !cc.canTakeNewRequestLocked() {
return errClientConnUnusable
}
@@ -2092,10 +2040,10 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
if len(cc.streams) != slen-1 {
panic("forgetting unknown stream id")
}
cc.lastActive = cc.t.now()
cc.lastActive = time.Now()
if len(cc.streams) == 0 && cc.idleTimer != nil {
cc.idleTimer.Reset(cc.idleTimeout)
cc.lastIdle = cc.t.now()
cc.lastIdle = time.Now()
}
// Wake up writeRequestBody via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request.
@@ -2121,7 +2069,6 @@ type clientConnReadLoop struct {
// readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *ClientConn) readLoop() {
cc.t.markNewGoroutine()
rl := &clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
@@ -2188,9 +2135,9 @@ func (rl *clientConnReadLoop) cleanup() {
if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
unusedWaitTime = cc.idleTimeout
}
idleTime := cc.t.now().Sub(cc.lastActive)
idleTime := time.Now().Sub(cc.lastActive)
if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
cc.t.connPool().MarkDead(cc)
})
} else {
@@ -2250,9 +2197,9 @@ func (rl *clientConnReadLoop) run() error {
cc := rl.cc
gotSettings := false
readIdleTimeout := cc.readIdleTimeout
var t timer
var t *time.Timer
if readIdleTimeout != 0 {
t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
}
for {
f, err := cc.fr.ReadFrame()
@@ -2998,7 +2945,6 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
var pingError error
errc := make(chan struct{})
go func() {
cc.t.markNewGoroutine()
cc.wmu.Lock()
defer cc.wmu.Unlock()
if pingError = cc.fr.WritePing(false, p); pingError != nil {
@@ -3228,7 +3174,7 @@ func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
cc.mu.Lock()
ci.WasIdle = len(cc.streams) == 0 && reused
if ci.WasIdle && !cc.lastActive.IsZero() {
ci.IdleTime = cc.t.timeSince(cc.lastActive)
ci.IdleTime = time.Since(cc.lastActive)
}
cc.mu.Unlock()

View File

@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.25 || goexperiment.synctest
package http2
import (
@@ -33,6 +35,7 @@ import (
"sync"
"sync/atomic"
"testing"
"testing/synctest"
"time"
"golang.org/x/net/http2/hpack"
@@ -121,7 +124,7 @@ func TestIdleConnTimeout(t *testing.T) {
},
wantNewConn: false,
}} {
t.Run(test.name, func(t *testing.T) {
synctestSubtest(t, test.name, func(t testing.TB) {
tt := newTestTransport(t, func(tr *Transport) {
tr.IdleConnTimeout = test.idleConnTimeout
})
@@ -166,7 +169,7 @@ func TestIdleConnTimeout(t *testing.T) {
tc.wantFrameType(FrameSettings) // ACK to our settings
}
tt.advance(test.wait)
time.Sleep(test.wait)
if got, want := tc.isClosed(), test.wantNewConn; got != want {
t.Fatalf("after waiting %v, conn closed=%v; want %v", test.wait, got, want)
}
@@ -849,10 +852,18 @@ func newLocalListener(t *testing.T) net.Listener {
return ln
}
func TestTransportReqBodyAfterResponse_200(t *testing.T) { testTransportReqBodyAfterResponse(t, 200) }
func TestTransportReqBodyAfterResponse_403(t *testing.T) { testTransportReqBodyAfterResponse(t, 403) }
func TestTransportReqBodyAfterResponse_200(t *testing.T) {
synctestTest(t, func(t testing.TB) {
testTransportReqBodyAfterResponse(t, 200)
})
}
func TestTransportReqBodyAfterResponse_403(t *testing.T) {
synctestTest(t, func(t testing.TB) {
testTransportReqBodyAfterResponse(t, 403)
})
}
func testTransportReqBodyAfterResponse(t *testing.T, status int) {
func testTransportReqBodyAfterResponse(t testing.TB, status int) {
const bodySize = 1 << 10
tc := newTestClientConn(t)
@@ -1083,6 +1094,11 @@ func TestTransportResPattern_c2h2d1t1(t *testing.T) { testTransportResPattern(t,
func TestTransportResPattern_c2h2d1t2(t *testing.T) { testTransportResPattern(t, f2, f2, d1, f2) }
func testTransportResPattern(t *testing.T, expect100Continue, resHeader headerType, withData bool, trailers headerType) {
synctestTest(t, func(t testing.TB) {
testTransportResPatternBubble(t, expect100Continue, resHeader, withData, trailers)
})
}
func testTransportResPatternBubble(t testing.TB, expect100Continue, resHeader headerType, withData bool, trailers headerType) {
const reqBody = "some request body"
const resBody = "some response body"
@@ -1163,7 +1179,8 @@ func testTransportResPattern(t *testing.T, expect100Continue, resHeader headerTy
}
// Issue 26189, Issue 17739: ignore unknown 1xx responses
func TestTransportUnknown1xx(t *testing.T) {
func TestTransportUnknown1xx(t *testing.T) { synctestTest(t, testTransportUnknown1xx) }
func testTransportUnknown1xx(t testing.TB) {
var buf bytes.Buffer
defer func() { got1xxFuncForTests = nil }()
got1xxFuncForTests = func(code int, header textproto.MIMEHeader) error {
@@ -1213,6 +1230,9 @@ code=114 header=map[Foo-Bar:[114]]
}
func TestTransportReceiveUndeclaredTrailer(t *testing.T) {
synctestTest(t, testTransportReceiveUndeclaredTrailer)
}
func testTransportReceiveUndeclaredTrailer(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -1280,6 +1300,11 @@ func TestTransportInvalidTrailer_BinaryFieldValue(t *testing.T) {
}
func testInvalidTrailer(t *testing.T, mode headerType, wantErr error, trailers ...string) {
synctestTest(t, func(t testing.TB) {
testInvalidTrailerBubble(t, mode, wantErr, trailers...)
})
}
func testInvalidTrailerBubble(t testing.TB, mode headerType, wantErr error, trailers ...string) {
tc := newTestClientConn(t)
tc.greet()
@@ -1588,6 +1613,9 @@ func TestTransportChecksRequestHeaderListSize(t *testing.T) {
}
func TestTransportChecksResponseHeaderListSize(t *testing.T) {
synctestTest(t, testTransportChecksResponseHeaderListSize)
}
func testTransportChecksResponseHeaderListSize(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -1633,7 +1661,8 @@ func TestTransportChecksResponseHeaderListSize(t *testing.T) {
}
}
func TestTransportCookieHeaderSplit(t *testing.T) {
func TestTransportCookieHeaderSplit(t *testing.T) { synctestTest(t, testTransportCookieHeaderSplit) }
func testTransportCookieHeaderSplit(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -1862,13 +1891,17 @@ func isTimeout(err error) bool {
// Test that the http1 Transport.ResponseHeaderTimeout option and cancel is sent.
func TestTransportResponseHeaderTimeout_NoBody(t *testing.T) {
testTransportResponseHeaderTimeout(t, false)
synctestTest(t, func(t testing.TB) {
testTransportResponseHeaderTimeout(t, false)
})
}
func TestTransportResponseHeaderTimeout_Body(t *testing.T) {
testTransportResponseHeaderTimeout(t, true)
synctestTest(t, func(t testing.TB) {
testTransportResponseHeaderTimeout(t, true)
})
}
func testTransportResponseHeaderTimeout(t *testing.T, body bool) {
func testTransportResponseHeaderTimeout(t testing.TB, body bool) {
const bodySize = 4 << 20
tc := newTestClientConn(t, func(tr *Transport) {
tr.t1 = &http.Transport{
@@ -1904,11 +1937,11 @@ func testTransportResponseHeaderTimeout(t *testing.T, body bool) {
})
}
tc.advance(4 * time.Millisecond)
time.Sleep(4 * time.Millisecond)
if rt.done() {
t.Fatalf("RoundTrip is done after 4ms; want still waiting")
}
tc.advance(1 * time.Millisecond)
time.Sleep(1 * time.Millisecond)
if err := rt.err(); !isTimeout(err) {
t.Fatalf("RoundTrip error: %v; want timeout error", err)
@@ -2304,7 +2337,8 @@ func TestTransportNewTLSConfig(t *testing.T) {
// The Google GFE responds to HEAD requests with a HEADERS frame
// without END_STREAM, followed by a 0-length DATA frame with
// END_STREAM. Make sure we don't get confused by that. (We did.)
func TestTransportReadHeadResponse(t *testing.T) {
func TestTransportReadHeadResponse(t *testing.T) { synctestTest(t, testTransportReadHeadResponse) }
func testTransportReadHeadResponse(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -2331,6 +2365,9 @@ func TestTransportReadHeadResponse(t *testing.T) {
}
func TestTransportReadHeadResponseWithBody(t *testing.T) {
synctestTest(t, testTransportReadHeadResponseWithBody)
}
func testTransportReadHeadResponseWithBody(t testing.TB) {
// This test uses an invalid response format.
// Discard logger output to not spam tests output.
log.SetOutput(io.Discard)
@@ -2475,14 +2512,18 @@ func TestTransportFlowControl(t *testing.T) {
// proceeds to close the TCP connection before the client gets its
// response)
func TestTransportUsesGoAwayDebugError_RoundTrip(t *testing.T) {
testTransportUsesGoAwayDebugError(t, false)
synctestTest(t, func(t testing.TB) {
testTransportUsesGoAwayDebugError(t, false)
})
}
func TestTransportUsesGoAwayDebugError_Body(t *testing.T) {
testTransportUsesGoAwayDebugError(t, true)
synctestTest(t, func(t testing.TB) {
testTransportUsesGoAwayDebugError(t, true)
})
}
func testTransportUsesGoAwayDebugError(t *testing.T, failMidBody bool) {
func testTransportUsesGoAwayDebugError(t testing.TB, failMidBody bool) {
tc := newTestClientConn(t)
tc.greet()
@@ -2532,7 +2573,7 @@ func testTransportUsesGoAwayDebugError(t *testing.T, failMidBody bool) {
}
}
func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
func testTransportReturnsUnusedFlowControl(t testing.TB, oneDataFrame bool) {
tc := newTestClientConn(t)
tc.greet()
@@ -2573,7 +2614,7 @@ func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
t.Fatalf("body read = %v, %v; want 1, nil", n, err)
}
res.Body.Close() // leaving 4999 bytes unread
tc.sync()
synctest.Wait()
sentAdditionalData := false
tc.wantUnorderedFrames(
@@ -2609,17 +2650,22 @@ func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
// See golang.org/issue/16481
func TestTransportReturnsUnusedFlowControlSingleWrite(t *testing.T) {
testTransportReturnsUnusedFlowControl(t, true)
synctestTest(t, func(t testing.TB) {
testTransportReturnsUnusedFlowControl(t, true)
})
}
// See golang.org/issue/20469
func TestTransportReturnsUnusedFlowControlMultipleWrites(t *testing.T) {
testTransportReturnsUnusedFlowControl(t, false)
synctestTest(t, func(t testing.TB) {
testTransportReturnsUnusedFlowControl(t, false)
})
}
// Issue 16612: adjust flow control on open streams when transport
// receives SETTINGS with INITIAL_WINDOW_SIZE from server.
func TestTransportAdjustsFlowControl(t *testing.T) {
func TestTransportAdjustsFlowControl(t *testing.T) { synctestTest(t, testTransportAdjustsFlowControl) }
func testTransportAdjustsFlowControl(t testing.TB) {
const bodySize = 1 << 20
tc := newTestClientConn(t)
@@ -2676,6 +2722,9 @@ func TestTransportAdjustsFlowControl(t *testing.T) {
// See golang.org/issue/16556
func TestTransportReturnsDataPaddingFlowControl(t *testing.T) {
synctestTest(t, testTransportReturnsDataPaddingFlowControl)
}
func testTransportReturnsDataPaddingFlowControl(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -2711,6 +2760,9 @@ func TestTransportReturnsDataPaddingFlowControl(t *testing.T) {
// golang.org/issue/16572 -- RoundTrip shouldn't hang when it gets a
// StreamError as a result of the response HEADERS
func TestTransportReturnsErrorOnBadResponseHeaders(t *testing.T) {
synctestTest(t, testTransportReturnsErrorOnBadResponseHeaders)
}
func testTransportReturnsErrorOnBadResponseHeaders(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -2762,6 +2814,9 @@ func (b byteAndEOFReader) Read(p []byte) (n int, err error) {
// which returns (non-0, io.EOF) and also needs to set the ContentLength
// explicitly.
func TestTransportBodyDoubleEndStream(t *testing.T) {
synctestTest(t, testTransportBodyDoubleEndStream)
}
func testTransportBodyDoubleEndStream(t testing.TB) {
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
// Nothing.
})
@@ -3031,7 +3086,8 @@ func TestTransportNoRaceOnRequestObjectAfterRequestComplete(t *testing.T) {
req.Header = http.Header{}
}
func TestTransportCloseAfterLostPing(t *testing.T) {
func TestTransportCloseAfterLostPing(t *testing.T) { synctestTest(t, testTransportCloseAfterLostPing) }
func testTransportCloseAfterLostPing(t testing.TB) {
tc := newTestClientConn(t, func(tr *Transport) {
tr.PingTimeout = 1 * time.Second
tr.ReadIdleTimeout = 1 * time.Second
@@ -3042,10 +3098,10 @@ func TestTransportCloseAfterLostPing(t *testing.T) {
rt := tc.roundTrip(req)
tc.wantFrameType(FrameHeaders)
tc.advance(1 * time.Second)
time.Sleep(1 * time.Second)
tc.wantFrameType(FramePing)
tc.advance(1 * time.Second)
time.Sleep(1 * time.Second)
err := rt.err()
if err == nil || !strings.Contains(err.Error(), "client connection lost") {
t.Fatalf("expected to get error about \"connection lost\", got %v", err)
@@ -3081,6 +3137,9 @@ func TestTransportPingWriteBlocks(t *testing.T) {
}
func TestTransportPingWhenReadingMultiplePings(t *testing.T) {
synctestTest(t, testTransportPingWhenReadingMultiplePings)
}
func testTransportPingWhenReadingMultiplePings(t testing.TB) {
tc := newTestClientConn(t, func(tr *Transport) {
tr.ReadIdleTimeout = 1000 * time.Millisecond
})
@@ -3102,20 +3161,20 @@ func TestTransportPingWhenReadingMultiplePings(t *testing.T) {
for i := 0; i < 5; i++ {
// No ping yet...
tc.advance(999 * time.Millisecond)
time.Sleep(999 * time.Millisecond)
if f := tc.readFrame(); f != nil {
t.Fatalf("unexpected frame: %v", f)
}
// ...ping now.
tc.advance(1 * time.Millisecond)
time.Sleep(1 * time.Millisecond)
f := readFrame[*PingFrame](t, tc)
tc.writePing(true, f.Data)
}
// Cancel the request, Transport resets it and returns an error from body reads.
cancel()
tc.sync()
synctest.Wait()
tc.wantFrameType(FrameRSTStream)
_, err := rt.readBody()
@@ -3125,6 +3184,9 @@ func TestTransportPingWhenReadingMultiplePings(t *testing.T) {
}
func TestTransportPingWhenReadingPingDisabled(t *testing.T) {
synctestTest(t, testTransportPingWhenReadingPingDisabled)
}
func testTransportPingWhenReadingPingDisabled(t testing.TB) {
tc := newTestClientConn(t, func(tr *Transport) {
tr.ReadIdleTimeout = 0 // PINGs disabled
})
@@ -3144,13 +3206,16 @@ func TestTransportPingWhenReadingPingDisabled(t *testing.T) {
})
// No PING is sent, even after a long delay.
tc.advance(1 * time.Minute)
time.Sleep(1 * time.Minute)
if f := tc.readFrame(); f != nil {
t.Fatalf("unexpected frame: %v", f)
}
}
func TestTransportRetryAfterGOAWAYNoRetry(t *testing.T) {
synctestTest(t, testTransportRetryAfterGOAWAYNoRetry)
}
func testTransportRetryAfterGOAWAYNoRetry(t testing.TB) {
tt := newTestTransport(t)
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
@@ -3175,6 +3240,9 @@ func TestTransportRetryAfterGOAWAYNoRetry(t *testing.T) {
}
func TestTransportRetryAfterGOAWAYRetry(t *testing.T) {
synctestTest(t, testTransportRetryAfterGOAWAYRetry)
}
func testTransportRetryAfterGOAWAYRetry(t testing.TB) {
tt := newTestTransport(t)
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
@@ -3219,6 +3287,9 @@ func TestTransportRetryAfterGOAWAYRetry(t *testing.T) {
}
func TestTransportRetryAfterGOAWAYSecondRequest(t *testing.T) {
synctestTest(t, testTransportRetryAfterGOAWAYSecondRequest)
}
func testTransportRetryAfterGOAWAYSecondRequest(t testing.TB) {
tt := newTestTransport(t)
// First request succeeds.
@@ -3282,6 +3353,9 @@ func TestTransportRetryAfterGOAWAYSecondRequest(t *testing.T) {
}
func TestTransportRetryAfterRefusedStream(t *testing.T) {
synctestTest(t, testTransportRetryAfterRefusedStream)
}
func testTransportRetryAfterRefusedStream(t testing.TB) {
tt := newTestTransport(t)
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
@@ -3320,20 +3394,21 @@ func TestTransportRetryAfterRefusedStream(t *testing.T) {
rt.wantStatus(204)
}
func TestTransportRetryHasLimit(t *testing.T) {
func TestTransportRetryHasLimit(t *testing.T) { synctestTest(t, testTransportRetryHasLimit) }
func testTransportRetryHasLimit(t testing.TB) {
tt := newTestTransport(t)
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
rt := tt.roundTrip(req)
// First attempt: Server sends a GOAWAY.
tc := tt.getConn()
tc.netconn.SetReadDeadline(time.Time{})
tc.wantFrameType(FrameSettings)
tc.wantFrameType(FrameWindowUpdate)
var totalDelay time.Duration
count := 0
for streamID := uint32(1); ; streamID += 2 {
start := time.Now()
for streamID := uint32(1); !rt.done(); streamID += 2 {
count++
tc.wantHeaders(wantHeader{
streamID: streamID,
@@ -3345,18 +3420,9 @@ func TestTransportRetryHasLimit(t *testing.T) {
}
tc.writeRSTStream(streamID, ErrCodeRefusedStream)
d, scheduled := tt.group.TimeUntilEvent()
if !scheduled {
if streamID == 1 {
continue
}
break
}
totalDelay += d
if totalDelay > 5*time.Minute {
if totalDelay := time.Since(start); totalDelay > 5*time.Minute {
t.Fatalf("RoundTrip still retrying after %v, should have given up", totalDelay)
}
tt.advance(d)
}
if got, want := count, 5; got < count {
t.Errorf("RoundTrip made %v attempts, want at least %v", got, want)
@@ -3367,6 +3433,9 @@ func TestTransportRetryHasLimit(t *testing.T) {
}
func TestTransportResponseDataBeforeHeaders(t *testing.T) {
synctestTest(t, testTransportResponseDataBeforeHeaders)
}
func testTransportResponseDataBeforeHeaders(t testing.TB) {
// Discard log output complaining about protocol error.
log.SetOutput(io.Discard)
t.Cleanup(func() { log.SetOutput(os.Stderr) }) // after other cleanup is done
@@ -3408,7 +3477,7 @@ func TestTransportMaxFrameReadSize(t *testing.T) {
maxReadFrameSize: 1024,
want: minMaxFrameSize,
}} {
t.Run(fmt.Sprint(test.maxReadFrameSize), func(t *testing.T) {
synctestSubtest(t, fmt.Sprint(test.maxReadFrameSize), func(t testing.TB) {
tc := newTestClientConn(t, func(tr *Transport) {
tr.MaxReadFrameSize = test.maxReadFrameSize
})
@@ -3470,6 +3539,9 @@ func TestTransportRequestsLowServerLimit(t *testing.T) {
// tests Transport.StrictMaxConcurrentStreams
func TestTransportRequestsStallAtServerLimit(t *testing.T) {
synctestTest(t, testTransportRequestsStallAtServerLimit)
}
func testTransportRequestsStallAtServerLimit(t testing.TB) {
const maxConcurrent = 2
tc := newTestClientConn(t, func(tr *Transport) {
@@ -3517,7 +3589,7 @@ func TestTransportRequestsStallAtServerLimit(t *testing.T) {
// Cancel the maxConcurrent'th request.
// The request should fail.
close(cancelClientRequest)
tc.sync()
synctest.Wait()
if err := rts[maxConcurrent].err(); err == nil {
t.Fatalf("RoundTrip(%d) should have failed due to cancel, did not", maxConcurrent)
}
@@ -3551,6 +3623,9 @@ func TestTransportRequestsStallAtServerLimit(t *testing.T) {
}
func TestTransportMaxDecoderHeaderTableSize(t *testing.T) {
synctestTest(t, testTransportMaxDecoderHeaderTableSize)
}
func testTransportMaxDecoderHeaderTableSize(t testing.TB) {
var reqSize, resSize uint32 = 8192, 16384
tc := newTestClientConn(t, func(tr *Transport) {
tr.MaxDecoderHeaderTableSize = reqSize
@@ -3572,6 +3647,9 @@ func TestTransportMaxDecoderHeaderTableSize(t *testing.T) {
}
func TestTransportMaxEncoderHeaderTableSize(t *testing.T) {
synctestTest(t, testTransportMaxEncoderHeaderTableSize)
}
func testTransportMaxEncoderHeaderTableSize(t testing.TB) {
var peerAdvertisedMaxHeaderTableSize uint32 = 16384
tc := newTestClientConn(t, func(tr *Transport) {
tr.MaxEncoderHeaderTableSize = 8192
@@ -3662,7 +3740,8 @@ func TestTransportAllocationsAfterResponseBodyClose(t *testing.T) {
// Issue 18891: make sure Request.Body == NoBody means no DATA frame
// is ever sent, even if empty.
func TestTransportNoBodyMeansNoDATA(t *testing.T) {
func TestTransportNoBodyMeansNoDATA(t *testing.T) { synctestTest(t, testTransportNoBodyMeansNoDATA) }
func testTransportNoBodyMeansNoDATA(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -3756,6 +3835,9 @@ func TestTransportResponseAndResetWithoutConsumingBodyRace(t *testing.T) {
// Verify transport doesn't crash when receiving bogus response lacking a :status header.
// Issue 22880.
func TestTransportHandlesInvalidStatuslessResponse(t *testing.T) {
synctestTest(t, testTransportHandlesInvalidStatuslessResponse)
}
func testTransportHandlesInvalidStatuslessResponse(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -4117,6 +4199,11 @@ func (r *errReader) Read(p []byte) (int, error) {
}
func testTransportBodyReadError(t *testing.T, body []byte) {
synctestTest(t, func(t testing.TB) {
testTransportBodyReadErrorBubble(t, body)
})
}
func testTransportBodyReadErrorBubble(t testing.TB, body []byte) {
tc := newTestClientConn(t)
tc.greet()
@@ -4161,7 +4248,8 @@ func TestTransportBodyReadError_Some(t *testing.T) { testTransportBodyRea
// Issue 32254: verify that the client sends END_STREAM flag eagerly with the last
// (or in this test-case the only one) request body data frame, and does not send
// extra zero-len data frames.
func TestTransportBodyEagerEndStream(t *testing.T) {
func TestTransportBodyEagerEndStream(t *testing.T) { synctestTest(t, testTransportBodyEagerEndStream) }
func testTransportBodyEagerEndStream(t testing.TB) {
const reqBody = "some request body"
const resBody = "some response body"
@@ -4205,17 +4293,21 @@ func TestTransportBodyLargerThanSpecifiedContentLength_len3(t *testing.T) {
[]byte("123"),
[]byte("456"),
}}
testTransportBodyLargerThanSpecifiedContentLength(t, body, 3)
synctestTest(t, func(t testing.TB) {
testTransportBodyLargerThanSpecifiedContentLength(t, body, 3)
})
}
func TestTransportBodyLargerThanSpecifiedContentLength_len2(t *testing.T) {
body := &chunkReader{[][]byte{
[]byte("123"),
}}
testTransportBodyLargerThanSpecifiedContentLength(t, body, 2)
synctestTest(t, func(t testing.TB) {
testTransportBodyLargerThanSpecifiedContentLength(t, body, 2)
})
}
func testTransportBodyLargerThanSpecifiedContentLength(t *testing.T, body *chunkReader, contentLen int64) {
func testTransportBodyLargerThanSpecifiedContentLength(t testing.TB, body *chunkReader, contentLen int64) {
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
r.Body.Read(make([]byte, 6))
})
@@ -4818,6 +4910,9 @@ func TestTransportCloseRequestBody(t *testing.T) {
}
func TestTransportRetriesOnStreamProtocolError(t *testing.T) {
synctestTest(t, testTransportRetriesOnStreamProtocolError)
}
func testTransportRetriesOnStreamProtocolError(t testing.TB) {
// This test verifies that
// - receiving a protocol error on a connection does not interfere with
// other requests in flight on that connection;
@@ -4893,7 +4988,8 @@ func TestTransportRetriesOnStreamProtocolError(t *testing.T) {
rt1.wantStatus(200)
}
func TestClientConnReservations(t *testing.T) {
func TestClientConnReservations(t *testing.T) { synctestTest(t, testClientConnReservations) }
func testClientConnReservations(t testing.TB) {
tc := newTestClientConn(t)
tc.greet(
Setting{ID: SettingMaxConcurrentStreams, Val: initialMaxConcurrentStreams},
@@ -4944,7 +5040,8 @@ func TestClientConnReservations(t *testing.T) {
}
}
func TestTransportTimeoutServerHangs(t *testing.T) {
func TestTransportTimeoutServerHangs(t *testing.T) { synctestTest(t, testTransportTimeoutServerHangs) }
func testTransportTimeoutServerHangs(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -4953,7 +5050,7 @@ func TestTransportTimeoutServerHangs(t *testing.T) {
rt := tc.roundTrip(req)
tc.wantFrameType(FrameHeaders)
tc.advance(5 * time.Second)
time.Sleep(5 * time.Second)
if f := tc.readFrame(); f != nil {
t.Fatalf("unexpected frame: %v", f)
}
@@ -4962,20 +5059,13 @@ func TestTransportTimeoutServerHangs(t *testing.T) {
}
cancel()
tc.sync()
synctest.Wait()
if rt.err() != context.Canceled {
t.Fatalf("RoundTrip error: %v; want context.Canceled", rt.err())
}
}
func TestTransportContentLengthWithoutBody(t *testing.T) {
contentLength := ""
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", contentLength)
})
tr := &Transport{TLSClientConfig: tlsConfigInsecure}
defer tr.CloseIdleConnections()
for _, test := range []struct {
name string
contentLength string
@@ -4996,7 +5086,14 @@ func TestTransportContentLengthWithoutBody(t *testing.T) {
wantContentLength: 0,
},
} {
t.Run(test.name, func(t *testing.T) {
synctestSubtest(t, test.name, func(t testing.TB) {
contentLength := ""
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", contentLength)
})
tr := &Transport{TLSClientConfig: tlsConfigInsecure}
defer tr.CloseIdleConnections()
contentLength = test.contentLength
req, _ := http.NewRequest("GET", ts.URL, nil)
@@ -5021,6 +5118,9 @@ func TestTransportContentLengthWithoutBody(t *testing.T) {
}
func TestTransportCloseResponseBodyWhileRequestBodyHangs(t *testing.T) {
synctestTest(t, testTransportCloseResponseBodyWhileRequestBodyHangs)
}
func testTransportCloseResponseBodyWhileRequestBodyHangs(t testing.TB) {
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.(http.Flusher).Flush()
@@ -5044,7 +5144,8 @@ func TestTransportCloseResponseBodyWhileRequestBodyHangs(t *testing.T) {
pw.Close()
}
func TestTransport300ResponseBody(t *testing.T) {
func TestTransport300ResponseBody(t *testing.T) { synctestTest(t, testTransport300ResponseBody) }
func testTransport300ResponseBody(t testing.TB) {
reqc := make(chan struct{})
body := []byte("response body")
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
@@ -5120,7 +5221,8 @@ func (c *slowWriteConn) Write(b []byte) (n int, err error) {
return c.Conn.Write(b)
}
func TestTransportSlowWrites(t *testing.T) {
func TestTransportSlowWrites(t *testing.T) { synctestTest(t, testTransportSlowWrites) }
func testTransportSlowWrites(t testing.TB) {
ts := newTestServer(t,
func(w http.ResponseWriter, r *http.Request) {},
)
@@ -5145,10 +5247,14 @@ func TestTransportSlowWrites(t *testing.T) {
}
func TestTransportClosesConnAfterGoAwayNoStreams(t *testing.T) {
testTransportClosesConnAfterGoAway(t, 0)
synctestTest(t, func(t testing.TB) {
testTransportClosesConnAfterGoAway(t, 0)
})
}
func TestTransportClosesConnAfterGoAwayLastStream(t *testing.T) {
testTransportClosesConnAfterGoAway(t, 1)
synctestTest(t, func(t testing.TB) {
testTransportClosesConnAfterGoAway(t, 1)
})
}
// testTransportClosesConnAfterGoAway verifies that the transport
@@ -5157,7 +5263,7 @@ func TestTransportClosesConnAfterGoAwayLastStream(t *testing.T) {
// lastStream is the last stream ID in the GOAWAY frame.
// When 0, the transport (unsuccessfully) retries the request (stream 1);
// when 1, the transport reads the response after receiving the GOAWAY.
func testTransportClosesConnAfterGoAway(t *testing.T, lastStream uint32) {
func testTransportClosesConnAfterGoAway(t testing.TB, lastStream uint32) {
tc := newTestClientConn(t)
tc.greet()
@@ -5384,7 +5490,8 @@ func TestDialRaceResumesDial(t *testing.T) {
}
}
func TestTransportDataAfter1xxHeader(t *testing.T) {
func TestTransportDataAfter1xxHeader(t *testing.T) { synctestTest(t, testTransportDataAfter1xxHeader) }
func testTransportDataAfter1xxHeader(t testing.TB) {
// Discard logger output to avoid spamming stderr.
log.SetOutput(io.Discard)
defer log.SetOutput(os.Stderr)
@@ -5514,7 +5621,7 @@ func TestTransport1xxLimits(t *testing.T) {
hcount: 20,
limited: false,
}} {
t.Run(test.name, func(t *testing.T) {
synctestSubtest(t, test.name, func(t testing.TB) {
tc := newTestClientConn(t, test.opt)
tc.greet()
@@ -5549,7 +5656,8 @@ func TestTransport1xxLimits(t *testing.T) {
}
}
func TestTransportSendPingWithReset(t *testing.T) {
func TestTransportSendPingWithReset(t *testing.T) { synctestTest(t, testTransportSendPingWithReset) }
func testTransportSendPingWithReset(t testing.TB) {
tc := newTestClientConn(t, func(tr *Transport) {
tr.StrictMaxConcurrentStreams = true
})
@@ -5609,6 +5717,9 @@ func TestTransportSendPingWithReset(t *testing.T) {
// Issue #70505: gRPC gets upset if we send more than 2 pings per HEADERS/DATA frame
// sent by the server.
func TestTransportSendNoMoreThanOnePingWithReset(t *testing.T) {
synctestTest(t, testTransportSendNoMoreThanOnePingWithReset)
}
func testTransportSendNoMoreThanOnePingWithReset(t testing.TB) {
tc := newTestClientConn(t)
tc.greet()
@@ -5674,6 +5785,9 @@ func TestTransportSendNoMoreThanOnePingWithReset(t *testing.T) {
}
func TestTransportConnBecomesUnresponsive(t *testing.T) {
synctestTest(t, testTransportConnBecomesUnresponsive)
}
func testTransportConnBecomesUnresponsive(t testing.TB) {
// We send a number of requests in series to an unresponsive connection.
// Each request is canceled or times out without a response.
// Eventually, we open a new connection rather than trying to use the old one.
@@ -5744,19 +5858,19 @@ func TestTransportConnBecomesUnresponsive(t *testing.T) {
}
// Test that the Transport can use a conn provided to it by a TLSNextProto hook.
func TestTransportTLSNextProtoConnOK(t *testing.T) {
func TestTransportTLSNextProtoConnOK(t *testing.T) { synctestTest(t, testTransportTLSNextProtoConnOK) }
func testTransportTLSNextProtoConnOK(t testing.TB) {
t1 := &http.Transport{}
t2, _ := ConfigureTransports(t1)
tt := newTestTransport(t, t2)
// Create a new, fake connection and pass it to the Transport via the TLSNextProto hook.
cli, _ := synctestNetPipe(tt.group)
cli, _ := synctestNetPipe()
cliTLS := tls.Client(cli, tlsConfigInsecure)
go func() {
tt.group.Join()
t1.TLSNextProto["h2"]("dummy.tld", cliTLS)
}()
tt.sync()
synctest.Wait()
tc := tt.getConn()
tc.greet()
@@ -5787,18 +5901,20 @@ func TestTransportTLSNextProtoConnOK(t *testing.T) {
// Test the case where a conn provided via a TLSNextProto hook immediately encounters an error.
func TestTransportTLSNextProtoConnImmediateFailureUsed(t *testing.T) {
synctestTest(t, testTransportTLSNextProtoConnImmediateFailureUsed)
}
func testTransportTLSNextProtoConnImmediateFailureUsed(t testing.TB) {
t1 := &http.Transport{}
t2, _ := ConfigureTransports(t1)
tt := newTestTransport(t, t2)
// Create a new, fake connection and pass it to the Transport via the TLSNextProto hook.
cli, _ := synctestNetPipe(tt.group)
cli, _ := synctestNetPipe()
cliTLS := tls.Client(cli, tlsConfigInsecure)
go func() {
tt.group.Join()
t1.TLSNextProto["h2"]("dummy.tld", cliTLS)
}()
tt.sync()
synctest.Wait()
tc := tt.getConn()
// The connection encounters an error before we send a request that uses it.
@@ -5825,6 +5941,9 @@ func TestTransportTLSNextProtoConnImmediateFailureUsed(t *testing.T) {
// Test the case where a conn provided via a TLSNextProto hook is closed for idleness
// before we use it.
func TestTransportTLSNextProtoConnIdleTimoutBeforeUse(t *testing.T) {
synctestTest(t, testTransportTLSNextProtoConnIdleTimoutBeforeUse)
}
func testTransportTLSNextProtoConnIdleTimoutBeforeUse(t testing.TB) {
t1 := &http.Transport{
IdleConnTimeout: 1 * time.Second,
}
@@ -5832,17 +5951,17 @@ func TestTransportTLSNextProtoConnIdleTimoutBeforeUse(t *testing.T) {
tt := newTestTransport(t, t2)
// Create a new, fake connection and pass it to the Transport via the TLSNextProto hook.
cli, _ := synctestNetPipe(tt.group)
cli, _ := synctestNetPipe()
cliTLS := tls.Client(cli, tlsConfigInsecure)
go func() {
tt.group.Join()
t1.TLSNextProto["h2"]("dummy.tld", cliTLS)
}()
tt.sync()
tc := tt.getConn()
synctest.Wait()
_ = tt.getConn()
// The connection encounters an error before we send a request that uses it.
tc.advance(2 * time.Second)
time.Sleep(2 * time.Second)
synctest.Wait()
// Send a request on the Transport.
//
@@ -5857,18 +5976,20 @@ func TestTransportTLSNextProtoConnIdleTimoutBeforeUse(t *testing.T) {
// Test the case where a conn provided via a TLSNextProto hook immediately encounters an error,
// but no requests are sent which would use the bad connection.
func TestTransportTLSNextProtoConnImmediateFailureUnused(t *testing.T) {
synctestTest(t, testTransportTLSNextProtoConnImmediateFailureUnused)
}
func testTransportTLSNextProtoConnImmediateFailureUnused(t testing.TB) {
t1 := &http.Transport{}
t2, _ := ConfigureTransports(t1)
tt := newTestTransport(t, t2)
// Create a new, fake connection and pass it to the Transport via the TLSNextProto hook.
cli, _ := synctestNetPipe(tt.group)
cli, _ := synctestNetPipe()
cliTLS := tls.Client(cli, tlsConfigInsecure)
go func() {
tt.group.Join()
t1.TLSNextProto["h2"]("dummy.tld", cliTLS)
}()
tt.sync()
synctest.Wait()
tc := tt.getConn()
// The connection encounters an error before we send a request that uses it.
@@ -5876,7 +5997,7 @@ func TestTransportTLSNextProtoConnImmediateFailureUnused(t *testing.T) {
// Some time passes.
// The dead connection is removed from the pool.
tc.advance(10 * time.Second)
time.Sleep(10 * time.Second)
// Send a request on the Transport.
//
@@ -5959,6 +6080,9 @@ func TestExtendedConnectClientWithoutServerSupport(t *testing.T) {
// Issue #70658: Make sure extended CONNECT requests don't get stuck if a
// connection fails early in its lifetime.
func TestExtendedConnectReadFrameError(t *testing.T) {
synctestTest(t, testExtendedConnectReadFrameError)
}
func testExtendedConnectReadFrameError(t testing.TB) {
tc := newTestClientConn(t)
tc.wantFrameType(FrameSettings)
tc.wantFrameType(FrameWindowUpdate)