net/http/internal/http2: remove inaccessible write schedulers

RFC 7540 and random write schedulers are not accessible from std, and
are essentially dead code at this point.

Change-Id: Ib1e9e6e9ae7962b451bc36c0dad3c503f56dc1c0
Reviewed-on: https://go-review.googlesource.com/c/go/+/757380
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Nicholas Husin <husin@google.com>
Auto-Submit: Nicholas Husin <nsh@golang.org>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
This commit is contained in:
Nicholas S. Husin
2026-03-20 10:51:20 -04:00
committed by Gopher Robot
parent 9d5d6af2d5
commit 918644a215
7 changed files with 0 additions and 1247 deletions

View File

@@ -203,10 +203,6 @@ func EncodeHeaderRaw(t testing.TB, headers ...string) []byte {
return encodeHeaderRaw(t, headers...)
}
func NewPriorityWriteSchedulerRFC7540(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
return newPriorityWriteSchedulerRFC7540(cfg)
}
func NewPriorityWriteSchedulerRFC9218() WriteScheduler {
return newPriorityWriteSchedulerRFC9218()
}

View File

@@ -573,8 +573,6 @@ func (sc *serverConn) writeSchedIgnoresRFC7540() bool {
switch sc.writeSched.(type) {
case *priorityWriteSchedulerRFC9218:
return true
case *randomWriteScheduler:
return true
case *roundRobinWriteScheduler:
return true
default:

View File

@@ -131,26 +131,6 @@ func BenchmarkWriteSchedulerLifetimeRoundRobin(b *testing.B) {
benchmarkStreamLifetime(b, newRoundRobinWriteScheduler, PriorityParam{})
}
func BenchmarkWriteSchedulerThroughputRandom(b *testing.B) {
benchmarkThroughput(b, NewRandomWriteScheduler, PriorityParam{})
}
func BenchmarkWriteSchedulerLifetimeRandom(b *testing.B) {
benchmarkStreamLifetime(b, NewRandomWriteScheduler, PriorityParam{})
}
func BenchmarkWriteSchedulerThroughputPriorityRFC7540(b *testing.B) {
benchmarkThroughput(b, func() WriteScheduler { return NewPriorityWriteScheduler(nil) }, PriorityParam{})
}
func BenchmarkWriteSchedulerLifetimePriorityRFC7540(b *testing.B) {
// RFC7540 priority scheduler does not always succeed in closing the
// stream, causing this benchmark to panic due to opening an already open
// stream.
b.SkipNow()
benchmarkStreamLifetime(b, func() WriteScheduler { return NewPriorityWriteScheduler(nil) }, PriorityParam{})
}
func BenchmarkWriteSchedulerThroughputPriorityRFC9218Incremental(b *testing.B) {
benchmarkThroughput(b, newPriorityWriteSchedulerRFC9218, PriorityParam{
incremental: 1,

View File

@@ -1,449 +0,0 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"cmp"
"fmt"
"math"
"slices"
)
// RFC 7540, Section 5.3.5: the default weight is 16.
const priorityDefaultWeightRFC7540 = 15 // 16 = 15 + 1
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
type PriorityWriteSchedulerConfig struct {
// MaxClosedNodesInTree controls the maximum number of closed streams to
// retain in the priority tree. Setting this to zero saves a small amount
// of memory at the cost of performance.
//
// See RFC 7540, Section 5.3.4:
// "It is possible for a stream to become closed while prioritization
// information ... is in transit. ... This potentially creates suboptimal
// prioritization, since the stream could be given a priority that is
// different from what is intended. To avoid these problems, an endpoint
// SHOULD retain stream prioritization state for a period after streams
// become closed. The longer state is retained, the lower the chance that
// streams are assigned incorrect or default priority values."
MaxClosedNodesInTree int
// MaxIdleNodesInTree controls the maximum number of idle streams to
// retain in the priority tree. Setting this to zero saves a small amount
// of memory at the cost of performance.
//
// See RFC 7540, Section 5.3.4:
// Similarly, streams that are in the "idle" state can be assigned
// priority or become a parent of other streams. This allows for the
// creation of a grouping node in the dependency tree, which enables
// more flexible expressions of priority. Idle streams begin with a
// default priority (Section 5.3.5).
MaxIdleNodesInTree int
// ThrottleOutOfOrderWrites enables write throttling to help ensure that
// data is delivered in priority order. This works around a race where
// stream B depends on stream A and both streams are about to call Write
// to queue DATA frames. If B wins the race, a naive scheduler would eagerly
// write as much data from B as possible, but this is suboptimal because A
// is a higher-priority stream. With throttling enabled, we write a small
// amount of data from B to minimize the amount of bandwidth that B can
// steal from A.
ThrottleOutOfOrderWrites bool
}
// NewPriorityWriteScheduler constructs a WriteScheduler that schedules
// frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
// If cfg is nil, default options are used.
func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
return newPriorityWriteSchedulerRFC7540(cfg)
}
func newPriorityWriteSchedulerRFC7540(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
if cfg == nil {
// For justification of these defaults, see:
// https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
cfg = &PriorityWriteSchedulerConfig{
MaxClosedNodesInTree: 10,
MaxIdleNodesInTree: 10,
ThrottleOutOfOrderWrites: false,
}
}
ws := &priorityWriteSchedulerRFC7540{
nodes: make(map[uint32]*priorityNodeRFC7540),
maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
}
ws.nodes[0] = &ws.root
if cfg.ThrottleOutOfOrderWrites {
ws.writeThrottleLimit = 1024
} else {
ws.writeThrottleLimit = math.MaxInt32
}
return ws
}
type priorityNodeStateRFC7540 int
const (
priorityNodeOpenRFC7540 priorityNodeStateRFC7540 = iota
priorityNodeClosedRFC7540
priorityNodeIdleRFC7540
)
// priorityNodeRFC7540 is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID.
// See RFC 7540, Section 5.3.
type priorityNodeRFC7540 struct {
q writeQueue // queue of pending frames to write
id uint32 // id of the stream, or 0 for the root of the tree
weight uint8 // the actual weight is weight+1, so the value is in [1,256]
state priorityNodeStateRFC7540 // open | closed | idle
bytes int64 // number of bytes written by this node, or 0 if closed
subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree
// These links form the priority tree.
parent *priorityNodeRFC7540
kids *priorityNodeRFC7540 // start of the kids list
prev, next *priorityNodeRFC7540 // doubly-linked list of siblings
}
func (n *priorityNodeRFC7540) setParent(parent *priorityNodeRFC7540) {
if n == parent {
panic("setParent to self")
}
if n.parent == parent {
return
}
// Unlink from current parent.
if parent := n.parent; parent != nil {
if n.prev == nil {
parent.kids = n.next
} else {
n.prev.next = n.next
}
if n.next != nil {
n.next.prev = n.prev
}
}
// Link to new parent.
// If parent=nil, remove n from the tree.
// Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
n.parent = parent
if parent == nil {
n.next = nil
n.prev = nil
} else {
n.next = parent.kids
n.prev = nil
if n.next != nil {
n.next.prev = n
}
parent.kids = n
}
}
func (n *priorityNodeRFC7540) addBytes(b int64) {
n.bytes += b
for ; n != nil; n = n.parent {
n.subtreeBytes += b
}
}
// walkReadyInOrder iterates over the tree in priority order, calling f for each node
// with a non-empty write queue. When f returns true, this function returns true and the
// walk halts. tmp is used as scratch space for sorting.
//
// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
// if any ancestor p of n is still open (ignoring the root node).
func (n *priorityNodeRFC7540) walkReadyInOrder(openParent bool, tmp *[]*priorityNodeRFC7540, f func(*priorityNodeRFC7540, bool) bool) bool {
if !n.q.empty() && f(n, openParent) {
return true
}
if n.kids == nil {
return false
}
// Don't consider the root "open" when updating openParent since
// we can't send data frames on the root stream (only control frames).
if n.id != 0 {
openParent = openParent || (n.state == priorityNodeOpenRFC7540)
}
// Common case: only one kid or all kids have the same weight.
// Some clients don't use weights; other clients (like web browsers)
// use mostly-linear priority trees.
w := n.kids.weight
needSort := false
for k := n.kids.next; k != nil; k = k.next {
if k.weight != w {
needSort = true
break
}
}
if !needSort {
for k := n.kids; k != nil; k = k.next {
if k.walkReadyInOrder(openParent, tmp, f) {
return true
}
}
return false
}
// Uncommon case: sort the child nodes. We remove the kids from the parent,
// then re-insert after sorting so we can reuse tmp for future sort calls.
*tmp = (*tmp)[:0]
for n.kids != nil {
*tmp = append(*tmp, n.kids)
n.kids.setParent(nil)
}
slices.SortFunc(*tmp, func(i, k *priorityNodeRFC7540) int {
// Prefer the subtree that has sent fewer bytes relative to its weight.
// See sections 5.3.2 and 5.3.4.
wi, bi := float64(i.weight)+1, float64(i.subtreeBytes)
wk, bk := float64(k.weight)+1, float64(k.subtreeBytes)
if bi == 0 && bk == 0 {
return cmp.Compare(wk, wi)
}
if bk == 0 {
return 0
}
return cmp.Compare(bi/bk, wi/wk)
})
for i := len(*tmp) - 1; i >= 0; i-- {
(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
}
for k := n.kids; k != nil; k = k.next {
if k.walkReadyInOrder(openParent, tmp, f) {
return true
}
}
return false
}
type priorityWriteSchedulerRFC7540 struct {
// root is the root of the priority tree, where root.id = 0.
// The root queues control frames that are not associated with any stream.
root priorityNodeRFC7540
// nodes maps stream ids to priority tree nodes.
nodes map[uint32]*priorityNodeRFC7540
// maxID is the maximum stream id in nodes.
maxID uint32
// lists of nodes that have been closed or are idle, but are kept in
// the tree for improved prioritization. When the lengths exceed either
// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
closedNodes, idleNodes []*priorityNodeRFC7540
// From the config.
maxClosedNodesInTree int
maxIdleNodesInTree int
writeThrottleLimit int32
enableWriteThrottle bool
// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
tmp []*priorityNodeRFC7540
// pool of empty queues for reuse.
queuePool writeQueuePool
}
func (ws *priorityWriteSchedulerRFC7540) OpenStream(streamID uint32, options OpenStreamOptions) {
// The stream may be currently idle but cannot be opened or closed.
if curr := ws.nodes[streamID]; curr != nil {
if curr.state != priorityNodeIdleRFC7540 {
panic(fmt.Sprintf("stream %d already opened", streamID))
}
curr.state = priorityNodeOpenRFC7540
return
}
// RFC 7540, Section 5.3.5:
// "All streams are initially assigned a non-exclusive dependency on stream 0x0.
// Pushed streams initially depend on their associated stream. In both cases,
// streams are assigned a default weight of 16."
parent := ws.nodes[options.PusherID]
if parent == nil {
parent = &ws.root
}
n := &priorityNodeRFC7540{
q: *ws.queuePool.get(),
id: streamID,
weight: priorityDefaultWeightRFC7540,
state: priorityNodeOpenRFC7540,
}
n.setParent(parent)
ws.nodes[streamID] = n
if streamID > ws.maxID {
ws.maxID = streamID
}
}
func (ws *priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
if streamID == 0 {
panic("violation of WriteScheduler interface: cannot close stream 0")
}
if ws.nodes[streamID] == nil {
panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
}
if ws.nodes[streamID].state != priorityNodeOpenRFC7540 {
panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
}
n := ws.nodes[streamID]
n.state = priorityNodeClosedRFC7540
n.addBytes(-n.bytes)
q := n.q
ws.queuePool.put(&q)
if ws.maxClosedNodesInTree > 0 {
ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
} else {
ws.removeNode(n)
}
}
func (ws *priorityWriteSchedulerRFC7540) AdjustStream(streamID uint32, priority PriorityParam) {
if streamID == 0 {
panic("adjustPriority on root")
}
// If streamID does not exist, there are two cases:
// - A closed stream that has been removed (this will have ID <= maxID)
// - An idle stream that is being used for "grouping" (this will have ID > maxID)
n := ws.nodes[streamID]
if n == nil {
if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
return
}
ws.maxID = streamID
n = &priorityNodeRFC7540{
q: *ws.queuePool.get(),
id: streamID,
weight: priorityDefaultWeightRFC7540,
state: priorityNodeIdleRFC7540,
}
n.setParent(&ws.root)
ws.nodes[streamID] = n
ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
}
// Section 5.3.1: A dependency on a stream that is not currently in the tree
// results in that stream being given a default priority (Section 5.3.5).
parent := ws.nodes[priority.StreamDep]
if parent == nil {
n.setParent(&ws.root)
n.weight = priorityDefaultWeightRFC7540
return
}
// Ignore if the client tries to make a node its own parent.
if n == parent {
return
}
// Section 5.3.3:
// "If a stream is made dependent on one of its own dependencies, the
// formerly dependent stream is first moved to be dependent on the
// reprioritized stream's previous parent. The moved dependency retains
// its weight."
//
// That is: if parent depends on n, move parent to depend on n.parent.
for x := parent.parent; x != nil; x = x.parent {
if x == n {
parent.setParent(n.parent)
break
}
}
// Section 5.3.3: The exclusive flag causes the stream to become the sole
// dependency of its parent stream, causing other dependencies to become
// dependent on the exclusive stream.
if priority.Exclusive {
k := parent.kids
for k != nil {
next := k.next
if k != n {
k.setParent(n)
}
k = next
}
}
n.setParent(parent)
n.weight = priority.Weight
}
func (ws *priorityWriteSchedulerRFC7540) Push(wr FrameWriteRequest) {
var n *priorityNodeRFC7540
if wr.isControl() {
n = &ws.root
} else {
id := wr.StreamID()
n = ws.nodes[id]
if n == nil {
// id is an idle or closed stream. wr should not be a HEADERS or
// DATA frame. In other case, we push wr onto the root, rather
// than creating a new priorityNode.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
n = &ws.root
}
}
n.q.push(wr)
}
func (ws *priorityWriteSchedulerRFC7540) Pop() (wr FrameWriteRequest, ok bool) {
ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNodeRFC7540, openParent bool) bool {
limit := int32(math.MaxInt32)
if openParent {
limit = ws.writeThrottleLimit
}
wr, ok = n.q.consume(limit)
if !ok {
return false
}
n.addBytes(int64(wr.DataSize()))
// If B depends on A and B continuously has data available but A
// does not, gradually increase the throttling limit to allow B to
// steal more and more bandwidth from A.
if openParent {
ws.writeThrottleLimit += 1024
if ws.writeThrottleLimit < 0 {
ws.writeThrottleLimit = math.MaxInt32
}
} else if ws.enableWriteThrottle {
ws.writeThrottleLimit = 1024
}
return true
})
return wr, ok
}
func (ws *priorityWriteSchedulerRFC7540) addClosedOrIdleNode(list *[]*priorityNodeRFC7540, maxSize int, n *priorityNodeRFC7540) {
if maxSize == 0 {
return
}
if len(*list) == maxSize {
// Remove the oldest node, then shift left.
ws.removeNode((*list)[0])
x := (*list)[1:]
copy(*list, x)
*list = (*list)[:len(x)]
}
*list = append(*list, n)
}
func (ws *priorityWriteSchedulerRFC7540) removeNode(n *priorityNodeRFC7540) {
for n.kids != nil {
n.kids.setParent(n.parent)
}
n.setParent(nil)
delete(ws.nodes, n.id)
}

View File

@@ -1,631 +0,0 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"bytes"
"fmt"
"sort"
"testing"
)
func defaultPriorityWriteScheduler() *priorityWriteSchedulerRFC7540 {
return NewPriorityWriteScheduler(nil).(*priorityWriteSchedulerRFC7540)
}
func checkPriorityWellFormed(ws *priorityWriteSchedulerRFC7540) error {
for id, n := range ws.nodes {
if id != n.id {
return fmt.Errorf("bad ws.nodes: ws.nodes[%d] = %d", id, n.id)
}
if n.parent == nil {
if n.next != nil || n.prev != nil {
return fmt.Errorf("bad node %d: nil parent but prev/next not nil", id)
}
continue
}
found := false
for k := n.parent.kids; k != nil; k = k.next {
if k.id == id {
found = true
break
}
}
if !found {
return fmt.Errorf("bad node %d: not found in parent %d kids list", id, n.parent.id)
}
}
return nil
}
func fmtTree(ws *priorityWriteSchedulerRFC7540, fmtNode func(*priorityNodeRFC7540) string) string {
var ids []int
for _, n := range ws.nodes {
ids = append(ids, int(n.id))
}
sort.Ints(ids)
var buf bytes.Buffer
for _, id := range ids {
if buf.Len() != 0 {
buf.WriteString(" ")
}
if id == 0 {
buf.WriteString(fmtNode(&ws.root))
} else {
buf.WriteString(fmtNode(ws.nodes[uint32(id)]))
}
}
return buf.String()
}
func fmtNodeParentSkipRoot(n *priorityNodeRFC7540) string {
switch {
case n.id == 0:
return ""
case n.parent == nil:
return fmt.Sprintf("%d{parent:nil}", n.id)
default:
return fmt.Sprintf("%d{parent:%d}", n.id, n.parent.id)
}
}
func fmtNodeWeightParentSkipRoot(n *priorityNodeRFC7540) string {
switch {
case n.id == 0:
return ""
case n.parent == nil:
return fmt.Sprintf("%d{weight:%d,parent:nil}", n.id, n.weight)
default:
return fmt.Sprintf("%d{weight:%d,parent:%d}", n.id, n.weight, n.parent.id)
}
}
func TestPriorityTwoStreams(t *testing.T) {
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{})
want := "1{weight:15,parent:0} 2{weight:15,parent:0}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After open\ngot %q\nwant %q", got, want)
}
// Move 1's parent to 2.
ws.AdjustStream(1, PriorityParam{
StreamDep: 2,
Weight: 32,
Exclusive: false,
})
want = "1{weight:32,parent:2} 2{weight:15,parent:0}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After adjust\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPriorityAdjustExclusiveZero(t *testing.T) {
// 1, 2, and 3 are all children of the 0 stream.
// Exclusive reprioritization to any of the streams should bring
// the rest of the streams under the reprioritized stream.
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{})
ws.OpenStream(3, OpenStreamOptions{})
want := "1{weight:15,parent:0} 2{weight:15,parent:0} 3{weight:15,parent:0}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After open\ngot %q\nwant %q", got, want)
}
ws.AdjustStream(2, PriorityParam{
StreamDep: 0,
Weight: 20,
Exclusive: true,
})
want = "1{weight:15,parent:2} 2{weight:20,parent:0} 3{weight:15,parent:2}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After adjust\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPriorityAdjustOwnParent(t *testing.T) {
// Assigning a node as its own parent should have no effect.
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{})
ws.AdjustStream(2, PriorityParam{
StreamDep: 2,
Weight: 20,
Exclusive: true,
})
want := "1{weight:15,parent:0} 2{weight:15,parent:0}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After adjust\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPriorityClosedStreams(t *testing.T) {
ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{MaxClosedNodesInTree: 2}).(*priorityWriteSchedulerRFC7540)
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
ws.OpenStream(3, OpenStreamOptions{PusherID: 2})
ws.OpenStream(4, OpenStreamOptions{PusherID: 3})
// Close the first three streams. We lose 1, but keep 2 and 3.
ws.CloseStream(1)
ws.CloseStream(2)
ws.CloseStream(3)
want := "2{weight:15,parent:0} 3{weight:15,parent:2} 4{weight:15,parent:3}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After close\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
// Adding a stream as an exclusive child of 1 gives it default
// priorities, since 1 is gone.
ws.OpenStream(5, OpenStreamOptions{})
ws.AdjustStream(5, PriorityParam{StreamDep: 1, Weight: 15, Exclusive: true})
// Adding a stream as an exclusive child of 2 should work, since 2 is not gone.
ws.OpenStream(6, OpenStreamOptions{})
ws.AdjustStream(6, PriorityParam{StreamDep: 2, Weight: 15, Exclusive: true})
want = "2{weight:15,parent:0} 3{weight:15,parent:6} 4{weight:15,parent:3} 5{weight:15,parent:0} 6{weight:15,parent:2}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After add streams\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPriorityClosedStreamsDisabled(t *testing.T) {
ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{}).(*priorityWriteSchedulerRFC7540)
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
ws.OpenStream(3, OpenStreamOptions{PusherID: 2})
// Close the first two streams. We keep only 3.
ws.CloseStream(1)
ws.CloseStream(2)
want := "3{weight:15,parent:0}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After close\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPriorityIdleStreams(t *testing.T) {
ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{MaxIdleNodesInTree: 2}).(*priorityWriteSchedulerRFC7540)
ws.AdjustStream(1, PriorityParam{StreamDep: 0, Weight: 15}) // idle
ws.AdjustStream(2, PriorityParam{StreamDep: 0, Weight: 15}) // idle
ws.AdjustStream(3, PriorityParam{StreamDep: 2, Weight: 20}) // idle
ws.OpenStream(4, OpenStreamOptions{})
ws.OpenStream(5, OpenStreamOptions{})
ws.OpenStream(6, OpenStreamOptions{})
ws.AdjustStream(4, PriorityParam{StreamDep: 1, Weight: 15})
ws.AdjustStream(5, PriorityParam{StreamDep: 2, Weight: 15})
ws.AdjustStream(6, PriorityParam{StreamDep: 3, Weight: 15})
want := "2{weight:15,parent:0} 3{weight:20,parent:2} 4{weight:15,parent:0} 5{weight:15,parent:2} 6{weight:15,parent:3}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After open\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPriorityIdleStreamsDisabled(t *testing.T) {
ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{}).(*priorityWriteSchedulerRFC7540)
ws.AdjustStream(1, PriorityParam{StreamDep: 0, Weight: 15}) // idle
ws.AdjustStream(2, PriorityParam{StreamDep: 0, Weight: 15}) // idle
ws.AdjustStream(3, PriorityParam{StreamDep: 2, Weight: 20}) // idle
ws.OpenStream(4, OpenStreamOptions{})
want := "4{weight:15,parent:0}"
if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {
t.Errorf("After open\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPrioritySection531NonExclusive(t *testing.T) {
// Example from RFC 7540 Section 5.3.1.
// A,B,C,D = 1,2,3,4
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
ws.OpenStream(3, OpenStreamOptions{PusherID: 1})
ws.OpenStream(4, OpenStreamOptions{})
ws.AdjustStream(4, PriorityParam{
StreamDep: 1,
Weight: 15,
Exclusive: false,
})
want := "1{parent:0} 2{parent:1} 3{parent:1} 4{parent:1}"
if got := fmtTree(ws, fmtNodeParentSkipRoot); got != want {
t.Errorf("After adjust\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPrioritySection531Exclusive(t *testing.T) {
// Example from RFC 7540 Section 5.3.1.
// A,B,C,D = 1,2,3,4
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
ws.OpenStream(3, OpenStreamOptions{PusherID: 1})
ws.OpenStream(4, OpenStreamOptions{})
ws.AdjustStream(4, PriorityParam{
StreamDep: 1,
Weight: 15,
Exclusive: true,
})
want := "1{parent:0} 2{parent:4} 3{parent:4} 4{parent:1}"
if got := fmtTree(ws, fmtNodeParentSkipRoot); got != want {
t.Errorf("After adjust\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func makeSection533Tree() *priorityWriteSchedulerRFC7540 {
// Initial tree from RFC 7540 Section 5.3.3.
// A,B,C,D,E,F = 1,2,3,4,5,6
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
ws.OpenStream(3, OpenStreamOptions{PusherID: 1})
ws.OpenStream(4, OpenStreamOptions{PusherID: 3})
ws.OpenStream(5, OpenStreamOptions{PusherID: 3})
ws.OpenStream(6, OpenStreamOptions{PusherID: 4})
return ws
}
func TestPrioritySection533NonExclusive(t *testing.T) {
// Example from RFC 7540 Section 5.3.3.
// A,B,C,D,E,F = 1,2,3,4,5,6
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
ws.OpenStream(3, OpenStreamOptions{PusherID: 1})
ws.OpenStream(4, OpenStreamOptions{PusherID: 3})
ws.OpenStream(5, OpenStreamOptions{PusherID: 3})
ws.OpenStream(6, OpenStreamOptions{PusherID: 4})
ws.AdjustStream(1, PriorityParam{
StreamDep: 4,
Weight: 15,
Exclusive: false,
})
want := "1{parent:4} 2{parent:1} 3{parent:1} 4{parent:0} 5{parent:3} 6{parent:4}"
if got := fmtTree(ws, fmtNodeParentSkipRoot); got != want {
t.Errorf("After adjust\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func TestPrioritySection533Exclusive(t *testing.T) {
// Example from RFC 7540 Section 5.3.3.
// A,B,C,D,E,F = 1,2,3,4,5,6
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
ws.OpenStream(3, OpenStreamOptions{PusherID: 1})
ws.OpenStream(4, OpenStreamOptions{PusherID: 3})
ws.OpenStream(5, OpenStreamOptions{PusherID: 3})
ws.OpenStream(6, OpenStreamOptions{PusherID: 4})
ws.AdjustStream(1, PriorityParam{
StreamDep: 4,
Weight: 15,
Exclusive: true,
})
want := "1{parent:4} 2{parent:1} 3{parent:1} 4{parent:0} 5{parent:3} 6{parent:1}"
if got := fmtTree(ws, fmtNodeParentSkipRoot); got != want {
t.Errorf("After adjust\ngot %q\nwant %q", got, want)
}
if err := checkPriorityWellFormed(ws); err != nil {
t.Error(err)
}
}
func checkPopAll(ws WriteScheduler, order []uint32) error {
for k, id := range order {
wr, ok := ws.Pop()
if !ok {
return fmt.Errorf("Pop[%d]: got ok=false, want %d (order=%v)", k, id, order)
}
if got := wr.StreamID(); got != id {
return fmt.Errorf("Pop[%d]: got %v, want %d (order=%v)", k, got, id, order)
}
}
wr, ok := ws.Pop()
if ok {
return fmt.Errorf("Pop[%d]: got %v, want ok=false (order=%v)", len(order), wr.StreamID(), order)
}
return nil
}
func TestPriorityPopFrom533Tree(t *testing.T) {
ws := makeSection533Tree()
ws.Push(makeWriteHeadersRequest(3 /*C*/))
ws.Push(makeWriteNonStreamRequest())
ws.Push(makeWriteHeadersRequest(5 /*E*/))
ws.Push(makeWriteHeadersRequest(1 /*A*/))
t.Log("tree:", fmtTree(ws, fmtNodeParentSkipRoot))
if err := checkPopAll(ws, []uint32{0 /*NonStream*/, 1, 3, 5}); err != nil {
t.Error(err)
}
}
// #49741 RST_STREAM and Control frames should have more priority than data
// frames to avoid blocking streams caused by clients not able to drain the
// queue.
func TestPriorityRSTFrames(t *testing.T) {
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
sc := &serverConn{maxFrameSize: 16}
st1 := &stream{id: 1, sc: sc}
ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 16), false}, st1, nil})
ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 16), false}, st1, nil})
ws.Push(makeWriteRSTStream(1))
// No flow-control bytes available.
wr, ok := ws.Pop()
if !ok {
t.Fatalf("Pop should work for control frames and not be limited by flow control")
}
if _, ok := wr.write.(StreamError); !ok {
t.Fatal("expected RST stream frames first", wr)
}
}
func TestPriorityPopFromLinearTree(t *testing.T) {
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
ws.OpenStream(3, OpenStreamOptions{PusherID: 2})
ws.OpenStream(4, OpenStreamOptions{PusherID: 3})
ws.Push(makeWriteHeadersRequest(3))
ws.Push(makeWriteHeadersRequest(4))
ws.Push(makeWriteHeadersRequest(1))
ws.Push(makeWriteHeadersRequest(2))
ws.Push(makeWriteNonStreamRequest())
ws.Push(makeWriteNonStreamRequest())
t.Log("tree:", fmtTree(ws, fmtNodeParentSkipRoot))
if err := checkPopAll(ws, []uint32{0, 0 /*NonStreams*/, 1, 2, 3, 4}); err != nil {
t.Error(err)
}
}
func TestPriorityFlowControl(t *testing.T) {
ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{ThrottleOutOfOrderWrites: false})
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
sc := &serverConn{maxFrameSize: 16}
st1 := &stream{id: 1, sc: sc}
st2 := &stream{id: 2, sc: sc}
ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 16), false}, st1, nil})
ws.Push(FrameWriteRequest{&writeData{2, make([]byte, 16), false}, st2, nil})
ws.AdjustStream(2, PriorityParam{StreamDep: 1})
// No flow-control bytes available.
if wr, ok := ws.Pop(); ok {
t.Fatalf("Pop(limited by flow control)=%v,true, want false", wr)
}
// Add enough flow-control bytes to write st2 in two Pop calls.
// Should write data from st2 even though it's lower priority than st1.
for i := 1; i <= 2; i++ {
st2.flow.add(8)
wr, ok := ws.Pop()
if !ok {
t.Fatalf("Pop(%d)=false, want true", i)
}
if got, want := wr.DataSize(), 8; got != want {
t.Fatalf("Pop(%d)=%d bytes, want %d bytes", i, got, want)
}
}
}
func TestPriorityThrottleOutOfOrderWrites(t *testing.T) {
ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{ThrottleOutOfOrderWrites: true})
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{PusherID: 1})
sc := &serverConn{maxFrameSize: 4096}
st1 := &stream{id: 1, sc: sc}
st2 := &stream{id: 2, sc: sc}
st1.flow.add(4096)
st2.flow.add(4096)
ws.Push(FrameWriteRequest{&writeData{2, make([]byte, 4096), false}, st2, nil})
ws.AdjustStream(2, PriorityParam{StreamDep: 1})
// We have enough flow-control bytes to write st2 in a single Pop call.
// However, due to out-of-order write throttling, the first call should
// only write 1KB.
wr, ok := ws.Pop()
if !ok {
t.Fatalf("Pop(st2.first)=false, want true")
}
if got, want := wr.StreamID(), uint32(2); got != want {
t.Fatalf("Pop(st2.first)=stream %d, want stream %d", got, want)
}
if got, want := wr.DataSize(), 1024; got != want {
t.Fatalf("Pop(st2.first)=%d bytes, want %d bytes", got, want)
}
// Now add data on st1. This should take precedence.
ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 4096), false}, st1, nil})
wr, ok = ws.Pop()
if !ok {
t.Fatalf("Pop(st1)=false, want true")
}
if got, want := wr.StreamID(), uint32(1); got != want {
t.Fatalf("Pop(st1)=stream %d, want stream %d", got, want)
}
if got, want := wr.DataSize(), 4096; got != want {
t.Fatalf("Pop(st1)=%d bytes, want %d bytes", got, want)
}
// Should go back to writing 1KB from st2.
wr, ok = ws.Pop()
if !ok {
t.Fatalf("Pop(st2.last)=false, want true")
}
if got, want := wr.StreamID(), uint32(2); got != want {
t.Fatalf("Pop(st2.last)=stream %d, want stream %d", got, want)
}
if got, want := wr.DataSize(), 1024; got != want {
t.Fatalf("Pop(st2.last)=%d bytes, want %d bytes", got, want)
}
}
func TestPriorityWeights(t *testing.T) {
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{})
sc := &serverConn{maxFrameSize: 8}
st1 := &stream{id: 1, sc: sc}
st2 := &stream{id: 2, sc: sc}
st1.flow.add(40)
st2.flow.add(40)
ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 40), false}, st1, nil})
ws.Push(FrameWriteRequest{&writeData{2, make([]byte, 40), false}, st2, nil})
ws.AdjustStream(1, PriorityParam{StreamDep: 0, Weight: 34})
ws.AdjustStream(2, PriorityParam{StreamDep: 0, Weight: 9})
// st1 gets 3.5x the bandwidth of st2 (3.5 = (34+1)/(9+1)).
// The maximum frame size is 8 bytes. The write sequence should be:
// st1, total bytes so far is (st1=8, st=0)
// st2, total bytes so far is (st1=8, st=8)
// st1, total bytes so far is (st1=16, st=8)
// st1, total bytes so far is (st1=24, st=8) // 3x bandwidth
// st1, total bytes so far is (st1=32, st=8) // 4x bandwidth
// st2, total bytes so far is (st1=32, st=16) // 2x bandwidth
// st1, total bytes so far is (st1=40, st=16)
// st2, total bytes so far is (st1=40, st=24)
// st2, total bytes so far is (st1=40, st=32)
// st2, total bytes so far is (st1=40, st=40)
if err := checkPopAll(ws, []uint32{1, 2, 1, 1, 1, 2, 1, 2, 2, 2}); err != nil {
t.Error(err)
}
}
func TestPriorityWeightsMinMax(t *testing.T) {
ws := defaultPriorityWriteScheduler()
ws.OpenStream(1, OpenStreamOptions{})
ws.OpenStream(2, OpenStreamOptions{})
sc := &serverConn{maxFrameSize: 8}
st1 := &stream{id: 1, sc: sc}
st2 := &stream{id: 2, sc: sc}
st1.flow.add(40)
st2.flow.add(40)
// st2 gets 256x the bandwidth of st1 (256 = (255+1)/(0+1)).
// The maximum frame size is 8 bytes. The write sequence should be:
// st2, total bytes so far is (st1=0, st=8)
// st1, total bytes so far is (st1=8, st=8)
// st2, total bytes so far is (st1=8, st=16)
// st2, total bytes so far is (st1=8, st=24)
// st2, total bytes so far is (st1=8, st=32)
// st2, total bytes so far is (st1=8, st=40) // 5x bandwidth
// st1, total bytes so far is (st1=16, st=40)
// st1, total bytes so far is (st1=24, st=40)
// st1, total bytes so far is (st1=32, st=40)
// st1, total bytes so far is (st1=40, st=40)
ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 40), false}, st1, nil})
ws.Push(FrameWriteRequest{&writeData{2, make([]byte, 40), false}, st2, nil})
ws.AdjustStream(1, PriorityParam{StreamDep: 0, Weight: 0})
ws.AdjustStream(2, PriorityParam{StreamDep: 0, Weight: 255})
if err := checkPopAll(ws, []uint32{2, 1, 2, 2, 2, 2, 1, 1, 1, 1}); err != nil {
t.Error(err)
}
}
func TestPriorityRstStreamOnNonOpenStreams(t *testing.T) {
ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{
MaxClosedNodesInTree: 0,
MaxIdleNodesInTree: 0,
})
ws.OpenStream(1, OpenStreamOptions{})
ws.CloseStream(1)
ws.Push(FrameWriteRequest{write: streamError(1, ErrCodeProtocol)})
ws.Push(FrameWriteRequest{write: streamError(2, ErrCodeProtocol)})
if err := checkPopAll(ws, []uint32{1, 2}); err != nil {
t.Error(err)
}
}
// https://go.dev/issue/66514
func TestPriorityIssue66514(t *testing.T) {
addDep := func(ws *priorityWriteSchedulerRFC7540, child uint32, parent uint32) {
ws.AdjustStream(child, PriorityParam{
StreamDep: parent,
Exclusive: false,
Weight: 16,
})
}
validateDepTree := func(ws *priorityWriteSchedulerRFC7540, id uint32, t *testing.T) {
for n := ws.nodes[id]; n != nil; n = n.parent {
if n.parent == nil {
if n.id != uint32(0) {
t.Errorf("detected nodes not parented to 0")
}
}
}
}
ws := NewPriorityWriteScheduler(nil).(*priorityWriteSchedulerRFC7540)
// Root entry
addDep(ws, uint32(1), uint32(0))
addDep(ws, uint32(3), uint32(1))
addDep(ws, uint32(5), uint32(1))
for id := uint32(7); id < uint32(100); id += uint32(4) {
addDep(ws, id, id-uint32(4))
addDep(ws, id+uint32(2), id-uint32(4))
validateDepTree(ws, id, t)
}
}

