mirror of
https://github.com/golang/net.git
synced 2026-04-01 02:47:08 +09:00
netutil: make LimitListener tests more robust
In CL 372495 I cleaned up TestLimitListener so that it would not fail spuriously. However, upon further thought I realized that the original test was actually checking two different properties (steady-state saturation, and actual overload), and the cleaned-up test was only checking one of those (overload). This change adds a separate test for steady-state saturation, and makes the overload test more robust to spurious connections (which could occur, for example, if another test running on the machine accidentally dials this test's open port). The test cleanup also revealed a bad interaction with an existing bug in the js/wasm net.TCPListener implementation (filed as golang/go#50216), for which I have added a workaround in (*limitListener).Accept. For golang/go#22926 Change-Id: I727050a8254f527c7455de296ed3525b6dc90141 Reviewed-on: https://go-review.googlesource.com/c/net/+/372714 Trust: Bryan Mills <bcmills@google.com> Run-TryBot: Bryan Mills <bcmills@google.com> TryBot-Result: Gopher Robot <gobot@golang.org> Reviewed-by: Ian Lance Taylor <iant@golang.org>
This commit is contained in:
committed by
Bryan Mills
parent
fe4d628211
commit
5b0dc2dfae
@@ -42,14 +42,27 @@ func (l *limitListener) acquire() bool {
|
||||
func (l *limitListener) release() { <-l.sem }
|
||||
|
||||
func (l *limitListener) Accept() (net.Conn, error) {
|
||||
acquired := l.acquire()
|
||||
// If the semaphore isn't acquired because the listener was closed, expect
|
||||
// that this call to accept won't block, but immediately return an error.
|
||||
if !l.acquire() {
|
||||
// If the semaphore isn't acquired because the listener was closed, expect
|
||||
// that this call to accept won't block, but immediately return an error.
|
||||
// If it instead returns a spurious connection (due to a bug in the
|
||||
// Listener, such as https://golang.org/issue/50216), we immediately close
|
||||
// it and try again. Some buggy Listener implementations (like the one in
|
||||
// the aforementioned issue) seem to assume that Accept will be called to
|
||||
// completion, and may otherwise fail to clean up the client end of pending
|
||||
// connections.
|
||||
for {
|
||||
c, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
c, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
if acquired {
|
||||
l.release()
|
||||
}
|
||||
l.release()
|
||||
return nil, err
|
||||
}
|
||||
return &limitListenerConn{Conn: c, release: l.release}, nil
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestLimitListener(t *testing.T) {
|
||||
func TestLimitListenerOverload(t *testing.T) {
|
||||
const (
|
||||
max = 5
|
||||
attempts = max * 2
|
||||
@@ -30,6 +30,7 @@ func TestLimitListener(t *testing.T) {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
saturated := make(chan struct{})
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
@@ -40,69 +41,174 @@ func TestLimitListener(t *testing.T) {
|
||||
break
|
||||
}
|
||||
accepted++
|
||||
if accepted == max {
|
||||
close(saturated)
|
||||
}
|
||||
io.WriteString(c, msg)
|
||||
|
||||
defer c.Close() // Leave c open until the listener is closed.
|
||||
// Leave c open until the listener is closed.
|
||||
defer c.Close()
|
||||
}
|
||||
if accepted > max {
|
||||
t.Errorf("accepted %d simultaneous connections; want at most %d", accepted, max)
|
||||
t.Logf("with limit %d, accepted %d simultaneous connections", max, accepted)
|
||||
// The listener accounts open connections based on Listener-side Close
|
||||
// calls, so even if the client hangs up early (for example, because it
|
||||
// was a random dial from another process instead of from this test), we
|
||||
// should not end up accepting more connections than expected.
|
||||
if accepted != max {
|
||||
t.Errorf("want exactly %d", max)
|
||||
}
|
||||
}()
|
||||
|
||||
// connc keeps the client end of the dialed connections alive until the
|
||||
// test completes.
|
||||
connc := make(chan []net.Conn, 1)
|
||||
connc <- nil
|
||||
|
||||
dialCtx, cancelDial := context.WithCancel(context.Background())
|
||||
defer cancelDial()
|
||||
dialer := &net.Dialer{}
|
||||
|
||||
var served int32
|
||||
var dialed, served int32
|
||||
var pendingDials sync.WaitGroup
|
||||
for n := attempts; n > 0; n-- {
|
||||
wg.Add(1)
|
||||
pendingDials.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
c, err := dialer.DialContext(dialCtx, l.Addr().Network(), l.Addr().String())
|
||||
pendingDials.Done()
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
return
|
||||
}
|
||||
atomic.AddInt32(&dialed, 1)
|
||||
defer c.Close()
|
||||
|
||||
// Keep this end of the connection alive until after the Listener
|
||||
// finishes.
|
||||
conns := append(<-connc, c)
|
||||
if len(conns) == max {
|
||||
go func() {
|
||||
// Give the server a bit of time to make sure it doesn't exceed its
|
||||
// limit after serving this connection, then cancel the remaining
|
||||
// Dials (if any).
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
cancelDial()
|
||||
l.Close()
|
||||
}()
|
||||
}
|
||||
connc <- conns
|
||||
|
||||
b := make([]byte, len(msg))
|
||||
if n, err := c.Read(b); n < len(b) {
|
||||
// The kernel may queue more than max connections (allowing their dials to
|
||||
// succeed), but only max of them should actually be accepted by the
|
||||
// server. We can distinguish the two based on whether the listener writes
|
||||
// anything to the connection — a connection that was queued but not
|
||||
// accepted will be closed without transferring any data.
|
||||
if b, err := io.ReadAll(c); len(b) < len(msg) {
|
||||
t.Log(err)
|
||||
return
|
||||
}
|
||||
atomic.AddInt32(&served, 1)
|
||||
}()
|
||||
}
|
||||
|
||||
// Give the server a bit of time after it saturates to make sure it doesn't
|
||||
// exceed its limit after serving this connection, then cancel the remaining
|
||||
// dials (if any).
|
||||
<-saturated
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
cancelDial()
|
||||
// Wait for the dials to complete to ensure that the port isn't reused before
|
||||
// the dials are actually attempted.
|
||||
pendingDials.Wait()
|
||||
l.Close()
|
||||
wg.Wait()
|
||||
|
||||
conns := <-connc
|
||||
for _, c := range conns {
|
||||
c.Close()
|
||||
t.Logf("served %d simultaneous connections (of %d dialed, %d attempted)", served, dialed, attempts)
|
||||
|
||||
// If some other process (such as a port scan or another test) happens to dial
|
||||
// the listener at the same time, the listener could end up burning its quota
|
||||
// on that, resulting in fewer than max test connections being served.
|
||||
// But the number served certainly cannot be greater.
|
||||
if served > max {
|
||||
t.Errorf("expected at most %d served", max)
|
||||
}
|
||||
t.Logf("with limit %d, served %d connections (of %d dialed, %d attempted)", max, served, len(conns), attempts)
|
||||
if served != max {
|
||||
t.Errorf("expected exactly %d served", max)
|
||||
}
|
||||
|
||||
func TestLimitListenerSaturation(t *testing.T) {
|
||||
const (
|
||||
max = 5
|
||||
attemptsPerWave = max * 2
|
||||
waves = 10
|
||||
msg = "bye\n"
|
||||
)
|
||||
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
l = LimitListener(l, max)
|
||||
|
||||
acceptDone := make(chan struct{})
|
||||
defer func() {
|
||||
l.Close()
|
||||
<-acceptDone
|
||||
}()
|
||||
go func() {
|
||||
defer close(acceptDone)
|
||||
|
||||
var open, peakOpen int32
|
||||
var (
|
||||
saturated = make(chan struct{})
|
||||
saturatedOnce sync.Once
|
||||
)
|
||||
var wg sync.WaitGroup
|
||||
for {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if n := atomic.AddInt32(&open, 1); n > peakOpen {
|
||||
peakOpen = n
|
||||
if n == max {
|
||||
saturatedOnce.Do(func() {
|
||||
// Wait a bit to make sure the listener doesn't exceed its limit
|
||||
// after accepting this connection, then allow the in-flight
|
||||
// connections to write out and close.
|
||||
time.AfterFunc(10*time.Millisecond, func() { close(saturated) })
|
||||
})
|
||||
}
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
<-saturated
|
||||
io.WriteString(c, msg)
|
||||
atomic.AddInt32(&open, -1)
|
||||
c.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
t.Logf("with limit %d, accepted a peak of %d simultaneous connections", max, peakOpen)
|
||||
if peakOpen > max {
|
||||
t.Errorf("want at most %d", max)
|
||||
}
|
||||
}()
|
||||
|
||||
for wave := 0; wave < waves; wave++ {
|
||||
var dialed, served int32
|
||||
var wg sync.WaitGroup
|
||||
for n := attemptsPerWave; n > 0; n-- {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
c, err := net.Dial(l.Addr().Network(), l.Addr().String())
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
return
|
||||
}
|
||||
atomic.AddInt32(&dialed, 1)
|
||||
defer c.Close()
|
||||
|
||||
if b, err := io.ReadAll(c); len(b) < len(msg) {
|
||||
t.Log(err)
|
||||
return
|
||||
}
|
||||
atomic.AddInt32(&served, 1)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
t.Logf("served %d connections (of %d dialed, %d attempted)", served, dialed, attemptsPerWave)
|
||||
// We expect that the kernel can queue at least attemptsPerWave
|
||||
// connections at a time (since it's only a small number), so every
|
||||
// connection should eventually be served.
|
||||
if served != attemptsPerWave {
|
||||
t.Errorf("expected %d served", attemptsPerWave)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,9 +266,7 @@ func TestLimitListenerClose(t *testing.T) {
|
||||
|
||||
// Allow the subsequent Accept to block before closing the listener.
|
||||
// (Accept should unblock and return.)
|
||||
timer := time.AfterFunc(10*time.Millisecond, func() {
|
||||
ln.Close()
|
||||
})
|
||||
timer := time.AfterFunc(10*time.Millisecond, func() { ln.Close() })
|
||||
|
||||
c, err = ln.Accept()
|
||||
if err == nil {
|
||||
|
||||
Reference in New Issue
Block a user