http2: introduce a new write scheduler based on RFC 9218 priority scheme

This change introduces a new write scheduler that prioritizes writes
based on RFC 9218. Eventually, this scheduler will be used to replace
the existing priority scheduler based on RFC 7540, which has been
deprecated in RFC 9113.

No behavioral changes have been introduced, as this scheduler is not used
anywhere yet.

goos: linux
goarch: amd64
pkg: golang.org/x/net/http2
cpu: AMD EPYC 7B13
BenchmarkWriteSchedulerThroughputRoundRobin-64                            100000            140884 ns/op          139201 B/op       2900 allocs/op
BenchmarkWriteSchedulerLifetimeRoundRobin-64                              100000            149632 ns/op          139202 B/op       2900 allocs/op
BenchmarkWriteSchedulerThroughputRandom-64                                100000            218311 ns/op          139201 B/op       2900 allocs/op
BenchmarkWriteSchedulerLifetimeRandom-64                                  100000            216559 ns/op          139203 B/op       2900 allocs/op
BenchmarkWriteSchedulerThroughputPriorityRFC7540-64                       100000            587625 ns/op          139201 B/op       2900 allocs/op
BenchmarkWriteSchedulerThroughputPriorityRFC9218Incremental-64            100000            149563 ns/op          139200 B/op       2900 allocs/op
BenchmarkWriteSchedulerLifetimePriorityRFC9218Incremental-64              100000            163697 ns/op          139201 B/op       2900 allocs/op
BenchmarkWriteSchedulerThroughputPriorityRFC9218NonIncremental-64         100000            145364 ns/op          139201 B/op       2900 allocs/op
BenchmarkWriteSchedulerLifetimePriorityRFC9218NonIncremental-64           100000            159316 ns/op          139203 B/op       2900 allocs/op

For golang/go#75500

Change-Id: Id5db195f6f75970f9cc3c7b7a292df96a139de8b
Reviewed-on: https://go-review.googlesource.com/c/net/+/704758
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Nicholas Husin <husin@google.com>
This commit is contained in:
Nicholas S. Husin
2025-09-16 00:30:54 -04:00
committed by Nicholas Husin
parent 653f4f665b
commit 10342476f5
5 changed files with 645 additions and 0 deletions

View File

