mirror of
https://github.com/golang/net.git
synced 2026-04-01 02:47:08 +09:00
quic: fix several bugs in flow control accounting
Connection-level flow control sets a bound on the total maximum stream offset of all data sent, not the total amount of bytes sent in STREAM frames. For example, if we send the bytes [0,10) for a stream, and then retransmit the same bytes due to packet loss, that consumes 10 bytes of connection-level flow, not 20. We were incorrectly tracking total bytes sent. Fix this. We were blocking retransmission of data in lost STREAM frames on availability of connection-level flow control. We now place a stream with retransmitted data on queueMeta (non-flow-controlled data), since we have already accounted for the flow control window consumption of the data. We were incorrectly marking a stream as being able to send an empty STREAM frame with a FIN bit, when the stream was actually blocked on stream-level flow control. Fix this. For golang/go#58547 Change-Id: Ib2ace94183750078a19d945256507060ea786735 Reviewed-on: https://go-review.googlesource.com/c/net/+/532716 LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Jonathan Amsterdam <jba@google.com>
This commit is contained in:
@@ -394,3 +394,37 @@ func TestConnOutflowMetaAndData(t *testing.T) {
|
||||
data: data,
|
||||
})
|
||||
}
|
||||
|
||||
func TestConnOutflowResentData(t *testing.T) {
|
||||
tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream,
|
||||
permissiveTransportParameters,
|
||||
func(p *transportParameters) {
|
||||
p.initialMaxData = 10
|
||||
})
|
||||
tc.ignoreFrame(frameTypeAck)
|
||||
|
||||
data := makeTestData(15)
|
||||
s.Write(data[:8])
|
||||
tc.wantFrame("data is under MAX_DATA limit, all sent",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
data: data[:8],
|
||||
})
|
||||
|
||||
// Lose the last STREAM packet.
|
||||
const pto = false
|
||||
tc.triggerLossOrPTO(packetType1RTT, false)
|
||||
tc.wantFrame("lost STREAM data is retransmitted",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
data: data[:8],
|
||||
})
|
||||
|
||||
s.Write(data[8:])
|
||||
tc.wantFrame("new data is sent up to the MAX_DATA limit",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
off: 8,
|
||||
data: data[8:10],
|
||||
})
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ type Stream struct {
|
||||
outgate gate
|
||||
out pipe // buffered data to send
|
||||
outwin int64 // maximum MAX_STREAM_DATA received from the peer
|
||||
outmaxsent int64 // maximum data offset we've sent to the peer
|
||||
outmaxbuf int64 // maximum amount of data we will buffer
|
||||
outunsent rangeset[int64] // ranges buffered but not yet sent
|
||||
outacked rangeset[int64] // ranges sent and acknowledged
|
||||
@@ -494,8 +495,12 @@ func (s *Stream) outUnlockNoQueue() streamState {
|
||||
case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
|
||||
state = streamOutSendMeta
|
||||
case len(s.outunsent) > 0: // STREAM frame with data
|
||||
state = streamOutSendData
|
||||
case s.outclosed.shouldSend(): // STREAM frame with FIN bit, all data already sent
|
||||
if s.outunsent.min() < s.outmaxsent {
|
||||
state = streamOutSendMeta // resent data, will not consume flow control
|
||||
} else {
|
||||
state = streamOutSendData // new data, requires flow control
|
||||
}
|
||||
case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit
|
||||
state = streamOutSendMeta
|
||||
case s.outopened.shouldSend(): // STREAM frame with no data
|
||||
state = streamOutSendMeta
|
||||
@@ -725,7 +730,11 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b
|
||||
for {
|
||||
// STREAM
|
||||
off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
|
||||
size = min(size, s.conn.streams.outflow.avail())
|
||||
if end := off + size; end > s.outmaxsent {
|
||||
// This will require connection-level flow control to send.
|
||||
end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
|
||||
size = end - off
|
||||
}
|
||||
fin := s.outclosed.isSet() && off+size == s.out.end
|
||||
shouldSend := size > 0 || // have data to send
|
||||
s.outopened.shouldSendPTO(pto) || // should open the stream
|
||||
@@ -738,8 +747,12 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b
|
||||
return false
|
||||
}
|
||||
s.out.copy(off, b)
|
||||
s.conn.streams.outflow.consume(int64(len(b)))
|
||||
s.outunsent.sub(off, off+int64(len(b)))
|
||||
end := off + int64(len(b))
|
||||
if end > s.outmaxsent {
|
||||
s.conn.streams.outflow.consume(end - s.outmaxsent)
|
||||
s.outmaxsent = end
|
||||
}
|
||||
s.outunsent.sub(off, end)
|
||||
s.frameOpensStream(pnum)
|
||||
if fin {
|
||||
s.outclosed.setSent(pnum)
|
||||
|
||||
@@ -1094,6 +1094,44 @@ func TestStreamCloseUnblocked(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) {
|
||||
ctx := canceledContext()
|
||||
tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters,
|
||||
func(p *transportParameters) {
|
||||
//p.initialMaxData = 0
|
||||
p.initialMaxStreamDataUni = 0
|
||||
})
|
||||
tc.ignoreFrame(frameTypeStreamDataBlocked)
|
||||
if _, err := s.WriteContext(ctx, []byte{0, 1}); err != nil {
|
||||
t.Fatalf("s.Write = %v", err)
|
||||
}
|
||||
s.CloseWrite()
|
||||
tc.wantIdle("stream write is blocked by flow control")
|
||||
|
||||
tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
|
||||
id: s.id,
|
||||
max: 1,
|
||||
})
|
||||
tc.wantFrame("send data up to flow control limit",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
data: []byte{0},
|
||||
})
|
||||
tc.wantIdle("stream write is again blocked by flow control")
|
||||
|
||||
tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
|
||||
id: s.id,
|
||||
max: 2,
|
||||
})
|
||||
tc.wantFrame("send remaining data and FIN",
|
||||
packetType1RTT, debugFrameStream{
|
||||
id: s.id,
|
||||
off: 1,
|
||||
data: []byte{1},
|
||||
fin: true,
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
|
||||
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
|
||||
ctx := canceledContext()
|
||||
|
||||
Reference in New Issue
Block a user