quic: implement stream flush

Do not commit data written to a stream to the network until
the user explicitly flushes the stream, the stream output
buffer fills, or the output buffer contains enough data to
fill a packet.

We could write data immediately (as net.TCPConn does),
but this can require the user to put their own buffer in
front of the stream. Since we necessarily need to maintain
a retransmit buffer in the stream, this is redundant.

We could do something like Nagle's algorithm, but nobody
wants that.

So make flushes explicit.

For golang/go#58547

Change-Id: I29dc9d79556c7a358a360ef79beb38b45040b6bc
Reviewed-on: https://go-review.googlesource.com/c/net/+/543083
Auto-Submit: Damien Neil <dneil@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
This commit is contained in:
Damien Neil
2023-10-30 15:07:39 -07:00
committed by Gopher Robot
parent d87f99be5d
commit 399218d6bc
7 changed files with 171 additions and 30 deletions

View File

@@ -136,12 +136,10 @@ func newConn(now time.Time, side connSide, cids newServerConnIDs, peerAddr netip
}
}
// The smallest allowed maximum QUIC datagram size is 1200 bytes.
// TODO: PMTU discovery.
const maxDatagramSize = 1200
c.logConnectionStarted(cids.originalDstConnID, peerAddr)
c.keysAppData.init()
c.loss.init(c.side, maxDatagramSize, now)
c.loss.init(c.side, smallestMaxDatagramSize, now)
c.streamsInit()
c.lifetimeInit()
c.restartIdleTimer(now)

View File