View File

@@ -1,77 +0,0 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import "math"
// NewRandomWriteScheduler constructs a WriteScheduler that ignores HTTP/2
// priorities. Control frames like SETTINGS and PING are written before DATA
// frames, but if no control frames are queued and multiple streams have queued
// HEADERS or DATA frames, Pop selects a ready stream arbitrarily.
func NewRandomWriteScheduler() WriteScheduler {
return &randomWriteScheduler{sq: make(map[uint32]*writeQueue)}
}
type randomWriteScheduler struct {
// zero are frames not associated with a specific stream.
zero writeQueue
// sq contains the stream-specific queues, keyed by stream ID.
// When a stream is idle, closed, or emptied, it's deleted
// from the map.
sq map[uint32]*writeQueue
// pool of empty queues for reuse.
queuePool writeQueuePool
}
func (ws *randomWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
// no-op: idle streams are not tracked
}
func (ws *randomWriteScheduler) CloseStream(streamID uint32) {
q, ok := ws.sq[streamID]
if !ok {
return
}
delete(ws.sq, streamID)
ws.queuePool.put(q)
}
func (ws *randomWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
// no-op: priorities are ignored
}
func (ws *randomWriteScheduler) Push(wr FrameWriteRequest) {
if wr.isControl() {
ws.zero.push(wr)
return
}
id := wr.StreamID()
q, ok := ws.sq[id]
if !ok {
q = ws.queuePool.get()
ws.sq[id] = q
}
q.push(wr)
}
func (ws *randomWriteScheduler) Pop() (FrameWriteRequest, bool) {
// Control and RST_STREAM frames first.
if !ws.zero.empty() {
return ws.zero.shift(), true
}
// Iterate over all non-idle streams until finding one that can be consumed.
for streamID, q := range ws.sq {
if wr, ok := q.consume(math.MaxInt32); ok {
if q.empty() {
delete(ws.sq, streamID)
ws.queuePool.put(q)
}
return wr, true
}
}
return FrameWriteRequest{}, false
}