@@ -1152,6 +1152,15 @@ type PriorityFrame struct {
PriorityParam
}
// defaultRFC9218Priority is the priority assigned to a stream that carries no
// explicit priority signal: urgency 3 (the RFC 9218 default) and
// non-incremental (incremental = 0, i.e. false).
var defaultRFC9218Priority = PriorityParam{
	incremental: 0,
	urgency:     3,
}
// Note that HTTP/2 has had two different prioritization schemes, and the
// PriorityParam struct below is a superset of both schemes. The exported
// symbols are from RFC 7540 and the non-exported ones are from RFC 9218.
// PriorityParam describes the stream prioritization parameters.
type PriorityParam struct {
// StreamDep is a 31-bit stream identifier for the
@@ -1167,6 +1176,20 @@ type PriorityParam struct {
// the spec, "Add one to the value to obtain a weight between
// 1 and 256."
Weight uint8
// "The urgency (u) parameter value is Integer (see Section 3.3.1 of
// [STRUCTURED-FIELDS]), between 0 and 7 inclusive, in descending order of
// priority. The default is 3."
urgency uint8
// "The incremental (i) parameter value is Boolean (see Section 3.3.6 of
// [STRUCTURED-FIELDS]). It indicates if an HTTP response can be processed
// incrementally, i.e., provide some meaningful output as chunks of the
// response arrive."
//
// We use uint8 (i.e. 0 is false, 1 is true) instead of bool so we can
// avoid unnecessary type conversions and because either type takes 1 byte.
incremental uint8
}
func (p PriorityParam) IsZero() bool {

View File

@@ -42,6 +42,8 @@ type OpenStreamOptions struct {
// PusherID is zero if the stream was initiated by the client. Otherwise,
// PusherID names the stream that pushed the newly opened stream.
PusherID uint32
// priority is used to set the priority of the newly opened stream.
priority PriorityParam
}
// FrameWriteRequest is a request to write a frame.

View File

@@ -0,0 +1,180 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"testing"
)
// benchmarkThroughput measures steady-state scheduler throughput: the streams
// are opened once up front, then every benchmark iteration pushes one DATA
// frame per stream and drains the scheduler until it is empty.
func benchmarkThroughput(b *testing.B, wsFunc func() WriteScheduler, priority PriorityParam) {
	const (
		maxFrameSize = 16
		streamCount  = 100
	)

	sched := wsFunc()
	conn := &serverConn{maxFrameSize: maxFrameSize}

	// Vary the payload size across streams to loosely mimic real traffic.
	payloads := [][]byte{
		make([]byte, maxFrameSize*5),
		make([]byte, maxFrameSize*10),
		make([]byte, maxFrameSize*15),
		make([]byte, maxFrameSize*20),
		make([]byte, maxFrameSize*25),
	}

	streams := make([]*stream, streamCount)
	for idx := range streams {
		id := uint32(idx + 1)
		st := &stream{id: id, sc: conn}
		st.flow.add(1 << 30) // arbitrary large value
		streams[idx] = st
		sched.OpenStream(id, OpenStreamOptions{priority: priority})
	}

	for b.Loop() {
		for idx, st := range streams {
			sched.Push(FrameWriteRequest{
				write: &writeData{
					streamID:  uint32(idx + 1),
					p:         payloads[idx%len(payloads)],
					endStream: false,
				},
				stream: st,
			})
		}
		for {
			wr, ok := sched.Pop()
			if !ok {
				break
			}
			if wr.DataSize() != maxFrameSize {
				b.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
			}
		}
	}

	for idx := range streams {
		sched.CloseStream(uint32(idx + 1))
	}
}
// benchmarkStreamLifetime measures the full stream lifecycle: every benchmark
// iteration opens all streams, pushes one DATA frame per stream, drains the
// scheduler, and closes the streams again.
func benchmarkStreamLifetime(b *testing.B, wsFunc func() WriteScheduler, priority PriorityParam) {
	const (
		maxFrameSize = 16
		streamCount  = 100
	)

	sched := wsFunc()
	conn := &serverConn{maxFrameSize: maxFrameSize}

	// Vary the payload size across streams to loosely mimic real traffic.
	payloads := [][]byte{
		make([]byte, maxFrameSize*5),
		make([]byte, maxFrameSize*10),
		make([]byte, maxFrameSize*15),
		make([]byte, maxFrameSize*20),
		make([]byte, maxFrameSize*25),
	}

	streams := make([]*stream, streamCount)
	for idx := range streams {
		id := uint32(idx + 1)
		st := &stream{id: id, sc: conn}
		st.flow.add(1 << 30) // arbitrary large value
		streams[idx] = st
	}

	for b.Loop() {
		for idx, st := range streams {
			id := uint32(idx + 1)
			sched.OpenStream(id, OpenStreamOptions{priority: priority})
			sched.Push(FrameWriteRequest{
				write: &writeData{
					streamID:  id,
					p:         payloads[idx%len(payloads)],
					endStream: false,
				},
				stream: st,
			})
		}
		for {
			wr, ok := sched.Pop()
			if !ok {
				break
			}
			if wr.DataSize() != maxFrameSize {
				b.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
			}
		}
		for idx := range streams {
			sched.CloseStream(uint32(idx + 1))
		}
	}
}
// BenchmarkWriteSchedulerThroughputRoundRobin measures steady-state
// throughput of the round-robin scheduler.
func BenchmarkWriteSchedulerThroughputRoundRobin(b *testing.B) {
	benchmarkThroughput(b, newRoundRobinWriteScheduler, PriorityParam{})
}

// BenchmarkWriteSchedulerLifetimeRoundRobin measures the full stream
// open/write/close cycle of the round-robin scheduler.
func BenchmarkWriteSchedulerLifetimeRoundRobin(b *testing.B) {
	benchmarkStreamLifetime(b, newRoundRobinWriteScheduler, PriorityParam{})
}

// BenchmarkWriteSchedulerThroughputRandom measures steady-state throughput of
// the random scheduler.
func BenchmarkWriteSchedulerThroughputRandom(b *testing.B) {
	benchmarkThroughput(b, NewRandomWriteScheduler, PriorityParam{})
}

// BenchmarkWriteSchedulerLifetimeRandom measures the full stream cycle of the
// random scheduler.
func BenchmarkWriteSchedulerLifetimeRandom(b *testing.B) {
	benchmarkStreamLifetime(b, NewRandomWriteScheduler, PriorityParam{})
}

// BenchmarkWriteSchedulerThroughputPriorityRFC7540 measures steady-state
// throughput of the (deprecated) RFC 7540 priority scheduler.
func BenchmarkWriteSchedulerThroughputPriorityRFC7540(b *testing.B) {
	benchmarkThroughput(b, func() WriteScheduler { return NewPriorityWriteScheduler(nil) }, PriorityParam{})
}

func BenchmarkWriteSchedulerLifetimePriorityRFC7540(b *testing.B) {
	// RFC7540 priority scheduler does not always succeed in closing the
	// stream, causing this benchmark to panic due to opening an already open
	// stream.
	b.SkipNow()
	benchmarkStreamLifetime(b, func() WriteScheduler { return NewPriorityWriteScheduler(nil) }, PriorityParam{})
}

// BenchmarkWriteSchedulerThroughputPriorityRFC9218Incremental measures the
// RFC 9218 scheduler with every stream marked incremental (round-robin
// within the bucket).
func BenchmarkWriteSchedulerThroughputPriorityRFC9218Incremental(b *testing.B) {
	benchmarkThroughput(b, newPriorityWriteSchedulerRFC9128, PriorityParam{
		urgency:     defaultRFC9218Priority.urgency,
		incremental: 1,
	})
}

// BenchmarkWriteSchedulerLifetimePriorityRFC9218Incremental measures the full
// stream cycle of the RFC 9218 scheduler with incremental streams.
func BenchmarkWriteSchedulerLifetimePriorityRFC9218Incremental(b *testing.B) {
	benchmarkStreamLifetime(b, newPriorityWriteSchedulerRFC9128, PriorityParam{
		urgency:     defaultRFC9218Priority.urgency,
		incremental: 1,
	})
}

// BenchmarkWriteSchedulerThroughputPriorityRFC9218NonIncremental measures the
// RFC 9218 scheduler with every stream non-incremental (streams written to
// completion one at a time).
func BenchmarkWriteSchedulerThroughputPriorityRFC9218NonIncremental(b *testing.B) {
	benchmarkThroughput(b, newPriorityWriteSchedulerRFC9128, PriorityParam{
		urgency:     defaultRFC9218Priority.urgency,
		incremental: 0,
	})
}

// BenchmarkWriteSchedulerLifetimePriorityRFC9218NonIncremental measures the
// full stream cycle of the RFC 9218 scheduler with non-incremental streams.
func BenchmarkWriteSchedulerLifetimePriorityRFC9218NonIncremental(b *testing.B) {
	benchmarkStreamLifetime(b, newPriorityWriteSchedulerRFC9128, PriorityParam{
		urgency:     defaultRFC9218Priority.urgency,
		incremental: 0,
	})
}

View File

@@ -0,0 +1,203 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"fmt"
"math"
)
// streamMetadata tracks where an open stream currently lives inside the
// scheduler.
type streamMetadata struct {
	// location is the stream's queue of pending frames, linked into the
	// circular list of its priority bucket.
	location *writeQueue
	// priority holds the urgency/incremental values that select which
	// bucket in heads the queue is linked into.
	priority PriorityParam
}
// priorityWriteSchedulerRFC9218 is a WriteScheduler that schedules stream
// writes according to the RFC 9218 extensible priority scheme: eight urgency
// levels (0 is most urgent), each split into a non-incremental and an
// incremental bucket.
type priorityWriteSchedulerRFC9218 struct {
	// control contains control frames (SETTINGS, PING, etc.).
	control writeQueue

	// heads contain the head of a circular list of streams.
	// We put these heads within a nested array that represents urgency and
	// incremental, as defined in
	// https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters.
	// 8 represents u=0 up to u=7, and 2 represents i=false and i=true.
	heads [8][2]*writeQueue

	// streams contains a mapping between each stream ID and their metadata, so
	// we can quickly locate them when needing to, for example, adjust their
	// priority.
	streams map[uint32]streamMetadata

	// queuePool are empty queues for reuse.
	queuePool writeQueuePool

	// prioritizeIncremental is used to determine whether we should prioritize
	// incremental streams or not, when urgency is the same in a given Pop()
	// call.
	prioritizeIncremental bool
}
// newPriorityWriteSchedulerRFC9128 returns a WriteScheduler implementing the
// RFC 9218 extensible priority scheme.
//
// NOTE(review): "RFC9128" in the name looks like a typo for "RFC9218" (the
// constructed type is priorityWriteSchedulerRFC9218); consider renaming once
// all callers can be updated together.
func newPriorityWriteSchedulerRFC9128() WriteScheduler {
	ws := &priorityWriteSchedulerRFC9218{
		streams: make(map[uint32]streamMetadata),
	}
	return ws
}
// OpenStream registers a new stream, placing its (initially empty) write
// queue at the tail of the ring for the stream's urgency/incremental bucket.
// Opening an already-open stream panics.
func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
	if ws.streams[streamID].location != nil {
		panic(fmt.Errorf("stream %d already opened", streamID))
	}

	q := ws.queuePool.get()
	ws.streams[streamID] = streamMetadata{
		location: q,
		priority: opt.priority,
	}

	u, i := opt.priority.urgency, opt.priority.incremental
	head := ws.heads[u][i]
	if head == nil {
		// First stream in this bucket: the queue is a one-element ring.
		q.next, q.prev = q, q
		ws.heads[u][i] = q
	} else {
		// Queues are stored in a ring. Splice the new queue in just before
		// the head, i.e. at the end of the list.
		q.prev = head.prev
		q.next = head
		q.prev.next = q
		q.next.prev = q
	}
}
// CloseStream removes a stream from its priority bucket and recycles its
// queue. Closing an unknown stream is a no-op.
func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
	md := ws.streams[streamID]
	q := md.location
	if q == nil {
		return
	}
	u, i := md.priority.urgency, md.priority.incremental

	if q.next == q {
		// This was the only open stream in its bucket.
		ws.heads[u][i] = nil
	} else {
		// Unlink q from the ring, advancing the head if it pointed at q.
		q.prev.next = q.next
		q.next.prev = q.prev
		if ws.heads[u][i] == q {
			ws.heads[u][i] = q.next
		}
	}
	delete(ws.streams, streamID)
	ws.queuePool.put(q)
}
// AdjustStream moves the given stream into the bucket selected by the new
// priority and records that priority in the stream's metadata. Adjusting an
// unknown (closed or never-opened) stream is a no-op.
func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
	metadata := ws.streams[streamID]
	q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
	if q == nil {
		return
	}

	// Remove stream from current location.
	if q.next == q {
		// This was the only open stream.
		ws.heads[u][i] = nil
	} else {
		q.prev.next = q.next
		q.next.prev = q.prev
		if ws.heads[u][i] == q {
			ws.heads[u][i] = q.next
		}
	}

	// Insert stream to the new queue.
	u, i = priority.urgency, priority.incremental
	if ws.heads[u][i] == nil {
		ws.heads[u][i] = q
		q.next = q
		q.prev = q
	} else {
		// Queues are stored in a ring.
		// Insert the new stream before ws.head, putting it at the end of the list.
		q.prev = ws.heads[u][i].prev
		q.next = ws.heads[u][i]
		q.prev.next = q
		q.next.prev = q
	}

	// Record the new priority. Without this write-back, CloseStream and a
	// subsequent AdjustStream would compute the stream's bucket from the
	// stale priority: when the stream is alone in its new bucket, the
	// q.next == q branch above would then clear the head of the WRONG
	// (old) bucket and leave the new bucket's head pointing at a queue
	// that has been returned to the pool.
	ws.streams[streamID] = streamMetadata{
		location: q,
		priority: priority,
	}
}
// Push queues a frame for writing. Control frames — and frames for streams
// the scheduler no longer knows about — go on the control queue; everything
// else is appended to the owning stream's queue.
func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
	if wr.isControl() {
		ws.control.push(wr)
		return
	}
	q := ws.streams[wr.StreamID()].location
	if q != nil {
		q.push(wr)
		return
	}
	// This is a closed stream.
	// wr should not be a HEADERS or DATA frame.
	// We push the request onto the control queue.
	if wr.DataSize() > 0 {
		panic("add DATA on non-open stream")
	}
	ws.control.push(wr)
}
// Pop selects the next frame to write. Control frames always win; otherwise
// streams are served strictly in ascending urgency order, alternating on
// successive calls between the incremental and non-incremental bucket at the
// same urgency.
func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
	// Control and RST_STREAM frames first.
	if !ws.control.empty() {
		return ws.control.shift(), true
	}

	// On the next Pop(), we want to prioritize incremental if we prioritized
	// non-incremental request of the same urgency this time. Vice-versa.
	// i.e. when there are incremental and non-incremental requests at the same
	// priority, we give 50% of our bandwidth to the incremental ones in
	// aggregate and 50% to the first non-incremental one (since
	// non-incremental streams do not use round-robin writes).
	ws.prioritizeIncremental = !ws.prioritizeIncremental

	// Always prioritize lowest u (i.e. highest urgency level).
	for u := range ws.heads {
		for i := range ws.heads[u] {
			// When we want to prioritize incremental, we try to pop i=true
			// first before i=false when u is the same.
			// (Reassigning i only flips which bucket this iteration looks
			// at; the range loop itself still runs exactly twice.)
			if ws.prioritizeIncremental {
				i = (i + 1) % 2
			}
			q := ws.heads[u][i]
			if q == nil {
				continue
			}
			// Walk the ring starting at the head until some queue yields a
			// frame. consume(math.MaxInt32) asks the queue for as much as it
			// can send; the actual write size is bounded elsewhere
			// (presumably by flow control / max frame size inside
			// writeQueue.consume — confirm there).
			for {
				if wr, ok := q.consume(math.MaxInt32); ok {
					if i == 1 {
						// For incremental streams, we update head to q.next so
						// we can round-robin between multiple streams that can
						// immediately benefit from partial writes.
						ws.heads[u][i] = q.next
					} else {
						// For non-incremental streams, we try to finish one to
						// completion rather than doing round-robin. However,
						// we update head here so that if q.consume() is !ok
						// (e.g. the stream has no more frame to consume), head
						// is updated to the next q that has frames to consume
						// on future iterations. This way, we do not prioritize
						// writing to unavailable stream on next Pop() calls,
						// preventing head-of-line blocking.
						ws.heads[u][i] = q
					}
					return wr, true
				}
				q = q.next
				if q == ws.heads[u][i] {
					break
				}
			}
		}
	}
	return FrameWriteRequest{}, false
}