@@ -262,6 +262,7 @@ func TestConnOutflowBlocked(t *testing.T) {
if n != len(data) || err != nil {
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
}
s.Flush()
tc.wantFrame("stream writes data up to MAX_DATA limit",
packetType1RTT, debugFrameStream{
@@ -310,6 +311,7 @@ func TestConnOutflowMaxDataDecreases(t *testing.T) {
if n != len(data) || err != nil {
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
}
s.Flush()
tc.wantFrame("stream writes data up to MAX_DATA limit",
packetType1RTT, debugFrameStream{
@@ -337,7 +339,9 @@ func TestConnOutflowMaxDataRoundRobin(t *testing.T) {
}
s1.Write(make([]byte, 10))
s1.Flush()
s2.Write(make([]byte, 10))
s2.Flush()
tc.writeFrames(packetType1RTT, debugFrameMaxData{
max: 1,
@@ -378,6 +382,7 @@ func TestConnOutflowMetaAndData(t *testing.T) {
data := makeTestData(32)
s.Write(data)
s.Flush()
s.CloseRead()
tc.wantFrame("CloseRead sends a STOP_SENDING, not flow controlled",
@@ -405,6 +410,7 @@ func TestConnOutflowResentData(t *testing.T) {
data := makeTestData(15)
s.Write(data[:8])
s.Flush()
tc.wantFrame("data is under MAX_DATA limit, all sent",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -421,6 +427,7 @@ func TestConnOutflowResentData(t *testing.T) {
})
s.Write(data[8:])
s.Flush()
tc.wantFrame("new data is sent up to the MAX_DATA limit",
packetType1RTT, debugFrameStream{
id: s.id,

View File

@@ -183,7 +183,7 @@ func TestLostStreamFrameEmpty(t *testing.T) {
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
c.Flush() // open the stream
tc.wantFrame("created bidirectional stream 0",
packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, bidiStream, 0),
@@ -213,6 +213,7 @@ func TestLostStreamWithData(t *testing.T) {
p.initialMaxStreamDataUni = 1 << 20
})
s.Write(data[:4])
s.Flush()
tc.wantFrame("send [0,4)",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -220,6 +221,7 @@ func TestLostStreamWithData(t *testing.T) {
data: data[:4],
})
s.Write(data[4:8])
s.Flush()
tc.wantFrame("send [4,8)",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -263,6 +265,7 @@ func TestLostStreamPartialLoss(t *testing.T) {
})
for i := range data {
s.Write(data[i : i+1])
s.Flush()
tc.wantFrame(fmt.Sprintf("send STREAM frame with byte %v", i),
packetType1RTT, debugFrameStream{
id: s.id,

View File

@@ -19,33 +19,33 @@ func TestStreamsCreate(t *testing.T) {
tc := newTestConn(t, clientSide, permissiveTransportParameters)
tc.handshake()
c, err := tc.conn.NewStream(ctx)
s, err := tc.conn.NewStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("created bidirectional stream 0",
packetType1RTT, debugFrameStream{
id: 0, // client-initiated, bidi, number 0
data: []byte{},
})
c, err = tc.conn.NewSendOnlyStream(ctx)
s, err = tc.conn.NewSendOnlyStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("created unidirectional stream 0",
packetType1RTT, debugFrameStream{
id: 2, // client-initiated, uni, number 0
data: []byte{},
})
c, err = tc.conn.NewStream(ctx)
s, err = tc.conn.NewStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("created bidirectional stream 1",
packetType1RTT, debugFrameStream{
id: 4, // client-initiated, uni, number 4
@@ -177,11 +177,11 @@ func TestStreamsStreamSendOnly(t *testing.T) {
tc := newTestConn(t, serverSide, permissiveTransportParameters)
tc.handshake()
c, err := tc.conn.NewSendOnlyStream(ctx)
s, err := tc.conn.NewSendOnlyStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("created unidirectional stream 0",
packetType1RTT, debugFrameStream{
id: 3, // server-initiated, uni, number 0

View File

@@ -64,10 +64,14 @@ const defaultKeepAlivePeriod = 0
// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2-6
const timerGranularity = 1 * time.Millisecond
// The smallest allowed maximum datagram size.
// https://www.rfc-editor.org/rfc/rfc9000#section-14
const smallestMaxDatagramSize = 1200
// Minimum size of a UDP datagram sent by a client carrying an Initial packet,
// or a server containing an ack-eliciting Initial packet.
// https://www.rfc-editor.org/rfc/rfc9000#section-14.1
const paddedInitialDatagramSize = 1200
const paddedInitialDatagramSize = smallestMaxDatagramSize
// Maximum number of streams of a given type which may be created.
// https://www.rfc-editor.org/rfc/rfc9000.html#section-4.6-2

View File

@@ -38,10 +38,11 @@ type Stream struct {
// the write will fail.
outgate gate
out pipe // buffered data to send
outflushed int64 // offset of last flush call
outwin int64 // maximum MAX_STREAM_DATA received from the peer
outmaxsent int64 // maximum data offset we've sent to the peer
outmaxbuf int64 // maximum amount of data we will buffer
outunsent rangeset[int64] // ranges buffered but not yet sent
outunsent rangeset[int64] // ranges buffered but not yet sent (only flushed data)
outacked rangeset[int64] // ranges sent and acknowledged
outopened sentVal // set if we should open the stream
outclosed sentVal // set by CloseWrite
@@ -240,8 +241,6 @@ func (s *Stream) Write(b []byte) (n int, err error) {
// WriteContext writes data to the stream write buffer.
// Buffered data is only sent when the buffer is sufficiently full.
// Call the Flush method to ensure buffered data is sent.
//
// TODO: Implement Flush.
func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) {
if s.IsReadOnly() {
return 0, errors.New("write to read-only stream")
@@ -269,10 +268,6 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
s.outUnlock()
return n, errors.New("write to closed stream")
}
// We set outopened here rather than below,
// so if this is a zero-length write we still
// open the stream despite not writing any data to it.
s.outopened.set()
if len(b) == 0 {
break
}
@@ -282,13 +277,26 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
// Amount to write is min(the full buffer, data up to the write limit).
// This is a number of bytes.
nn := min(int64(len(b)), lim-s.out.end)
// Copy the data into the output buffer and mark it as unsent.
if s.out.end <= s.outwin {
s.outunsent.add(s.out.end, min(s.out.end+nn, s.outwin))
}
// Copy the data into the output buffer.
s.out.writeAt(b[:nn], s.out.end)
b = b[nn:]
n += int(nn)
// Possibly flush the output buffer.
// We automatically flush if:
// - We have enough data to consume the send window.
// Sending this data may cause the peer to extend the window.
// - We have buffered as much data as we're willing do.
// We need to send data to clear out buffer space.
// - We have enough data to fill a 1-RTT packet using the smallest
// possible maximum datagram size (1200 bytes, less header byte,
// connection ID, packet number, and AEAD overhead).
const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead
shouldFlush := s.out.end >= s.outwin || // peer send window is full
s.out.end >= lim || // local send buffer is full
(s.out.end-s.outflushed) >= autoFlushSize // enough data buffered
if shouldFlush {
s.flushLocked()
}
if s.out.end > s.outwin {
// We're blocked by flow control.
// Send a STREAM_DATA_BLOCKED frame to let the peer know.
@@ -301,6 +309,23 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
return n, nil
}
// Flush flushes data written to the stream.
// It does not wait for the peer to acknowledge receipt of the data.
// Use CloseContext to wait for the peer's acknowledgement.
func (s *Stream) Flush() {
s.outgate.lock()
defer s.outUnlock()
s.flushLocked()
}
func (s *Stream) flushLocked() {
s.outopened.set()
if s.outflushed < s.outwin {
s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
}
s.outflushed = s.out.end
}
// Close closes the stream.
// See CloseContext for more details.
func (s *Stream) Close() error {
@@ -363,6 +388,7 @@ func (s *Stream) CloseWrite() {
s.outgate.lock()
defer s.outUnlock()
s.outclosed.set()
s.flushLocked()
}
// Reset aborts writes on the stream and notifies the peer
@@ -612,8 +638,8 @@ func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
if maxStreamData <= s.outwin {
return nil
}
if s.out.end > s.outwin {
s.outunsent.add(s.outwin, min(maxStreamData, s.out.end))
if s.outflushed > s.outwin {
s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed))
}
s.outwin = maxStreamData
if s.out.end > s.outwin {
@@ -741,10 +767,11 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b
}
for {
// STREAM
off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto)
if end := off + size; end > s.outmaxsent {
// This will require connection-level flow control to send.
end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
end = max(end, off)
size = end - off
}
fin := s.outclosed.isSet() && off+size == s.out.end

View File

@@ -38,6 +38,7 @@ func TestStreamWriteBlockedByOutputBuffer(t *testing.T) {
if n != writeBufferSize || err != context.Canceled {
t.Fatalf("s.WriteContext() = %v, %v; want %v, context.Canceled", n, err, writeBufferSize)
}
s.Flush()
tc.wantFrame("first write buffer of data sent",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -47,7 +48,9 @@ func TestStreamWriteBlockedByOutputBuffer(t *testing.T) {
// Blocking write, which must wait for buffer space.
w := runAsync(tc, func(ctx context.Context) (int, error) {
return s.WriteContext(ctx, want[writeBufferSize:])
n, err := s.WriteContext(ctx, want[writeBufferSize:])
s.Flush()
return n, err
})
tc.wantIdle("write buffer is full, no more data can be sent")
@@ -170,6 +173,7 @@ func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
t.Fatal(err)
}
s.WriteContext(ctx, want[:1])
s.Flush()
tc.wantFrame("sent data (1 byte) fits within flow control limit",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -723,7 +727,7 @@ func testStreamSendFrameInvalidState(t *testing.T, f func(sid streamID) debugFra
if err != nil {
t.Fatal(err)
}
s.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("new stream is opened",
packetType1RTT, debugFrameStream{
id: sid,
@@ -968,7 +972,9 @@ func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
want := make([]byte, 4096)
rand.Read(want) // doesn't need to be crypto/rand, but non-deprecated and harmless
w := runAsync(tc, func(ctx context.Context) (int, error) {
return s.WriteContext(ctx, want)
n, err := s.WriteContext(ctx, want)
s.Flush()
return n, err
})
got := make([]byte, 0, len(want))
for {
@@ -998,6 +1004,7 @@ func TestStreamCloseWaitsForAcks(t *testing.T) {
tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
data := make([]byte, 100)
s.WriteContext(ctx, data)
s.Flush()
tc.wantFrame("conn sends data for the stream",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -1064,6 +1071,7 @@ func TestStreamCloseUnblocked(t *testing.T) {
tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
data := make([]byte, 100)
s.WriteContext(ctx, data)
s.Flush()
tc.wantFrame("conn sends data for the stream",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -1228,6 +1236,7 @@ func TestStreamPeerStopSendingForActiveStream(t *testing.T) {
tc, s := newTestConnAndLocalStream(t, serverSide, styp, permissiveTransportParameters)
for i := 0; i < 4; i++ {
s.Write([]byte{byte(i)})
s.Flush()
tc.wantFrame("write sends a STREAM frame to peer",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -1271,6 +1280,99 @@ func TestStreamReceiveDataBlocked(t *testing.T) {
tc.wantIdle("no response to STREAM_DATA_BLOCKED and DATA_BLOCKED")
}
func TestStreamFlushExplicit(t *testing.T) {
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
tc, s := newTestConnAndLocalStream(t, clientSide, styp, permissiveTransportParameters)
want := []byte{0, 1, 2, 3}
n, err := s.Write(want)
if n != len(want) || err != nil {
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
}
tc.wantIdle("unflushed data is not sent")
s.Flush()
tc.wantFrame("data is sent after flush",
packetType1RTT, debugFrameStream{
id: s.id,
data: want,
})
})
}
func TestStreamFlushImplicitExact(t *testing.T) {
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
const writeBufferSize = 4
tc, s := newTestConnAndLocalStream(t, clientSide, styp,
permissiveTransportParameters,
func(c *Config) {
c.MaxStreamWriteBufferSize = writeBufferSize
})
want := []byte{0, 1, 2, 3, 4, 5, 6}
// This write doesn't quite fill the output buffer.
n, err := s.Write(want[:3])
if n != 3 || err != nil {
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
}
tc.wantIdle("unflushed data is not sent")
// This write fills the output buffer exactly.
n, err = s.Write(want[3:4])
if n != 1 || err != nil {
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
}
tc.wantFrame("data is sent after write buffer fills",
packetType1RTT, debugFrameStream{
id: s.id,
data: want[0:4],
})
})
}
func TestStreamFlushImplicitLargerThanBuffer(t *testing.T) {
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
const writeBufferSize = 4
tc, s := newTestConnAndLocalStream(t, clientSide, styp,
permissiveTransportParameters,
func(c *Config) {
c.MaxStreamWriteBufferSize = writeBufferSize
})
want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
w := runAsync(tc, func(ctx context.Context) (int, error) {
n, err := s.WriteContext(ctx, want)
return n, err
})
tc.wantFrame("data is sent after write buffer fills",
packetType1RTT, debugFrameStream{
id: s.id,
data: want[0:4],
})
tc.writeAckForAll()
tc.wantFrame("ack permits sending more data",
packetType1RTT, debugFrameStream{
id: s.id,
off: 4,
data: want[4:8],
})
tc.writeAckForAll()
tc.wantIdle("write buffer is not full")
if n, err := w.result(); n != len(want) || err != nil {
t.Fatalf("Write() = %v, %v; want %v, nil", n, err, len(want))
}
s.Flush()
tc.wantFrame("flush sends last buffer of data",
packetType1RTT, debugFrameStream{
id: s.id,
off: 8,
data: want[8:],
})
})
}
type streamSide string
const (