mirror of
https://github.com/golang/net.git
synced 2026-03-31 18:37:08 +09:00
quic: add Stream.Set{Read,Write}Context, drop {Read,Write,Close}Context
The ReadContext, WriteContext, and CloseContext Stream methods are
difficult to use in conjunction with functions that operate on an
io.Reader, io.Writer, or io.Closer. For example, it's reasonable
to want to use io.ReadFull with a Stream, but doing so with a context
is cumbersome.
Drop the Stream methods that take a Context in favor of stateful
methods that set the Context to use for read and write operations.
(Close counts as a write operation, since it blocks waiting for
data to be sent.)
Intentionally make Set{Read,Write}Context not concurrency safe,
to allow the race detector to catch misuse. This shouldn't be a
problem for correct programs, since reads and writes are
inherently not concurrency-safe.
For golang/go#58547
Change-Id: I41378eb552d89a720921fc8644d3637c1a545676
Reviewed-on: https://go-review.googlesource.com/c/net/+/550795
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
This commit is contained in:
@@ -249,8 +249,9 @@ func TestConnCloseUnblocksNewStream(t *testing.T) {
|
||||
func TestConnCloseUnblocksStreamRead(t *testing.T) {
|
||||
testConnCloseUnblocks(t, func(ctx context.Context, tc *testConn) error {
|
||||
s := newLocalStream(t, tc, bidiStream)
|
||||
s.SetReadContext(ctx)
|
||||
buf := make([]byte, 16)
|
||||
_, err := s.ReadContext(ctx, buf)
|
||||
_, err := s.Read(buf)
|
||||
return err
|
||||
}, permissiveTransportParameters)
|
||||
}
|
||||
@@ -258,8 +259,9 @@ func TestConnCloseUnblocksStreamRead(t *testing.T) {
|
||||
func TestConnCloseUnblocksStreamWrite(t *testing.T) {
|
||||
testConnCloseUnblocks(t, func(ctx context.Context, tc *testConn) error {
|
||||
s := newLocalStream(t, tc, bidiStream)
|
||||
s.SetWriteContext(ctx)
|
||||
buf := make([]byte, 32)
|
||||
_, err := s.WriteContext(ctx, buf)
|
||||
_, err := s.Write(buf)
|
||||
return err
|
||||
}, permissiveTransportParameters, func(c *Config) {
|
||||
c.MaxStreamWriteBufferSize = 16
|
||||
@@ -269,11 +271,12 @@ func TestConnCloseUnblocksStreamWrite(t *testing.T) {
|
||||
func TestConnCloseUnblocksStreamClose(t *testing.T) {
|
||||
testConnCloseUnblocks(t, func(ctx context.Context, tc *testConn) error {
|
||||
s := newLocalStream(t, tc, bidiStream)
|
||||
s.SetWriteContext(ctx)
|
||||
buf := make([]byte, 16)
|
||||
_, err := s.WriteContext(ctx, buf)
|
||||
_, err := s.Write(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.CloseContext(ctx)
|
||||
return s.Close()
|
||||
}, permissiveTransportParameters)
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
)
|
||||
|
||||
func TestConnInflowReturnOnRead(t *testing.T) {
|
||||
ctx := canceledContext()
|
||||
tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
|
||||
c.MaxConnReadBufferSize = 64
|
||||
})
|
||||
@@ -21,14 +20,14 @@ func TestConnInflowReturnOnRead(t *testing.T) {
|
||||
data: make([]byte, 64),
|
||||
})
|
||||
const readSize = 8
|
||||
if n, err := s.ReadContext(ctx, make([]byte, readSize)); n != readSize || err != nil {
|
||||
if n, err := s.Read(make([]byte, readSize)); n != readSize || err != nil {
|
||||
t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, readSize)
|
||||
}
|
||||
tc.wantFrame("available window increases, send a MAX_DATA",
|
||||
packetType1RTT, debugFrameMaxData{
|
||||
max: 64 + readSize,
|
||||
})
|
||||
if n, err := s.ReadContext(ctx, make([]byte, 64)); n != 64-readSize || err != nil {
|
||||
if n, err := s.Read(make([]byte, 64)); n != 64-readSize || err != nil {
|
||||
t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, 64-readSize)
|
||||
}
|
||||
tc.wantFrame("available window increases, send a MAX_DATA",
|
||||
@@ -42,7 +41,7 @@ func TestConnInflowReturnOnRead(t *testing.T) {
|
||||
data: make([]byte, 64),
|
||||
})
|
||||
tc.wantIdle("connection is idle")
|
||||
if n, err := s.ReadContext(ctx, make([]byte, 64)); n != 64 || err != nil {
|
||||
if n, err := s.Read(make([]byte, 64)); n != 64 || err != nil {
|
||||
t.Fatalf("offset 64: s.Read() = %v, %v; want %v, nil", n, err, 64)
|
||||
}
|
||||
}
|
||||
@@ -79,10 +78,10 @@ func TestConnInflowReturnOnRacingReads(t *testing.T) {
|
||||
t.Fatalf("conn.AcceptStream() = %v", err)
|
||||
}
|
||||
read1 := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
return s1.ReadContext(ctx, make([]byte, 16))
|
||||
return s1.Read(make([]byte, 16))
|
||||
})
|
||||
read2 := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
return s2.ReadContext(ctx, make([]byte, 1))
|
||||
return s2.Read(make([]byte, 1))
|
||||
})
|
||||
// This MAX_DATA might extend the window by 16 or 17, depending on
|
||||
// whether the second write occurs before the update happens.
|
||||
@@ -90,10 +89,10 @@ func TestConnInflowReturnOnRacingReads(t *testing.T) {
|
||||
packetType1RTT, debugFrameMaxData{})
|
||||
tc.wantIdle("redundant MAX_DATA is not sent")
|
||||
if _, err := read1.result(); err != nil {
|
||||
t.Errorf("ReadContext #1 = %v", err)
|
||||
t.Errorf("Read #1 = %v", err)
|
||||
}
|
||||
if _, err := read2.result(); err != nil {
|
||||
t.Errorf("ReadContext #2 = %v", err)
|
||||
t.Errorf("Read #2 = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -227,13 +226,13 @@ func TestConnInflowMultipleStreams(t *testing.T) {
|
||||
t.Fatalf("AcceptStream() = %v", err)
|
||||
}
|
||||
streams = append(streams, s)
|
||||
if n, err := s.ReadContext(ctx, make([]byte, 1)); err != nil || n != 1 {
|
||||
if n, err := s.Read(make([]byte, 1)); err != nil || n != 1 {
|
||||
t.Fatalf("s.Read() = %v, %v; want 1, nil", n, err)
|
||||
}
|
||||
}
|
||||
tc.wantIdle("streams have read data, but not enough to update MAX_DATA")
|
||||
|
||||
if n, err := streams[0].ReadContext(ctx, make([]byte, 32)); err != nil || n != 31 {
|
||||
if n, err := streams[0].Read(make([]byte, 32)); err != nil || n != 31 {
|
||||
t.Fatalf("s.Read() = %v, %v; want 31, nil", n, err)
|
||||
}
|
||||
tc.wantFrame("read enough data to trigger a MAX_DATA update",
|
||||
|
||||
@@ -433,7 +433,8 @@ func TestLostMaxStreamsFrameMostRecent(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("AcceptStream() = %v", err)
|
||||
}
|
||||
s.CloseContext(ctx)
|
||||
s.SetWriteContext(ctx)
|
||||
s.Close()
|
||||
if styp == bidiStream {
|
||||
tc.wantFrame("stream is closed",
|
||||
packetType1RTT, debugFrameStream{
|
||||
@@ -480,7 +481,7 @@ func TestLostMaxStreamsFrameNotMostRecent(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("AcceptStream() = %v", err)
|
||||
}
|
||||
if err := s.CloseContext(ctx); err != nil {
|
||||
if err := s.Close(); err != nil {
|
||||
t.Fatalf("stream.Close() = %v", err)
|
||||
}
|
||||
tc.wantFrame("closing stream updates peer's MAX_STREAMS",
|
||||
@@ -512,7 +513,7 @@ func TestLostStreamDataBlockedFrame(t *testing.T) {
|
||||
})
|
||||
|
||||
w := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
return s.WriteContext(ctx, []byte{0, 1, 2, 3})
|
||||
return s.Write([]byte{0, 1, 2, 3})
|
||||
})
|
||||
defer w.cancel()
|
||||
tc.wantFrame("write is blocked by flow control",
|
||||
@@ -564,7 +565,7 @@ func TestLostStreamDataBlockedFrameAfterStreamUnblocked(t *testing.T) {
|
||||
|
||||
data := []byte{0, 1, 2, 3}
|
||||
w := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
return s.WriteContext(ctx, data)
|
||||
return s.Write(data)
|
||||
})
|
||||
defer w.cancel()
|
||||
tc.wantFrame("write is blocked by flow control",
|
||||
|
||||
@@ -230,8 +230,8 @@ func TestStreamsWriteQueueFairness(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
streams = append(streams, s)
|
||||
if n, err := s.WriteContext(ctx, data); n != len(data) || err != nil {
|
||||
t.Fatalf("s.WriteContext() = %v, %v; want %v, nil", n, err, len(data))
|
||||
if n, err := s.Write(data); n != len(data) || err != nil {
|
||||
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
|
||||
}
|
||||
// Wait for the stream to finish writing whatever frames it can before
|
||||
// congestion control blocks it.
|
||||
@@ -298,7 +298,7 @@ func TestStreamsShutdown(t *testing.T) {
|
||||
side: localStream,
|
||||
styp: uniStream,
|
||||
setup: func(t *testing.T, tc *testConn, s *Stream) {
|
||||
s.CloseContext(canceledContext())
|
||||
s.Close()
|
||||
},
|
||||
shutdown: func(t *testing.T, tc *testConn, s *Stream) {
|
||||
tc.writeAckForAll()
|
||||
@@ -311,7 +311,7 @@ func TestStreamsShutdown(t *testing.T) {
|
||||
tc.writeFrames(packetType1RTT, debugFrameResetStream{
|
||||
id: s.id,
|
||||
})
|
||||
s.CloseContext(canceledContext())
|
||||
s.Close()
|
||||
},
|
||||
shutdown: func(t *testing.T, tc *testConn, s *Stream) {
|
||||
tc.writeAckForAll()
|
||||
@@ -321,8 +321,8 @@ func TestStreamsShutdown(t *testing.T) {
|
||||
side: localStream,
|
||||
styp: bidiStream,
|
||||
setup: func(t *testing.T, tc *testConn, s *Stream) {
|
||||
s.CloseContext(canceledContext())
|
||||
tc.wantIdle("all frames after CloseContext are ignored")
|
||||
s.Close()
|
||||
tc.wantIdle("all frames after Close are ignored")
|
||||
tc.writeAckForAll()
|
||||
},
|
||||
shutdown: func(t *testing.T, tc *testConn, s *Stream) {
|
||||
@@ -335,13 +335,12 @@ func TestStreamsShutdown(t *testing.T) {
|
||||
side: remoteStream,
|
||||
styp: uniStream,
|
||||
setup: func(t *testing.T, tc *testConn, s *Stream) {
|
||||
ctx := canceledContext()
|
||||
tc.writeFrames(packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
fin: true,
|
||||
})
|
||||
if n, err := s.ReadContext(ctx, make([]byte, 16)); n != 0 || err != io.EOF {
|
||||
t.Errorf("ReadContext() = %v, %v; want 0, io.EOF", n, err)
|
||||
if n, err := s.Read(make([]byte, 16)); n != 0 || err != io.EOF {
|
||||
t.Errorf("Read() = %v, %v; want 0, io.EOF", n, err)
|
||||
}
|
||||
},
|
||||
shutdown: func(t *testing.T, tc *testConn, s *Stream) {
|
||||
@@ -451,17 +450,14 @@ func TestStreamsCreateAndCloseRemote(t *testing.T) {
|
||||
id: op.id,
|
||||
})
|
||||
case acceptOp:
|
||||
s, err := tc.conn.AcceptStream(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("AcceptStream() = %q; want stream %v", err, stringID(op.id))
|
||||
}
|
||||
s := tc.acceptStream()
|
||||
if s.id != op.id {
|
||||
t.Fatalf("accepted stram %v; want stream %v", err, stringID(op.id))
|
||||
t.Fatalf("accepted stream %v; want stream %v", stringID(s.id), stringID(op.id))
|
||||
}
|
||||
t.Logf("accepted stream %v", stringID(op.id))
|
||||
// Immediately close the stream, so the stream becomes done when the
|
||||
// peer closes its end.
|
||||
s.CloseContext(ctx)
|
||||
s.Close()
|
||||
}
|
||||
p := tc.readPacket()
|
||||
if p != nil {
|
||||
|
||||
@@ -382,6 +382,17 @@ func (tc *testConn) cleanup() {
|
||||
<-tc.conn.donec
|
||||
}
|
||||
|
||||
func (tc *testConn) acceptStream() *Stream {
|
||||
tc.t.Helper()
|
||||
s, err := tc.conn.AcceptStream(canceledContext())
|
||||
if err != nil {
|
||||
tc.t.Fatalf("conn.AcceptStream() = %v, want stream", err)
|
||||
}
|
||||
s.SetReadContext(canceledContext())
|
||||
s.SetWriteContext(canceledContext())
|
||||
return s
|
||||
}
|
||||
|
||||
func logDatagram(t *testing.T, text string, d *testDatagram) {
|
||||
t.Helper()
|
||||
if !*testVV {
|
||||
|
||||
@@ -18,6 +18,11 @@ type Stream struct {
|
||||
id streamID
|
||||
conn *Conn
|
||||
|
||||
// Contexts used for read/write operations.
|
||||
// Intentionally not mutex-guarded, to allow the race detector to catch concurrent access.
|
||||
inctx context.Context
|
||||
outctx context.Context
|
||||
|
||||
// ingate's lock guards all receive-related state.
|
||||
//
|
||||
// The gate condition is set if a read from the stream will not block,
|
||||
@@ -152,6 +157,8 @@ func newStream(c *Conn, id streamID) *Stream {
|
||||
inresetcode: -1, // -1 indicates no RESET_STREAM received
|
||||
ingate: newLockedGate(),
|
||||
outgate: newLockedGate(),
|
||||
inctx: context.Background(),
|
||||
outctx: context.Background(),
|
||||
}
|
||||
if !s.IsReadOnly() {
|
||||
s.outdone = make(chan struct{})
|
||||
@@ -159,6 +166,22 @@ func newStream(c *Conn, id streamID) *Stream {
|
||||
return s
|
||||
}
|
||||
|
||||
// SetReadContext sets the context used for reads from the stream.
|
||||
//
|
||||
// It is not safe to call SetReadContext concurrently.
|
||||
func (s *Stream) SetReadContext(ctx context.Context) {
|
||||
s.inctx = ctx
|
||||
}
|
||||
|
||||
// SetWriteContext sets the context used for writes to the stream.
|
||||
// The write context is also used by Close when waiting for writes to be
|
||||
// received by the peer.
|
||||
//
|
||||
// It is not safe to call SetWriteContext concurrently.
|
||||
func (s *Stream) SetWriteContext(ctx context.Context) {
|
||||
s.outctx = ctx
|
||||
}
|
||||
|
||||
// IsReadOnly reports whether the stream is read-only
|
||||
// (a unidirectional stream created by the peer).
|
||||
func (s *Stream) IsReadOnly() bool {
|
||||
@@ -172,24 +195,18 @@ func (s *Stream) IsWriteOnly() bool {
|
||||
}
|
||||
|
||||
// Read reads data from the stream.
|
||||
// See ReadContext for more details.
|
||||
func (s *Stream) Read(b []byte) (n int, err error) {
|
||||
return s.ReadContext(context.Background(), b)
|
||||
}
|
||||
|
||||
// ReadContext reads data from the stream.
|
||||
//
|
||||
// ReadContext returns as soon as at least one byte of data is available.
|
||||
// Read returns as soon as at least one byte of data is available.
|
||||
//
|
||||
// If the peer closes the stream cleanly, ReadContext returns io.EOF after
|
||||
// If the peer closes the stream cleanly, Read returns io.EOF after
|
||||
// returning all data sent by the peer.
|
||||
// If the peer aborts reads on the stream, ReadContext returns
|
||||
// If the peer aborts reads on the stream, Read returns
|
||||
// an error wrapping StreamResetCode.
|
||||
func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) {
|
||||
func (s *Stream) Read(b []byte) (n int, err error) {
|
||||
if s.IsWriteOnly() {
|
||||
return 0, errors.New("read from write-only stream")
|
||||
}
|
||||
if err := s.ingate.waitAndLock(ctx, s.conn.testHooks); err != nil {
|
||||
if err := s.ingate.waitAndLock(s.inctx, s.conn.testHooks); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer func() {
|
||||
@@ -237,17 +254,11 @@ func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
|
||||
}
|
||||
|
||||
// Write writes data to the stream.
|
||||
// See WriteContext for more details.
|
||||
func (s *Stream) Write(b []byte) (n int, err error) {
|
||||
return s.WriteContext(context.Background(), b)
|
||||
}
|
||||
|
||||
// WriteContext writes data to the stream.
|
||||
//
|
||||
// WriteContext writes data to the stream write buffer.
|
||||
// Write writes data to the stream write buffer.
|
||||
// Buffered data is only sent when the buffer is sufficiently full.
|
||||
// Call the Flush method to ensure buffered data is sent.
|
||||
func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) {
|
||||
func (s *Stream) Write(b []byte) (n int, err error) {
|
||||
if s.IsReadOnly() {
|
||||
return 0, errors.New("write to read-only stream")
|
||||
}
|
||||
@@ -259,7 +270,7 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
|
||||
if len(b) > 0 && !canWrite {
|
||||
// Our send buffer is full. Wait for the peer to ack some data.
|
||||
s.outUnlock()
|
||||
if err := s.outgate.waitAndLock(ctx, s.conn.testHooks); err != nil {
|
||||
if err := s.outgate.waitAndLock(s.outctx, s.conn.testHooks); err != nil {
|
||||
return n, err
|
||||
}
|
||||
// Successfully returning from waitAndLockGate means we are no longer
|
||||
@@ -317,7 +328,7 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
|
||||
|
||||
// Flush flushes data written to the stream.
|
||||
// It does not wait for the peer to acknowledge receipt of the data.
|
||||
// Use CloseContext to wait for the peer's acknowledgement.
|
||||
// Use Close to wait for the peer's acknowledgement.
|
||||
func (s *Stream) Flush() {
|
||||
s.outgate.lock()
|
||||
defer s.outUnlock()
|
||||
@@ -333,27 +344,21 @@ func (s *Stream) flushLocked() {
|
||||
}
|
||||
|
||||
// Close closes the stream.
|
||||
// See CloseContext for more details.
|
||||
func (s *Stream) Close() error {
|
||||
return s.CloseContext(context.Background())
|
||||
}
|
||||
|
||||
// CloseContext closes the stream.
|
||||
// Any blocked stream operations will be unblocked and return errors.
|
||||
//
|
||||
// CloseContext flushes any data in the stream write buffer and waits for the peer to
|
||||
// Close flushes any data in the stream write buffer and waits for the peer to
|
||||
// acknowledge receipt of the data.
|
||||
// If the stream has been reset, it waits for the peer to acknowledge the reset.
|
||||
// If the context expires before the peer receives the stream's data,
|
||||
// CloseContext discards the buffer and returns the context error.
|
||||
func (s *Stream) CloseContext(ctx context.Context) error {
|
||||
// Close discards the buffer and returns the context error.
|
||||
func (s *Stream) Close() error {
|
||||
s.CloseRead()
|
||||
if s.IsReadOnly() {
|
||||
return nil
|
||||
}
|
||||
s.CloseWrite()
|
||||
// TODO: Return code from peer's RESET_STREAM frame?
|
||||
if err := s.conn.waitOnDone(ctx, s.outdone); err != nil {
|
||||
if err := s.conn.waitOnDone(s.outctx, s.outdone); err != nil {
|
||||
return err
|
||||
}
|
||||
s.outgate.lock()
|
||||
@@ -369,7 +374,7 @@ func (s *Stream) CloseContext(ctx context.Context) error {
|
||||
//
|
||||
// CloseRead notifies the peer that the stream has been closed for reading.
|
||||
// It does not wait for the peer to acknowledge the closure.
|
||||
// Use CloseContext to wait for the peer's acknowledgement.
|
||||
// Use Close to wait for the peer's acknowledgement.
|
||||
func (s *Stream) CloseRead() {
|
||||
if s.IsWriteOnly() {
|
||||
return
|
||||
@@ -394,7 +399,7 @@ func (s *Stream) CloseRead() {
|
||||
//
|
||||
// CloseWrite sends any data in the stream write buffer to the peer.
|
||||
// It does not wait for the peer to acknowledge receipt of the data.
|
||||
// Use CloseContext to wait for the peer's acknowledgement.
|
||||
// Use Close to wait for the peer's acknowledgement.
|
||||
func (s *Stream) CloseWrite() {
|
||||
if s.IsReadOnly() {
|
||||
return
|
||||
@@ -412,7 +417,7 @@ func (s *Stream) CloseWrite() {
|
||||
// Reset sends the application protocol error code, which must be
|
||||
// less than 2^62, to the peer.
|
||||
// It does not wait for the peer to acknowledge receipt of the error.
|
||||
// Use CloseContext to wait for the peer's acknowledgement.
|
||||
// Use Close to wait for the peer's acknowledgement.
|
||||
//
|
||||
// Reset does not affect reads.
|
||||
// Use CloseRead to abort reads on the stream.
|
||||
|
||||
@@ -200,7 +200,6 @@ func TestStreamLimitMaxStreamsFrameTooLarge(t *testing.T) {
|
||||
|
||||
func TestStreamLimitSendUpdatesMaxStreams(t *testing.T) {
|
||||
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
|
||||
ctx := canceledContext()
|
||||
tc := newTestConn(t, serverSide, func(c *Config) {
|
||||
if styp == uniStream {
|
||||
c.MaxUniRemoteStreams = 4
|
||||
@@ -218,13 +217,9 @@ func TestStreamLimitSendUpdatesMaxStreams(t *testing.T) {
|
||||
id: newStreamID(clientSide, styp, int64(i)),
|
||||
fin: true,
|
||||
})
|
||||
s, err := tc.conn.AcceptStream(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("AcceptStream = %v", err)
|
||||
}
|
||||
streams = append(streams, s)
|
||||
streams = append(streams, tc.acceptStream())
|
||||
}
|
||||
streams[3].CloseContext(ctx)
|
||||
streams[3].Close()
|
||||
if styp == bidiStream {
|
||||
tc.wantFrame("stream is closed",
|
||||
packetType1RTT, debugFrameStream{
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
|
||||
func TestStreamWriteBlockedByOutputBuffer(t *testing.T) {
|
||||
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
|
||||
ctx := canceledContext()
|
||||
want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
|
||||
const writeBufferSize = 4
|
||||
tc := newTestConn(t, clientSide, permissiveTransportParameters, func(c *Config) {
|
||||
@@ -28,15 +27,12 @@ func TestStreamWriteBlockedByOutputBuffer(t *testing.T) {
|
||||
tc.handshake()
|
||||
tc.ignoreFrame(frameTypeAck)
|
||||
|
||||
s, err := tc.conn.newLocalStream(ctx, styp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s := newLocalStream(t, tc, styp)
|
||||
|
||||
// Non-blocking write.
|
||||
n, err := s.WriteContext(ctx, want)
|
||||
n, err := s.Write(want)
|
||||
if n != writeBufferSize || err != context.Canceled {
|
||||
t.Fatalf("s.WriteContext() = %v, %v; want %v, context.Canceled", n, err, writeBufferSize)
|
||||
t.Fatalf("s.Write() = %v, %v; want %v, context.Canceled", n, err, writeBufferSize)
|
||||
}
|
||||
s.Flush()
|
||||
tc.wantFrame("first write buffer of data sent",
|
||||
@@ -48,7 +44,8 @@ func TestStreamWriteBlockedByOutputBuffer(t *testing.T) {
|
||||
|
||||
// Blocking write, which must wait for buffer space.
|
||||
w := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
n, err := s.WriteContext(ctx, want[writeBufferSize:])
|
||||
s.SetWriteContext(ctx)
|
||||
n, err := s.Write(want[writeBufferSize:])
|
||||
s.Flush()
|
||||
return n, err
|
||||
})
|
||||
@@ -75,7 +72,7 @@ func TestStreamWriteBlockedByOutputBuffer(t *testing.T) {
|
||||
})
|
||||
|
||||
if n, err := w.result(); n != len(want)-writeBufferSize || err != nil {
|
||||
t.Fatalf("s.WriteContext() = %v, %v; want %v, nil",
|
||||
t.Fatalf("s.Write() = %v, %v; want %v, nil",
|
||||
len(want)-writeBufferSize, err, writeBufferSize)
|
||||
}
|
||||
})
|
||||
@@ -99,7 +96,7 @@ func TestStreamWriteBlockedByStreamFlowControl(t *testing.T) {
|
||||
}
|
||||
|
||||
// Data is written to the stream output buffer, but we have no flow control.
|
||||
_, err = s.WriteContext(ctx, want[:1])
|
||||
_, err = s.Write(want[:1])
|
||||
if err != nil {
|
||||
t.Fatalf("write with available output buffer: unexpected error: %v", err)
|
||||
}
|
||||
@@ -110,7 +107,7 @@ func TestStreamWriteBlockedByStreamFlowControl(t *testing.T) {
|
||||
})
|
||||
|
||||
// Write more data.
|
||||
_, err = s.WriteContext(ctx, want[1:])
|
||||
_, err = s.Write(want[1:])
|
||||
if err != nil {
|
||||
t.Fatalf("write with available output buffer: unexpected error: %v", err)
|
||||
}
|
||||
@@ -172,7 +169,7 @@ func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s.WriteContext(ctx, want[:1])
|
||||
s.Write(want[:1])
|
||||
s.Flush()
|
||||
tc.wantFrame("sent data (1 byte) fits within flow control limit",
|
||||
packetType1RTT, debugFrameStream{
|
||||
@@ -188,7 +185,7 @@ func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
|
||||
})
|
||||
|
||||
// Write [1,4).
|
||||
s.WriteContext(ctx, want[1:])
|
||||
s.Write(want[1:])
|
||||
tc.wantFrame("stream limit is 4 bytes, ignoring decrease in MAX_STREAM_DATA",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
@@ -208,7 +205,7 @@ func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
|
||||
})
|
||||
|
||||
// Write [1,4).
|
||||
s.WriteContext(ctx, want[4:])
|
||||
s.Write(want[4:])
|
||||
tc.wantFrame("stream limit is 8 bytes, ignoring decrease in MAX_STREAM_DATA",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
@@ -220,7 +217,6 @@ func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
|
||||
|
||||
func TestStreamWriteBlockedByWriteBufferLimit(t *testing.T) {
|
||||
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
|
||||
ctx := canceledContext()
|
||||
want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
|
||||
const maxWriteBuffer = 4
|
||||
tc := newTestConn(t, clientSide, func(p *transportParameters) {
|
||||
@@ -238,12 +234,10 @@ func TestStreamWriteBlockedByWriteBufferLimit(t *testing.T) {
|
||||
// Write more data than StreamWriteBufferSize.
|
||||
// The peer has given us plenty of flow control,
|
||||
// so we're just blocked by our local limit.
|
||||
s, err := tc.conn.newLocalStream(ctx, styp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s := newLocalStream(t, tc, styp)
|
||||
w := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
return s.WriteContext(ctx, want)
|
||||
s.SetWriteContext(ctx)
|
||||
return s.Write(want)
|
||||
})
|
||||
tc.wantFrame("stream write should send as much data as write buffer allows",
|
||||
packetType1RTT, debugFrameStream{
|
||||
@@ -266,7 +260,7 @@ func TestStreamWriteBlockedByWriteBufferLimit(t *testing.T) {
|
||||
w.cancel()
|
||||
n, err := w.result()
|
||||
if n != 2*maxWriteBuffer || err == nil {
|
||||
t.Fatalf("WriteContext() = %v, %v; want %v bytes, error", n, err, 2*maxWriteBuffer)
|
||||
t.Fatalf("Write() = %v, %v; want %v bytes, error", n, err, 2*maxWriteBuffer)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -397,7 +391,6 @@ func TestStreamReceive(t *testing.T) {
|
||||
}},
|
||||
}} {
|
||||
testStreamTypes(t, test.name, func(t *testing.T, styp streamType) {
|
||||
ctx := canceledContext()
|
||||
tc := newTestConn(t, serverSide)
|
||||
tc.handshake()
|
||||
sid := newStreamID(clientSide, styp, 0)
|
||||
@@ -413,21 +406,17 @@ func TestStreamReceive(t *testing.T) {
|
||||
fin: f.fin,
|
||||
})
|
||||
if s == nil {
|
||||
var err error
|
||||
s, err = tc.conn.AcceptStream(ctx)
|
||||
if err != nil {
|
||||
tc.t.Fatalf("conn.AcceptStream() = %v", err)
|
||||
}
|
||||
s = tc.acceptStream()
|
||||
}
|
||||
for {
|
||||
n, err := s.ReadContext(ctx, got[total:])
|
||||
t.Logf("s.ReadContext() = %v, %v", n, err)
|
||||
n, err := s.Read(got[total:])
|
||||
t.Logf("s.Read() = %v, %v", n, err)
|
||||
total += n
|
||||
if f.wantEOF && err != io.EOF {
|
||||
t.Fatalf("ReadContext() error = %v; want io.EOF", err)
|
||||
t.Fatalf("Read() error = %v; want io.EOF", err)
|
||||
}
|
||||
if !f.wantEOF && err == io.EOF {
|
||||
t.Fatalf("ReadContext() error = io.EOF, want something else")
|
||||
t.Fatalf("Read() error = io.EOF, want something else")
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
@@ -468,8 +457,8 @@ func TestStreamReceiveExtendsStreamWindow(t *testing.T) {
|
||||
}
|
||||
tc.wantIdle("stream window is not extended before data is read")
|
||||
buf := make([]byte, maxWindowSize+1)
|
||||
if n, err := s.ReadContext(ctx, buf); n != maxWindowSize || err != nil {
|
||||
t.Fatalf("s.ReadContext() = %v, %v; want %v, nil", n, err, maxWindowSize)
|
||||
if n, err := s.Read(buf); n != maxWindowSize || err != nil {
|
||||
t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, maxWindowSize)
|
||||
}
|
||||
tc.wantFrame("stream window is extended after reading data",
|
||||
packetType1RTT, debugFrameMaxStreamData{
|
||||
@@ -482,8 +471,8 @@ func TestStreamReceiveExtendsStreamWindow(t *testing.T) {
|
||||
data: make([]byte, maxWindowSize),
|
||||
fin: true,
|
||||
})
|
||||
if n, err := s.ReadContext(ctx, buf); n != maxWindowSize || err != io.EOF {
|
||||
t.Fatalf("s.ReadContext() = %v, %v; want %v, io.EOF", n, err, maxWindowSize)
|
||||
if n, err := s.Read(buf); n != maxWindowSize || err != io.EOF {
|
||||
t.Fatalf("s.Read() = %v, %v; want %v, io.EOF", n, err, maxWindowSize)
|
||||
}
|
||||
tc.wantIdle("stream window is not extended after FIN")
|
||||
})
|
||||
@@ -673,18 +662,19 @@ func TestStreamReceiveUnblocksReader(t *testing.T) {
|
||||
t.Fatalf("AcceptStream() = %v", err)
|
||||
}
|
||||
|
||||
// ReadContext succeeds immediately, since we already have data.
|
||||
// Read succeeds immediately, since we already have data.
|
||||
got := make([]byte, len(want))
|
||||
read := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
return s.ReadContext(ctx, got)
|
||||
return s.Read(got)
|
||||
})
|
||||
if n, err := read.result(); n != write1size || err != nil {
|
||||
t.Fatalf("ReadContext = %v, %v; want %v, nil", n, err, write1size)
|
||||
t.Fatalf("Read = %v, %v; want %v, nil", n, err, write1size)
|
||||
}
|
||||
|
||||
// ReadContext blocks waiting for more data.
|
||||
// Read blocks waiting for more data.
|
||||
read = runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
return s.ReadContext(ctx, got[write1size:])
|
||||
s.SetReadContext(ctx)
|
||||
return s.Read(got[write1size:])
|
||||
})
|
||||
tc.writeFrames(packetType1RTT, debugFrameStream{
|
||||
id: sid,
|
||||
@@ -693,7 +683,7 @@ func TestStreamReceiveUnblocksReader(t *testing.T) {
|
||||
fin: true,
|
||||
})
|
||||
if n, err := read.result(); n != len(want)-write1size || err != io.EOF {
|
||||
t.Fatalf("ReadContext = %v, %v; want %v, io.EOF", n, err, len(want)-write1size)
|
||||
t.Fatalf("Read = %v, %v; want %v, io.EOF", n, err, len(want)-write1size)
|
||||
}
|
||||
if !bytes.Equal(got, want) {
|
||||
t.Fatalf("read bytes %x, want %x", got, want)
|
||||
@@ -935,7 +925,8 @@ func TestStreamResetBlockedStream(t *testing.T) {
|
||||
})
|
||||
tc.ignoreFrame(frameTypeStreamDataBlocked)
|
||||
writing := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
return s.WriteContext(ctx, []byte{0, 1, 2, 3, 4, 5, 6, 7})
|
||||
s.SetWriteContext(ctx)
|
||||
return s.Write([]byte{0, 1, 2, 3, 4, 5, 6, 7})
|
||||
})
|
||||
tc.wantFrame("stream writes data until write buffer fills",
|
||||
packetType1RTT, debugFrameStream{
|
||||
@@ -972,7 +963,7 @@ func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
|
||||
want := make([]byte, 4096)
|
||||
rand.Read(want) // doesn't need to be crypto/rand, but non-deprecated and harmless
|
||||
w := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
n, err := s.WriteContext(ctx, want)
|
||||
n, err := s.Write(want)
|
||||
s.Flush()
|
||||
return n, err
|
||||
})
|
||||
@@ -992,7 +983,7 @@ func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
|
||||
got = append(got, sf.data...)
|
||||
}
|
||||
if n, err := w.result(); n != len(want) || err != nil {
|
||||
t.Fatalf("s.WriteContext() = %v, %v; want %v, nil", n, err, len(want))
|
||||
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
|
||||
}
|
||||
if !bytes.Equal(got, want) {
|
||||
t.Fatalf("mismatch in received stream data")
|
||||
@@ -1000,17 +991,16 @@ func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStreamCloseWaitsForAcks(t *testing.T) {
|
||||
ctx := canceledContext()
|
||||
tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
|
||||
data := make([]byte, 100)
|
||||
s.WriteContext(ctx, data)
|
||||
s.Write(data)
|
||||
s.Flush()
|
||||
tc.wantFrame("conn sends data for the stream",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
data: data,
|
||||
})
|
||||
if err := s.CloseContext(ctx); err != context.Canceled {
|
||||
if err := s.Close(); err != context.Canceled {
|
||||
t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
|
||||
}
|
||||
tc.wantFrame("conn sends FIN for closed stream",
|
||||
@@ -1021,21 +1011,22 @@ func TestStreamCloseWaitsForAcks(t *testing.T) {
|
||||
data: []byte{},
|
||||
})
|
||||
closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
|
||||
return struct{}{}, s.CloseContext(ctx)
|
||||
s.SetWriteContext(ctx)
|
||||
return struct{}{}, s.Close()
|
||||
})
|
||||
if _, err := closing.result(); err != errNotDone {
|
||||
t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
|
||||
t.Fatalf("s.Close() = %v, want it to block waiting for acks", err)
|
||||
}
|
||||
tc.writeAckForAll()
|
||||
if _, err := closing.result(); err != nil {
|
||||
t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
|
||||
t.Fatalf("s.Close() = %v, want nil (all data acked)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamCloseReadOnly(t *testing.T) {
|
||||
tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, permissiveTransportParameters)
|
||||
if err := s.CloseContext(canceledContext()); err != nil {
|
||||
t.Errorf("s.CloseContext() = %v, want nil", err)
|
||||
if err := s.Close(); err != nil {
|
||||
t.Errorf("s.Close() = %v, want nil", err)
|
||||
}
|
||||
tc.wantFrame("closed stream sends STOP_SENDING",
|
||||
packetType1RTT, debugFrameStopSending{
|
||||
@@ -1069,17 +1060,16 @@ func TestStreamCloseUnblocked(t *testing.T) {
|
||||
},
|
||||
}} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
ctx := canceledContext()
|
||||
tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
|
||||
data := make([]byte, 100)
|
||||
s.WriteContext(ctx, data)
|
||||
s.Write(data)
|
||||
s.Flush()
|
||||
tc.wantFrame("conn sends data for the stream",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
data: data,
|
||||
})
|
||||
if err := s.CloseContext(ctx); err != context.Canceled {
|
||||
if err := s.Close(); err != context.Canceled {
|
||||
t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
|
||||
}
|
||||
tc.wantFrame("conn sends FIN for closed stream",
|
||||
@@ -1090,34 +1080,34 @@ func TestStreamCloseUnblocked(t *testing.T) {
|
||||
data: []byte{},
|
||||
})
|
||||
closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
|
||||
return struct{}{}, s.CloseContext(ctx)
|
||||
s.SetWriteContext(ctx)
|
||||
return struct{}{}, s.Close()
|
||||
})
|
||||
if _, err := closing.result(); err != errNotDone {
|
||||
t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
|
||||
t.Fatalf("s.Close() = %v, want it to block waiting for acks", err)
|
||||
}
|
||||
test.unblock(tc, s)
|
||||
_, err := closing.result()
|
||||
switch {
|
||||
case err == errNotDone:
|
||||
t.Fatalf("s.CloseContext() still blocking; want it to have returned")
|
||||
t.Fatalf("s.Close() still blocking; want it to have returned")
|
||||
case err == nil && !test.success:
|
||||
t.Fatalf("s.CloseContext() = nil, want error")
|
||||
t.Fatalf("s.Close() = nil, want error")
|
||||
case err != nil && test.success:
|
||||
t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
|
||||
t.Fatalf("s.Close() = %v, want nil (all data acked)", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) {
|
||||
ctx := canceledContext()
|
||||
tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters,
|
||||
func(p *transportParameters) {
|
||||
//p.initialMaxData = 0
|
||||
p.initialMaxStreamDataUni = 0
|
||||
})
|
||||
tc.ignoreFrame(frameTypeStreamDataBlocked)
|
||||
if _, err := s.WriteContext(ctx, []byte{0, 1}); err != nil {
|
||||
if _, err := s.Write([]byte{0, 1}); err != nil {
|
||||
t.Fatalf("s.Write = %v", err)
|
||||
}
|
||||
s.CloseWrite()
|
||||
@@ -1149,7 +1139,6 @@ func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) {
|
||||
|
||||
func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
|
||||
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
|
||||
ctx := canceledContext()
|
||||
tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
|
||||
data := []byte{0, 1, 2, 3, 4, 5, 6, 7}
|
||||
tc.writeFrames(packetType1RTT, debugFrameStream{
|
||||
@@ -1157,7 +1146,7 @@ func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
|
||||
data: data,
|
||||
})
|
||||
got := make([]byte, 4)
|
||||
if n, err := s.ReadContext(ctx, got); n != len(got) || err != nil {
|
||||
if n, err := s.Read(got); n != len(got) || err != nil {
|
||||
t.Fatalf("Read start of stream: got %v, %v; want %v, nil", n, err, len(got))
|
||||
}
|
||||
const sentCode = 42
|
||||
@@ -1167,7 +1156,7 @@ func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
|
||||
code: sentCode,
|
||||
})
|
||||
wantErr := StreamErrorCode(sentCode)
|
||||
if n, err := s.ReadContext(ctx, got); n != 0 || !errors.Is(err, wantErr) {
|
||||
if n, err := s.Read(got); n != 0 || !errors.Is(err, wantErr) {
|
||||
t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
|
||||
}
|
||||
})
|
||||
@@ -1177,8 +1166,9 @@ func TestStreamPeerResetWakesBlockedRead(t *testing.T) {
|
||||
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
|
||||
tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
|
||||
reader := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
s.SetReadContext(ctx)
|
||||
got := make([]byte, 4)
|
||||
return s.ReadContext(ctx, got)
|
||||
return s.Read(got)
|
||||
})
|
||||
const sentCode = 42
|
||||
tc.writeFrames(packetType1RTT, debugFrameResetStream{
|
||||
@@ -1348,7 +1338,8 @@ func TestStreamFlushImplicitLargerThanBuffer(t *testing.T) {
|
||||
want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
|
||||
|
||||
w := runAsync(tc, func(ctx context.Context) (int, error) {
|
||||
n, err := s.WriteContext(ctx, want)
|
||||
s.SetWriteContext(ctx)
|
||||
n, err := s.Write(want)
|
||||
return n, err
|
||||
})
|
||||
|
||||
@@ -1401,7 +1392,10 @@ func newTestConnAndLocalStream(t *testing.T, side connSide, styp streamType, opt
|
||||
tc := newTestConn(t, side, opts...)
|
||||
tc.handshake()
|
||||
tc.ignoreFrame(frameTypeAck)
|
||||
return tc, newLocalStream(t, tc, styp)
|
||||
s := newLocalStream(t, tc, styp)
|
||||
s.SetReadContext(canceledContext())
|
||||
s.SetWriteContext(canceledContext())
|
||||
return tc, s
|
||||
}
|
||||
|
||||
func newLocalStream(t *testing.T, tc *testConn, styp streamType) *Stream {
|
||||
@@ -1411,6 +1405,8 @@ func newLocalStream(t *testing.T, tc *testConn, styp streamType) *Stream {
|
||||
if err != nil {
|
||||
t.Fatalf("conn.newLocalStream(%v) = %v", styp, err)
|
||||
}
|
||||
s.SetReadContext(canceledContext())
|
||||
s.SetWriteContext(canceledContext())
|
||||
return s
|
||||
}
|
||||
|
||||
@@ -1419,7 +1415,10 @@ func newTestConnAndRemoteStream(t *testing.T, side connSide, styp streamType, op
|
||||
tc := newTestConn(t, side, opts...)
|
||||
tc.handshake()
|
||||
tc.ignoreFrame(frameTypeAck)
|
||||
return tc, newRemoteStream(t, tc, styp)
|
||||
s := newRemoteStream(t, tc, styp)
|
||||
s.SetReadContext(canceledContext())
|
||||
s.SetWriteContext(canceledContext())
|
||||
return tc, s
|
||||
}
|
||||
|
||||
func newRemoteStream(t *testing.T, tc *testConn, styp streamType) *Stream {
|
||||
@@ -1432,6 +1431,8 @@ func newRemoteStream(t *testing.T, tc *testConn, styp streamType) *Stream {
|
||||
if err != nil {
|
||||
t.Fatalf("conn.AcceptStream() = %v", err)
|
||||
}
|
||||
s.SetReadContext(canceledContext())
|
||||
s.SetWriteContext(canceledContext())
|
||||
return s
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user