From 7d3dbb06ceb45c3180f4f446cd635e6b59a0b9c2 Mon Sep 17 00:00:00 2001 From: "Nicholas S. Husin" Date: Mon, 8 Dec 2025 17:32:23 -0500 Subject: [PATCH] http2: buffer the most recently received PRIORITY_UPDATE frame Per RFC 9218, servers should buffer the most recently received PRIORITY_UPDATE frame. This CL implements said buffering within the RFC 9218 priority write scheduler. For golang/go#75500 Change-Id: I259f4f6787053de6388ec513086cfa1b294fa607 Reviewed-on: https://go-review.googlesource.com/c/net/+/728401 LUCI-TryBot-Result: Go LUCI Reviewed-by: Damien Neil Reviewed-by: Nicholas Husin --- http2/writesched_priority_rfc9218.go | 15 ++++++ http2/writesched_priority_rfc9218_test.go | 63 +++++++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/http2/writesched_priority_rfc9218.go b/http2/writesched_priority_rfc9218.go index cb4cadc3..dfbfc1eb 100644 --- a/http2/writesched_priority_rfc9218.go +++ b/http2/writesched_priority_rfc9218.go @@ -37,6 +37,15 @@ type priorityWriteSchedulerRFC9218 struct { // incremental streams or not, when urgency is the same in a given Pop() // call. prioritizeIncremental bool + + // priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we + // receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame. + priorityUpdateBuf struct { + // streamID being 0 means that the buffer is empty. This is a safe + // assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR. + streamID uint32 + priority PriorityParam + } } func newPriorityWriteSchedulerRFC9218() WriteScheduler { @@ -50,6 +59,10 @@ func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStr if ws.streams[streamID].location != nil { panic(fmt.Errorf("stream %d already opened", streamID)) } + if streamID == ws.priorityUpdateBuf.streamID { + ws.priorityUpdateBuf.streamID = 0 + opt.priority = ws.priorityUpdateBuf.priority + } q := ws.queuePool.get() ws.streams[streamID] = streamMetadata{ location: q, @@ -95,6 +108,8 @@ func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority metadata := ws.streams[streamID] q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental if q == nil { + ws.priorityUpdateBuf.streamID = streamID + ws.priorityUpdateBuf.priority = priority return } diff --git a/http2/writesched_priority_rfc9218_test.go b/http2/writesched_priority_rfc9218_test.go index 28a82018..f8fd9c82 100644 --- a/http2/writesched_priority_rfc9218_test.go +++ b/http2/writesched_priority_rfc9218_test.go @@ -324,3 +324,66 @@ func TestPrioritySchedulerIdempotentUpdate(t *testing.T) { t.Fatalf("popped streams %v, want %v", got, want) } } + +func TestPrioritySchedulerBuffersPriorityUpdate(t *testing.T) { + const maxFrameSize = 16 + sc := &serverConn{maxFrameSize: maxFrameSize} + ws := newPriorityWriteSchedulerRFC9218() + + // Priorities are adjusted for streams that are not open yet. + ws.AdjustStream(1, PriorityParam{urgency: 0}) + ws.AdjustStream(5, PriorityParam{urgency: 0}) + for _, streamID := range []uint32{1, 3, 5} { + stream := &stream{ + id: streamID, + sc: sc, + } + stream.flow.add(1 << 20) // arbitrary large value + ws.OpenStream(streamID, OpenStreamOptions{ + priority: PriorityParam{ + urgency: 7, + incremental: 1, + }, + }) + wr := FrameWriteRequest{ + write: &writeData{ + streamID: streamID, + p: make([]byte, maxFrameSize*(3)), + endStream: false, + }, + stream: stream, + } + ws.Push(wr) + } + + const controlFrames = 2 + for range controlFrames { + ws.Push(makeWriteNonStreamRequest()) + } + + // We should get the control frames first. + for range controlFrames { + wr, ok := ws.Pop() + if !ok || wr.StreamID() != 0 { + t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok) + } + } + + // The most recent priority adjustment is buffered and applied. Older ones + // are ignored. + want := []uint32{5, 5, 5, 1, 3, 1, 3, 1, 3} + var got []uint32 + for { + wr, ok := ws.Pop() + if !ok { + break + } + if wr.DataSize() != maxFrameSize { + t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize) + } + got = append(got, wr.StreamID()) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("popped streams %v, want %v", got, want) + } +}