View File

@@ -1,64 +0,0 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import "testing"
func TestRandomScheduler(t *testing.T) {
ws := NewRandomWriteScheduler()
ws.Push(makeWriteHeadersRequest(3))
ws.Push(makeWriteHeadersRequest(4))
ws.Push(makeWriteHeadersRequest(1))
ws.Push(makeWriteHeadersRequest(2))
ws.Push(makeWriteNonStreamRequest())
ws.Push(makeWriteNonStreamRequest())
ws.Push(makeWriteRSTStream(1))
// Pop all frames. Should get the non-stream and RST stream requests first,
// followed by the stream requests in any order.
var order []FrameWriteRequest
for {
wr, ok := ws.Pop()
if !ok {
break
}
order = append(order, wr)
}
t.Logf("got frames: %v", order)
if len(order) != 7 {
t.Fatalf("got %d frames, expected 6", len(order))
}
if order[0].StreamID() != 0 || order[1].StreamID() != 0 {
t.Fatal("expected non-stream frames first", order[0], order[1])
}
if _, ok := order[2].write.(StreamError); !ok {
t.Fatal("expected RST stream frames first", order[2])
}
got := make(map[uint32]bool)
for _, wr := range order[2:] {
got[wr.StreamID()] = true
}
for id := uint32(1); id <= 4; id++ {
if !got[id] {
t.Errorf("frame not found for stream %d", id)
}
}
// Verify that we clean up maps for empty queues in all cases (golang.org/issue/33812)
const arbitraryStreamID = 123
ws.Push(makeHandlerPanicRST(arbitraryStreamID))
rws := ws.(*randomWriteScheduler)
if got, want := len(rws.sq), 1; got != want {
t.Fatalf("len of 123 stream = %v; want %v", got, want)
}
_, ok := ws.Pop()
if !ok {
t.Fatal("expected to be able to Pop")
}
if got, want := len(rws.sq), 0; got != want {
t.Fatalf("len of 123 stream = %v; want %v", got, want)
}
}