View File

@@ -0,0 +1,237 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"reflect"
"testing"
)
// TestPrioritySchedulerUrgency verifies that streams with a lower urgency
// value (i.e. higher priority) are written before higher-urgency ones.
func TestPrioritySchedulerUrgency(t *testing.T) {
	const maxFrameSize = 16
	sc := &serverConn{maxFrameSize: maxFrameSize}
	ws := newPriorityWriteSchedulerRFC9128()

	// Open five streams at the lowest urgency (u=7), each with a
	// progressively larger payload queued.
	streams := make([]*stream, 5)
	for idx := range streams {
		id := uint32(idx + 1)
		st := &stream{id: id, sc: sc}
		st.flow.add(1 << 20) // arbitrary large value
		streams[idx] = st
		ws.OpenStream(id, OpenStreamOptions{
			priority: PriorityParam{urgency: 7, incremental: 0},
		})
		ws.Push(FrameWriteRequest{
			write: &writeData{
				streamID:  id,
				p:         make([]byte, maxFrameSize*(idx+1)),
				endStream: false,
			},
			stream: st,
		})
	}

	// Raise the urgency of all even-numbered streams.
	for id := uint32(2); id <= uint32(len(streams)); id += 2 {
		ws.AdjustStream(id, PriorityParam{urgency: 0, incremental: 0})
	}

	// Control frames must always pop before stream data.
	const controlFrames = 2
	for range controlFrames {
		ws.Push(makeWriteNonStreamRequest())
	}
	for range controlFrames {
		wr, ok := ws.Pop()
		if !ok || wr.StreamID() != 0 {
			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
		}
	}

	// Each stream should write maxFrameSize bytes until it runs out of data.
	// Higher-urgency even-numbered streams should come first.
	want := []uint32{2, 2, 4, 4, 4, 4, 1, 3, 3, 3, 5, 5, 5, 5, 5}
	var got []uint32
	for {
		wr, ok := ws.Pop()
		if !ok {
			break
		}
		if wr.DataSize() != maxFrameSize {
			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
		}
		got = append(got, wr.StreamID())
	}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("popped streams %v, want %v", got, want)
	}
}
// TestPrioritySchedulerIncremental verifies the round-robin behavior between
// the incremental and non-incremental buckets at equal urgency, and within
// the incremental bucket itself.
func TestPrioritySchedulerIncremental(t *testing.T) {
	const maxFrameSize = 16
	sc := &serverConn{maxFrameSize: maxFrameSize}
	ws := newPriorityWriteSchedulerRFC9128()

	// Open five streams at u=7, non-incremental, each with a progressively
	// larger payload queued.
	streams := make([]*stream, 5)
	for idx := range streams {
		id := uint32(idx + 1)
		st := &stream{id: id, sc: sc}
		st.flow.add(1 << 20) // arbitrary large value
		streams[idx] = st
		ws.OpenStream(id, OpenStreamOptions{
			priority: PriorityParam{urgency: 7, incremental: 0},
		})
		ws.Push(FrameWriteRequest{
			write: &writeData{
				streamID:  id,
				p:         make([]byte, maxFrameSize*(idx+1)),
				endStream: false,
			},
			stream: st,
		})
	}

	// Make even-numbered streams incremental.
	for id := uint32(2); id <= uint32(len(streams)); id += 2 {
		ws.AdjustStream(id, PriorityParam{urgency: 7, incremental: 1})
	}

	// Control frames must always pop before stream data.
	const controlFrames = 2
	for range controlFrames {
		ws.Push(makeWriteNonStreamRequest())
	}
	for range controlFrames {
		wr, ok := ws.Pop()
		if !ok || wr.StreamID() != 0 {
			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
		}
	}

	// Each stream should write maxFrameSize bytes until it runs out of data.
	// We should:
	// - Round-robin between even and odd-numbered streams as they have
	//   different i but the same u.
	// - Amongst even-numbered streams, round-robin writes as they are
	//   incremental.
	// - Among odd-numbered streams, do not round-robin as they are
	//   non-incremental.
	want := []uint32{2, 1, 4, 3, 2, 3, 4, 3, 4, 5, 4, 5, 5, 5, 5}
	var got []uint32
	for {
		wr, ok := ws.Pop()
		if !ok {
			break
		}
		if wr.DataSize() != maxFrameSize {
			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
		}
		got = append(got, wr.StreamID())
	}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("popped streams %v, want %v", got, want)
	}
}
// TestPrioritySchedulerUrgencyAndIncremental exercises urgency, the
// incremental flag, and stream closure together.
func TestPrioritySchedulerUrgencyAndIncremental(t *testing.T) {
	const maxFrameSize = 16
	sc := &serverConn{maxFrameSize: maxFrameSize}
	ws := newPriorityWriteSchedulerRFC9128()

	// Open six streams at u=7, non-incremental, each with a progressively
	// larger payload queued.
	streams := make([]*stream, 6)
	for idx := range streams {
		id := uint32(idx + 1)
		st := &stream{id: id, sc: sc}
		st.flow.add(1 << 20) // arbitrary large value
		streams[idx] = st
		ws.OpenStream(id, OpenStreamOptions{
			priority: PriorityParam{urgency: 7, incremental: 0},
		})
		ws.Push(FrameWriteRequest{
			write: &writeData{
				streamID:  id,
				p:         make([]byte, maxFrameSize*(idx+1)),
				endStream: false,
			},
			stream: st,
		})
	}

	// Make even-numbered streams incremental and of higher urgency.
	for id := uint32(2); id <= uint32(len(streams)); id += 2 {
		ws.AdjustStream(id, PriorityParam{urgency: 0, incremental: 1})
	}

	// Close stream 1 and 4.
	ws.CloseStream(1)
	ws.CloseStream(4)

	// Control frames must always pop before stream data.
	const controlFrames = 2
	for range controlFrames {
		ws.Push(makeWriteNonStreamRequest())
	}
	for range controlFrames {
		wr, ok := ws.Pop()
		if !ok || wr.StreamID() != 0 {
			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
		}
	}

	// Each stream should write maxFrameSize bytes until it runs out of data.
	// We should:
	// - Get even-numbered streams first that are written in a round-robin
	//   manner as they have higher urgency and are incremental.
	// - Get odd-numbered streams after that are written one-by-one to
	//   completion as they are of lower urgency and are not incremental.
	// - Skip stream 1 and 4 that have been closed.
	want := []uint32{2, 6, 2, 6, 6, 6, 6, 6, 3, 3, 3, 5, 5, 5, 5, 5}
	var got []uint32
	for {
		wr, ok := ws.Pop()
		if !ok {
			break
		}
		if wr.DataSize() != maxFrameSize {
			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
		}
		got = append(got, wr.StreamID())
	}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("popped streams %v, want %v", got, want)
	}
}