http2: add initial support for PRIORITY_UPDATE frame defined in RFC 9218

This change adds initial support for the PRIORITY_UPDATE frame
introduced in RFC 9218.

Clients can now use a new exported function to write PRIORITY_UPDATE
frames easily. However, sending PRIORITY_UPDATE frames to the server
does not currently cause any behavior changes: we only use
PRIORITY_UPDATE frames to adjust stream priority when the RFC 9218 write
scheduler is being used for a particular connection. However, this
scheduler is not currently usable yet from any configuration surfaces
exposed to the user.

For golang/go#75500

Change-Id: Ie2c821cb0d2faa6e942e209e11638f190fc98e2b
Reviewed-on: https://go-review.googlesource.com/c/net/+/705917
Reviewed-by: Nicholas Husin <husin@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
This commit is contained in:
Nicholas S. Husin
2025-10-06 16:20:23 -04:00
committed by Nicholas Husin
parent 15b99c1fb8
commit f40205b5b5
5 changed files with 354 additions and 33 deletions

View File

@@ -404,6 +404,12 @@ func (tf *testConnFramer) writePriority(id uint32, p PriorityParam) {
}
}
func (tf *testConnFramer) writePriorityUpdate(id uint32, p string) {
if err := tf.fr.WritePriorityUpdate(id, p); err != nil {
tf.t.Fatal(err)
}
}
func (tf *testConnFramer) writeRSTStream(streamID uint32, code ErrCode) {
tf.t.Helper()
if err := tf.fr.WriteRSTStream(streamID, code); err != nil {

View File

@@ -16,6 +16,7 @@ import (
"golang.org/x/net/http/httpguts"
"golang.org/x/net/http2/hpack"
"golang.org/x/net/internal/httpsfv"
)
const frameHeaderLen = 9
@@ -23,33 +24,36 @@ const frameHeaderLen = 9
var padZeros = make([]byte, 255) // zeros for padding
// A FrameType is a registered frame type as defined in
// https://httpwg.org/specs/rfc7540.html#rfc.section.11.2
// https://httpwg.org/specs/rfc7540.html#rfc.section.11.2 and other future
// RFCs.
type FrameType uint8
const (
FrameData FrameType = 0x0
FrameHeaders FrameType = 0x1
FramePriority FrameType = 0x2
FrameRSTStream FrameType = 0x3
FrameSettings FrameType = 0x4
FramePushPromise FrameType = 0x5
FramePing FrameType = 0x6
FrameGoAway FrameType = 0x7
FrameWindowUpdate FrameType = 0x8
FrameContinuation FrameType = 0x9
FrameData FrameType = 0x0
FrameHeaders FrameType = 0x1
FramePriority FrameType = 0x2
FrameRSTStream FrameType = 0x3
FrameSettings FrameType = 0x4
FramePushPromise FrameType = 0x5
FramePing FrameType = 0x6
FrameGoAway FrameType = 0x7
FrameWindowUpdate FrameType = 0x8
FrameContinuation FrameType = 0x9
FramePriorityUpdate FrameType = 0x10
)
var frameNames = [...]string{
FrameData: "DATA",
FrameHeaders: "HEADERS",
FramePriority: "PRIORITY",
FrameRSTStream: "RST_STREAM",
FrameSettings: "SETTINGS",
FramePushPromise: "PUSH_PROMISE",
FramePing: "PING",
FrameGoAway: "GOAWAY",
FrameWindowUpdate: "WINDOW_UPDATE",
FrameContinuation: "CONTINUATION",
FrameData: "DATA",
FrameHeaders: "HEADERS",
FramePriority: "PRIORITY",
FrameRSTStream: "RST_STREAM",
FrameSettings: "SETTINGS",
FramePushPromise: "PUSH_PROMISE",
FramePing: "PING",
FrameGoAway: "GOAWAY",
FrameWindowUpdate: "WINDOW_UPDATE",
FrameContinuation: "CONTINUATION",
FramePriorityUpdate: "PRIORITY_UPDATE",
}
func (t FrameType) String() string {
@@ -125,16 +129,17 @@ var flagName = map[FrameType]map[Flags]string{
type frameParser func(fc *frameCache, fh FrameHeader, countError func(string), payload []byte) (Frame, error)
var frameParsers = [...]frameParser{
FrameData: parseDataFrame,
FrameHeaders: parseHeadersFrame,
FramePriority: parsePriorityFrame,
FrameRSTStream: parseRSTStreamFrame,
FrameSettings: parseSettingsFrame,
FramePushPromise: parsePushPromise,
FramePing: parsePingFrame,
FrameGoAway: parseGoAwayFrame,
FrameWindowUpdate: parseWindowUpdateFrame,
FrameContinuation: parseContinuationFrame,
FrameData: parseDataFrame,
FrameHeaders: parseHeadersFrame,
FramePriority: parsePriorityFrame,
FrameRSTStream: parseRSTStreamFrame,
FrameSettings: parseSettingsFrame,
FramePushPromise: parsePushPromise,
FramePing: parsePingFrame,
FrameGoAway: parseGoAwayFrame,
FrameWindowUpdate: parseWindowUpdateFrame,
FrameContinuation: parseContinuationFrame,
FramePriorityUpdate: parsePriorityUpdateFrame,
}
func typeFrameParser(t FrameType) frameParser {
@@ -1266,6 +1271,74 @@ func (f *Framer) WritePriority(streamID uint32, p PriorityParam) error {
return f.endWrite()
}
// PriorityUpdateFrame is a PRIORITY_UPDATE frame as described in
// https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
type PriorityUpdateFrame struct {
FrameHeader
Priority string
PrioritizedStreamID uint32
}
func parseRFC9218Priority(s string) (p PriorityParam, ok bool) {
p = defaultRFC9218Priority
ok = httpsfv.ParseDictionary(s, func(key, val, _ string) {
switch key {
case "u":
if u, ok := httpsfv.ParseInteger(val); ok && u >= 0 && u <= 7 {
p.urgency = uint8(u)
}
case "i":
if i, ok := httpsfv.ParseBoolean(val); ok {
if i {
p.incremental = 1
} else {
p.incremental = 0
}
}
}
})
if !ok {
return defaultRFC9218Priority, ok
}
return p, true
}
func parsePriorityUpdateFrame(_ *frameCache, fh FrameHeader, countError func(string), payload []byte) (Frame, error) {
if fh.StreamID != 0 {
countError("frame_priority_update_non_zero_stream")
return nil, connError{ErrCodeProtocol, "PRIORITY_UPDATE frame with non-zero stream ID"}
}
if len(payload) < 4 {
countError("frame_priority_update_bad_length")
return nil, connError{ErrCodeFrameSize, fmt.Sprintf("PRIORITY_UPDATE frame payload size was %d; want at least 4", len(payload))}
}
v := binary.BigEndian.Uint32(payload[:4])
streamID := v & 0x7fffffff // mask off high bit
if streamID == 0 {
countError("frame_priority_update_prioritizing_zero_stream")
return nil, connError{ErrCodeProtocol, "PRIORITY_UPDATE frame with prioritized stream ID of zero"}
}
return &PriorityUpdateFrame{
FrameHeader: fh,
PrioritizedStreamID: streamID,
Priority: string(payload[4:]),
}, nil
}
// WritePriorityUpdate writes a PRIORITY_UPDATE frame.
//
// It will perform exactly one Write to the underlying Writer.
// It is the caller's responsibility to not call other Write methods concurrently.
func (f *Framer) WritePriorityUpdate(streamID uint32, priority string) error {
if !validStreamID(streamID) && !f.AllowIllegalWrites {
return errStreamID
}
f.startWrite(FramePriorityUpdate, 0, 0)
f.writeUint32(streamID)
f.writeBytes([]byte(priority))
return f.endWrite()
}
// A RSTStreamFrame allows for abnormal termination of a stream.
// See https://httpwg.org/specs/rfc7540.html#rfc.section.6.4
type RSTStreamFrame struct {

View File

@@ -38,7 +38,7 @@ func TestFrameTypeString(t *testing.T) {
{FrameData, "DATA"},
{FramePing, "PING"},
{FrameGoAway, "GOAWAY"},
{0xf, "UNKNOWN_FRAME_TYPE_15"},
{0x20, "UNKNOWN_FRAME_TYPE_32"},
}
for i, tt := range tests {
@@ -427,6 +427,99 @@ func TestWriteContinuation(t *testing.T) {
}
}
func TestParseRFC9218Priority(t *testing.T) {
tests := []struct {
name string
priorityStr string
want PriorityParam
wantOk bool
}{
{
name: "with urgency",
priorityStr: "u=0",
want: PriorityParam{
urgency: 0,
incremental: defaultRFC9218Priority.incremental,
},
wantOk: true,
},
{
name: "with implicit incremental",
priorityStr: "i",
want: PriorityParam{
urgency: defaultRFC9218Priority.urgency,
incremental: 1,
},
wantOk: true,
},
{
name: "with explicit incremental",
priorityStr: "i=?1",
want: PriorityParam{
urgency: defaultRFC9218Priority.urgency,
incremental: 1,
},
wantOk: true,
},
{
name: "with urgency and incremental",
priorityStr: "i=?0, u=4",
want: PriorityParam{
urgency: 4,
incremental: 0,
},
wantOk: true,
},
{
name: "with other valid dictionary data",
priorityStr: "some=data;someparam;u=fake, u=1;foo, i;bar",
want: PriorityParam{
urgency: 1,
incremental: 1,
},
wantOk: true,
},
{
name: "repeated field",
priorityStr: "u=1,i,u=5,i=?0",
want: PriorityParam{
urgency: 5,
incremental: 0,
},
wantOk: true,
},
{
name: "wrong field type",
priorityStr: `u="urgency will be ignored", i`,
want: PriorityParam{
urgency: defaultRFC9218Priority.urgency,
incremental: 1,
},
wantOk: true,
},
{
name: "invalid dictionary",
priorityStr: `u=1,i, but this is not a valid dictionary"`,
want: defaultRFC9218Priority,
},
{
name: "out of range value",
priorityStr: "u=8",
want: defaultRFC9218Priority,
wantOk: true,
},
}
for _, tt := range tests {
got, gotOk := parseRFC9218Priority(tt.priorityStr)
if gotOk != tt.wantOk {
t.Errorf("test %q: mismatch.\n got ok: %#v\nwant ok: %#v\n", tt.name, got, tt.want)
}
if got != tt.want {
t.Errorf("test %q: mismatch.\n got: %#v\nwant: %#v\n", tt.name, got, tt.want)
}
}
}
func TestWritePriority(t *testing.T) {
const streamID = 42
tests := []struct {
@@ -495,6 +588,115 @@ func TestWritePriority(t *testing.T) {
}
}
func TestWritePriorityUpdate(t *testing.T) {
const streamID = 42
tests := []struct {
name string
priority string
wantFrame *PriorityUpdateFrame
}{
{
name: "with urgency",
priority: "u=0",
wantFrame: &PriorityUpdateFrame{
FrameHeader: FrameHeader{
valid: true,
StreamID: 0,
Type: FramePriorityUpdate,
Length: 7,
},
Priority: "u=0",
PrioritizedStreamID: streamID,
},
},
{
name: "with incremental",
priority: "i",
wantFrame: &PriorityUpdateFrame{
FrameHeader: FrameHeader{
valid: true,
StreamID: 0,
Type: FramePriorityUpdate,
Length: 5,
},
Priority: "i",
PrioritizedStreamID: streamID,
},
},
{
name: "with urgency and incremental",
priority: "u=7,i",
wantFrame: &PriorityUpdateFrame{
FrameHeader: FrameHeader{
valid: true,
StreamID: 0,
Type: FramePriorityUpdate,
Length: 9,
},
Priority: "u=7,i",
PrioritizedStreamID: streamID,
},
},
{
name: "with other fields",
priority: "a=123,u=7,i,b;a;b",
wantFrame: &PriorityUpdateFrame{
FrameHeader: FrameHeader{
valid: true,
StreamID: 0,
Type: FramePriorityUpdate,
Length: 21,
},
Priority: "a=123,u=7,i,b;a;b",
PrioritizedStreamID: streamID,
},
},
{
name: "with string escapes",
priority: "u=\"invalid\" , i",
wantFrame: &PriorityUpdateFrame{
FrameHeader: FrameHeader{
valid: true,
StreamID: 0,
Type: FramePriorityUpdate,
Length: 19,
},
Priority: "u=\"invalid\" , i",
PrioritizedStreamID: streamID,
},
},
{
name: "with empty payload",
priority: "",
wantFrame: &PriorityUpdateFrame{
FrameHeader: FrameHeader{
valid: true,
StreamID: 0,
Type: FramePriorityUpdate,
Length: 4,
},
Priority: "",
PrioritizedStreamID: streamID,
},
},
}
for _, tt := range tests {
fr, _ := testFramer()
if err := fr.WritePriorityUpdate(streamID, tt.priority); err != nil {
t.Errorf("test %q: %v", tt.name, err)
continue
}
f, err := fr.ReadFrame()
if err != nil {
t.Errorf("test %q: failed to read the frame back: %v", tt.name, err)
continue
}
if !reflect.DeepEqual(f, tt.wantFrame) {
t.Errorf("test %q: mismatch.\n got: %#v\nwant: %#v\n", tt.name, f, tt.wantFrame)
}
}
}
func TestWriteSettings(t *testing.T) {
fr, buf := testFramer()
settings := []Setting{{1, 2}, {3, 4}}
@@ -1268,7 +1470,7 @@ func TestTypeFrameParser(t *testing.T) {
}
// typeFrameParser() for an unknown type returns a function that returns UnknownFrame
unknownFrameType := FrameType(FrameContinuation + 1)
unknownFrameType := FrameType(FramePriorityUpdate + 1)
unknownParser := typeFrameParser(unknownFrameType)
frame, err := unknownParser(nil, FrameHeader{}, nil, nil)
if err != nil {

View File

@@ -1623,6 +1623,8 @@ func (sc *serverConn) processFrame(f Frame) error {
// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
return sc.countError("push_promise", ConnectionError(ErrCodeProtocol))
case *PriorityUpdateFrame:
return sc.processPriorityUpdate(f)
default:
sc.vlogf("http2: server ignoring frame: %v", f.Header())
return nil
@@ -2189,6 +2191,18 @@ func (sc *serverConn) processPriority(f *PriorityFrame) error {
return nil
}
func (sc *serverConn) processPriorityUpdate(f *PriorityUpdateFrame) error {
if _, ok := sc.writeSched.(*priorityWriteSchedulerRFC9218); !ok {
return nil
}
p, ok := parseRFC9218Priority(f.Priority)
if !ok {
return sc.countError("unparsable_priority_update", streamError(f.PrioritizedStreamID, ErrCodeProtocol))
}
sc.writeSched.AdjustStream(f.PrioritizedStreamID, p)
return nil
}
func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
sc.serveG.check()
if id == 0 {

View File

@@ -1765,6 +1765,32 @@ func testServer_Rejects_Priority0(t testing.TB) {
st.wantGoAway(0, ErrCodeProtocol)
}
// PRIORITY_UPDATE only accepts non-zero ID for the prioritized stream ID in
// its payload.
func TestServer_Rejects_PriorityUpdate0(t *testing.T) {
synctestTest(t, testServer_Rejects_PriorityUpdate0)
}
func testServer_Rejects_PriorityUpdate0(t testing.TB) {
st := newServerTesterForError(t)
st.fr.AllowIllegalWrites = true
st.writePriorityUpdate(0, "")
st.wantGoAway(0, ErrCodeProtocol)
}
// PRIORITY_UPDATE with unparsable priority parameters may be rejected.
func TestServer_Rejects_PriorityUpdateUnparsable(t *testing.T) {
synctestTest(t, testServer_Rejects_PriorityUnparsable)
}
func testServer_Rejects_PriorityUnparsable(t testing.TB) {
st := newServerTester(t, nil, func(s *Server) {
s.NewWriteScheduler = newPriorityWriteSchedulerRFC9218
})
defer st.Close()
st.greet()
st.writePriorityUpdate(1, "Invalid dictionary: ((((")
st.wantRSTStream(1, ErrCodeProtocol)
}
// No HEADERS frame with a self-dependence.
func TestServer_Rejects_HeadersSelfDependence(t *testing.T) {
synctestTest(t, testServer_Rejects_HeadersSelfDependence)