http2: buffer the most recently received PRIORITY_UPDATE frame

Per RFC 9218, servers should buffer the most recently received
PRIORITY_UPDATE frame. This CL implements said buffering within the RFC
9218 priority write scheduler.

For golang/go#75500

Change-Id: I259f4f6787053de6388ec513086cfa1b294fa607
Reviewed-on: https://go-review.googlesource.com/c/net/+/728401
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Nicholas Husin <husin@google.com>
This commit is contained in:
Nicholas S. Husin
2025-12-08 17:32:23 -05:00
committed by Nicholas Husin
parent 35e1306bdd
commit 7d3dbb06ce
2 changed files with 78 additions and 0 deletions

View File

@@ -37,6 +37,15 @@ type priorityWriteSchedulerRFC9218 struct {
// incremental streams or not, when urgency is the same in a given Pop()
// call.
prioritizeIncremental bool
// priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we
// receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
priorityUpdateBuf struct {
// streamID being 0 means that the buffer is empty. This is a safe
// assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
streamID uint32
priority PriorityParam
}
}
func newPriorityWriteSchedulerRFC9218() WriteScheduler {
@@ -50,6 +59,10 @@ func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStr
if ws.streams[streamID].location != nil {
panic(fmt.Errorf("stream %d already opened", streamID))
}
if streamID == ws.priorityUpdateBuf.streamID {
ws.priorityUpdateBuf.streamID = 0
opt.priority = ws.priorityUpdateBuf.priority
}
q := ws.queuePool.get()
ws.streams[streamID] = streamMetadata{
location: q,
@@ -95,6 +108,8 @@ func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority
metadata := ws.streams[streamID]
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
if q == nil {
ws.priorityUpdateBuf.streamID = streamID
ws.priorityUpdateBuf.priority = priority
return
}

View File

@@ -324,3 +324,66 @@ func TestPrioritySchedulerIdempotentUpdate(t *testing.T) {
t.Fatalf("popped streams %v, want %v", got, want)
}
}
func TestPrioritySchedulerBuffersPriorityUpdate(t *testing.T) {
const maxFrameSize = 16
sc := &serverConn{maxFrameSize: maxFrameSize}
ws := newPriorityWriteSchedulerRFC9218()
// Priorities are adjusted for streams that are not open yet.
ws.AdjustStream(1, PriorityParam{urgency: 0})
ws.AdjustStream(5, PriorityParam{urgency: 0})
for _, streamID := range []uint32{1, 3, 5} {
stream := &stream{
id: streamID,
sc: sc,
}
stream.flow.add(1 << 20) // arbitrary large value
ws.OpenStream(streamID, OpenStreamOptions{
priority: PriorityParam{
urgency: 7,
incremental: 1,
},
})
wr := FrameWriteRequest{
write: &writeData{
streamID: streamID,
p: make([]byte, maxFrameSize*(3)),
endStream: false,
},
stream: stream,
}
ws.Push(wr)
}
const controlFrames = 2
for range controlFrames {
ws.Push(makeWriteNonStreamRequest())
}
// We should get the control frames first.
for range controlFrames {
wr, ok := ws.Pop()
if !ok || wr.StreamID() != 0 {
t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
}
}
// The most recent priority adjustment is buffered and applied. Older ones
// are ignored.
want := []uint32{5, 5, 5, 1, 3, 1, 3, 1, 3}
var got []uint32
for {
wr, ok := ws.Pop()
if !ok {
break
}
if wr.DataSize() != maxFrameSize {
t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
}
got = append(got, wr.StreamID())
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("popped streams %v, want %v", got, want)
}
}