mirror of
https://github.com/golang/net.git
synced 2026-04-01 02:47:08 +09:00
The x/net go.mod now depends on go1.23. Change-Id: I24960949d84753f84f75fbdc97b7dfb92191a4d2 Reviewed-on: https://go-review.googlesource.com/c/net/+/664295 LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Jonathan Amsterdam <jba@google.com> Auto-Submit: Damien Neil <dneil@google.com>
186 lines
5.0 KiB
Go
186 lines
5.0 KiB
Go
// Copyright 2023 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package quic
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sync"
|
|
)
|
|
|
|
// asyncTestState permits handling asynchronous operations in a synchronous test.
|
|
//
|
|
// For example, a test may want to write to a stream and observe that
|
|
// STREAM frames are sent with the contents of the write in response
|
|
// to MAX_STREAM_DATA frames received from the peer.
|
|
// The Stream.Write is an asynchronous operation, but the test is simpler
|
|
// if we can start the write, observe the first STREAM frame sent,
|
|
// send a MAX_STREAM_DATA frame, observe the next STREAM frame sent, etc.
|
|
//
|
|
// We do this by instrumenting points where operations can block.
|
|
// We start async operations like Write in a goroutine,
|
|
// and wait for the operation to either finish or hit a blocking point.
|
|
// When the connection event loop is idle, we check a list of
|
|
// blocked operations to see if any can be woken.
|
|
type asyncTestState struct {
|
|
mu sync.Mutex
|
|
notify chan struct{}
|
|
blocked map[*blockedAsync]struct{}
|
|
}
|
|
|
|
// An asyncOp is an asynchronous operation that results in (T, error).
|
|
type asyncOp[T any] struct {
|
|
v T
|
|
err error
|
|
|
|
caller string
|
|
tc *testConn
|
|
donec chan struct{}
|
|
cancelFunc context.CancelFunc
|
|
}
|
|
|
|
// cancel cancels the async operation's context, and waits for
|
|
// the operation to complete.
|
|
func (a *asyncOp[T]) cancel() {
|
|
select {
|
|
case <-a.donec:
|
|
return // already done
|
|
default:
|
|
}
|
|
a.cancelFunc()
|
|
<-a.tc.asyncTestState.notify
|
|
select {
|
|
case <-a.donec:
|
|
default:
|
|
panic(fmt.Errorf("%v: async op failed to finish after being canceled", a.caller))
|
|
}
|
|
}
|
|
|
|
var errNotDone = errors.New("async op is not done")
|
|
|
|
// result returns the result of the async operation.
|
|
// It returns errNotDone if the operation is still in progress.
|
|
//
|
|
// Note that unlike a traditional async/await, this doesn't block
|
|
// waiting for the operation to complete. Since tests have full
|
|
// control over the progress of operations, an asyncOp can only
|
|
// become done in reaction to the test taking some action.
|
|
func (a *asyncOp[T]) result() (v T, err error) {
|
|
a.tc.wait()
|
|
select {
|
|
case <-a.donec:
|
|
return a.v, a.err
|
|
default:
|
|
return v, errNotDone
|
|
}
|
|
}
|
|
|
|
// A blockedAsync is a blocked async operation.
|
|
type blockedAsync struct {
|
|
until func() bool // when this returns true, the operation is unblocked
|
|
donec chan struct{} // closed when the operation is unblocked
|
|
}
|
|
|
|
type asyncContextKey struct{}
|
|
|
|
// runAsync starts an asynchronous operation.
|
|
//
|
|
// The function f should call a blocking function such as
|
|
// Stream.Write or Conn.AcceptStream and return its result.
|
|
// It must use the provided context.
|
|
func runAsync[T any](tc *testConn, f func(context.Context) (T, error)) *asyncOp[T] {
|
|
as := &tc.asyncTestState
|
|
if as.notify == nil {
|
|
as.notify = make(chan struct{})
|
|
as.mu.Lock()
|
|
as.blocked = make(map[*blockedAsync]struct{})
|
|
as.mu.Unlock()
|
|
}
|
|
_, file, line, _ := runtime.Caller(1)
|
|
ctx := context.WithValue(context.Background(), asyncContextKey{}, true)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
a := &asyncOp[T]{
|
|
tc: tc,
|
|
caller: fmt.Sprintf("%v:%v", filepath.Base(file), line),
|
|
donec: make(chan struct{}),
|
|
cancelFunc: cancel,
|
|
}
|
|
go func() {
|
|
a.v, a.err = f(ctx)
|
|
close(a.donec)
|
|
as.notify <- struct{}{}
|
|
}()
|
|
tc.t.Cleanup(func() {
|
|
if _, err := a.result(); err == errNotDone {
|
|
tc.t.Errorf("%v: async operation is still executing at end of test", a.caller)
|
|
a.cancel()
|
|
}
|
|
})
|
|
// Wait for the operation to either finish or block.
|
|
<-as.notify
|
|
tc.wait()
|
|
return a
|
|
}
|
|
|
|
// waitUntil waits for a blocked async operation to complete.
|
|
// The operation is complete when the until func returns true.
|
|
func (as *asyncTestState) waitUntil(ctx context.Context, until func() bool) error {
|
|
if until() {
|
|
return nil
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
// Context has already expired.
|
|
return err
|
|
}
|
|
if ctx.Value(asyncContextKey{}) == nil {
|
|
// Context is not one that we've created, and hasn't expired.
|
|
// This probably indicates that we've tried to perform a
|
|
// blocking operation without using the async test harness here,
|
|
// which may have unpredictable results.
|
|
panic("blocking async point with unexpected Context")
|
|
}
|
|
b := &blockedAsync{
|
|
until: until,
|
|
donec: make(chan struct{}),
|
|
}
|
|
// Record this as a pending blocking operation.
|
|
as.mu.Lock()
|
|
as.blocked[b] = struct{}{}
|
|
as.mu.Unlock()
|
|
// Notify the creator of the operation that we're blocked,
|
|
// and wait to be woken up.
|
|
as.notify <- struct{}{}
|
|
select {
|
|
case <-b.donec:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// wakeAsync tries to wake up a blocked async operation.
|
|
// It returns true if one was woken, false otherwise.
|
|
func (as *asyncTestState) wakeAsync() bool {
|
|
as.mu.Lock()
|
|
var woken *blockedAsync
|
|
for w := range as.blocked {
|
|
if w.until() {
|
|
woken = w
|
|
delete(as.blocked, w)
|
|
break
|
|
}
|
|
}
|
|
as.mu.Unlock()
|
|
if woken == nil {
|
|
return false
|
|
}
|
|
close(woken.donec)
|
|
<-as.notify // must not hold as.mu while blocked here
|
|
return true
|
|
}
|