mirror of
https://github.com/golang/net.git
synced 2026-03-31 02:17:08 +09:00
Per RFC 9218, servers should buffer the most recently received PRIORITY_UPDATE frame. This CL implements said buffering within the RFC 9218 priority write scheduler. For golang/go#75500 Change-Id: I259f4f6787053de6388ec513086cfa1b294fa607 Reviewed-on: https://go-review.googlesource.com/c/net/+/728401 LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Damien Neil <dneil@google.com> Reviewed-by: Nicholas Husin <husin@google.com>
225 lines
6.3 KiB
Go
225 lines
6.3 KiB
Go
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package http2
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
)
|
|
|
|
type streamMetadata struct {
|
|
location *writeQueue
|
|
priority PriorityParam
|
|
}
|
|
|
|
type priorityWriteSchedulerRFC9218 struct {
|
|
// control contains control frames (SETTINGS, PING, etc.).
|
|
control writeQueue
|
|
|
|
// heads contain the head of a circular list of streams.
|
|
// We put these heads within a nested array that represents urgency and
|
|
// incremental, as defined in
|
|
// https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters.
|
|
// 8 represents u=0 up to u=7, and 2 represents i=false and i=true.
|
|
heads [8][2]*writeQueue
|
|
|
|
// streams contains a mapping between each stream ID and their metadata, so
|
|
// we can quickly locate them when needing to, for example, adjust their
|
|
// priority.
|
|
streams map[uint32]streamMetadata
|
|
|
|
// queuePool are empty queues for reuse.
|
|
queuePool writeQueuePool
|
|
|
|
// prioritizeIncremental is used to determine whether we should prioritize
|
|
// incremental streams or not, when urgency is the same in a given Pop()
|
|
// call.
|
|
prioritizeIncremental bool
|
|
|
|
// priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we
|
|
// receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
|
|
priorityUpdateBuf struct {
|
|
// streamID being 0 means that the buffer is empty. This is a safe
|
|
// assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
|
|
streamID uint32
|
|
priority PriorityParam
|
|
}
|
|
}
|
|
|
|
func newPriorityWriteSchedulerRFC9218() WriteScheduler {
|
|
ws := &priorityWriteSchedulerRFC9218{
|
|
streams: make(map[uint32]streamMetadata),
|
|
}
|
|
return ws
|
|
}
|
|
|
|
func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
|
|
if ws.streams[streamID].location != nil {
|
|
panic(fmt.Errorf("stream %d already opened", streamID))
|
|
}
|
|
if streamID == ws.priorityUpdateBuf.streamID {
|
|
ws.priorityUpdateBuf.streamID = 0
|
|
opt.priority = ws.priorityUpdateBuf.priority
|
|
}
|
|
q := ws.queuePool.get()
|
|
ws.streams[streamID] = streamMetadata{
|
|
location: q,
|
|
priority: opt.priority,
|
|
}
|
|
|
|
u, i := opt.priority.urgency, opt.priority.incremental
|
|
if ws.heads[u][i] == nil {
|
|
ws.heads[u][i] = q
|
|
q.next = q
|
|
q.prev = q
|
|
} else {
|
|
// Queues are stored in a ring.
|
|
// Insert the new stream before ws.head, putting it at the end of the list.
|
|
q.prev = ws.heads[u][i].prev
|
|
q.next = ws.heads[u][i]
|
|
q.prev.next = q
|
|
q.next.prev = q
|
|
}
|
|
}
|
|
|
|
func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
|
|
metadata := ws.streams[streamID]
|
|
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
|
|
if q == nil {
|
|
return
|
|
}
|
|
if q.next == q {
|
|
// This was the only open stream.
|
|
ws.heads[u][i] = nil
|
|
} else {
|
|
q.prev.next = q.next
|
|
q.next.prev = q.prev
|
|
if ws.heads[u][i] == q {
|
|
ws.heads[u][i] = q.next
|
|
}
|
|
}
|
|
delete(ws.streams, streamID)
|
|
ws.queuePool.put(q)
|
|
}
|
|
|
|
func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
|
|
metadata := ws.streams[streamID]
|
|
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
|
|
if q == nil {
|
|
ws.priorityUpdateBuf.streamID = streamID
|
|
ws.priorityUpdateBuf.priority = priority
|
|
return
|
|
}
|
|
|
|
// Remove stream from current location.
|
|
if q.next == q {
|
|
// This was the only open stream.
|
|
ws.heads[u][i] = nil
|
|
} else {
|
|
q.prev.next = q.next
|
|
q.next.prev = q.prev
|
|
if ws.heads[u][i] == q {
|
|
ws.heads[u][i] = q.next
|
|
}
|
|
}
|
|
|
|
// Insert stream to the new queue.
|
|
u, i = priority.urgency, priority.incremental
|
|
if ws.heads[u][i] == nil {
|
|
ws.heads[u][i] = q
|
|
q.next = q
|
|
q.prev = q
|
|
} else {
|
|
// Queues are stored in a ring.
|
|
// Insert the new stream before ws.head, putting it at the end of the list.
|
|
q.prev = ws.heads[u][i].prev
|
|
q.next = ws.heads[u][i]
|
|
q.prev.next = q
|
|
q.next.prev = q
|
|
}
|
|
|
|
// Update the metadata.
|
|
ws.streams[streamID] = streamMetadata{
|
|
location: q,
|
|
priority: priority,
|
|
}
|
|
}
|
|
|
|
func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
|
|
if wr.isControl() {
|
|
ws.control.push(wr)
|
|
return
|
|
}
|
|
q := ws.streams[wr.StreamID()].location
|
|
if q == nil {
|
|
// This is a closed stream.
|
|
// wr should not be a HEADERS or DATA frame.
|
|
// We push the request onto the control queue.
|
|
if wr.DataSize() > 0 {
|
|
panic("add DATA on non-open stream")
|
|
}
|
|
ws.control.push(wr)
|
|
return
|
|
}
|
|
q.push(wr)
|
|
}
|
|
|
|
func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
|
|
// Control and RST_STREAM frames first.
|
|
if !ws.control.empty() {
|
|
return ws.control.shift(), true
|
|
}
|
|
|
|
// On the next Pop(), we want to prioritize incremental if we prioritized
|
|
// non-incremental request of the same urgency this time. Vice-versa.
|
|
// i.e. when there are incremental and non-incremental requests at the same
|
|
// priority, we give 50% of our bandwidth to the incremental ones in
|
|
// aggregate and 50% to the first non-incremental one (since
|
|
// non-incremental streams do not use round-robin writes).
|
|
ws.prioritizeIncremental = !ws.prioritizeIncremental
|
|
|
|
// Always prioritize lowest u (i.e. highest urgency level).
|
|
for u := range ws.heads {
|
|
for i := range ws.heads[u] {
|
|
// When we want to prioritize incremental, we try to pop i=true
|
|
// first before i=false when u is the same.
|
|
if ws.prioritizeIncremental {
|
|
i = (i + 1) % 2
|
|
}
|
|
q := ws.heads[u][i]
|
|
if q == nil {
|
|
continue
|
|
}
|
|
for {
|
|
if wr, ok := q.consume(math.MaxInt32); ok {
|
|
if i == 1 {
|
|
// For incremental streams, we update head to q.next so
|
|
// we can round-robin between multiple streams that can
|
|
// immediately benefit from partial writes.
|
|
ws.heads[u][i] = q.next
|
|
} else {
|
|
// For non-incremental streams, we try to finish one to
|
|
// completion rather than doing round-robin. However,
|
|
// we update head here so that if q.consume() is !ok
|
|
// (e.g. the stream has no more frame to consume), head
|
|
// is updated to the next q that has frames to consume
|
|
// on future iterations. This way, we do not prioritize
|
|
// writing to unavailable stream on next Pop() calls,
|
|
// preventing head-of-line blocking.
|
|
ws.heads[u][i] = q
|
|
}
|
|
return wr, true
|
|
}
|
|
q = q.next
|
|
if q == ws.heads[u][i] {
|
|
break
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
return FrameWriteRequest{}, false
|
|
}
|