http2: implement a more efficient writeQueue that avoids unnecessary copies.

Our previous implementation of writeQueue relies on one
[]FrameWriteRequests, forcing us to copy the rest of the slice's content
whenever we remove an item from the front.

This change remedies this problem by implementing writeQueue using
two-stage queues, similar to Okasaki's purely functional queue.

With 25 frames per stream, we are observing the following performance
improvement:
goos: linux
goarch: amd64
pkg: golang.org/x/net/http2
cpu: AMD EPYC 7B13
              │  /tmp/old   │              /tmp/new               │
              │   sec/op    │   sec/op     vs base                │
WriteQueue-64   508.3n ± 3%   305.7n ± 3%  -39.86% (p=0.000 n=10)

              │  /tmp/old  │            /tmp/new            │
              │    B/op    │    B/op     vs base            │
WriteQueue-64   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

              │  /tmp/old  │            /tmp/new            │
              │ allocs/op  │ allocs/op   vs base            │
WriteQueue-64   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

As the number of frames increases, the performance difference becomes
more stark as the old implementation does a quadratic amount of copying
in total to be able to fully consume a queue.

Change-Id: Ide816ebdd89a41275b5829683c0f10d48321af50
Reviewed-on: https://go-review.googlesource.com/c/net/+/710635
Reviewed-by: Damien Neil <dneil@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Nicholas Husin <husin@google.com>
This commit is contained in:
Nicholas S. Husin
2025-10-09 02:10:40 -04:00
committed by Nicholas Husin
parent b93acc2126
commit e7c005de60
3 changed files with 67 additions and 16 deletions

View File

@@ -185,45 +185,75 @@ func (wr *FrameWriteRequest) replyToWriter(err error) {
}
// writeQueue is used by implementations of WriteScheduler.
//
// Each writeQueue contains a queue of FrameWriteRequests, meant to store all
// FrameWriteRequests associated with a given stream. This is implemented as a
// two-stage queue: currQueue[currPos:] and nextQueue. Removing an item is done
// by incrementing currPos of currQueue. Adding an item is done by appending it
// to the nextQueue. If currQueue is empty when trying to remove an item, we
// can swap currQueue and nextQueue to remedy the situation.
// This two-stage queue is analogous to the use of two lists in Okasaki's
// purely functional queue but without the overhead of reversing the list when
// swapping stages.
//
// writeQueue also contains prev and next, this can be used by implementations
// of WriteScheduler to construct data structures that represent the order of
// writing between different streams (e.g. circular linked list).
type writeQueue struct {
s []FrameWriteRequest
currQueue []FrameWriteRequest
nextQueue []FrameWriteRequest
currPos int
prev, next *writeQueue
}
func (q *writeQueue) empty() bool { return len(q.s) == 0 }
func (q *writeQueue) empty() bool {
return (len(q.currQueue) - q.currPos + len(q.nextQueue)) == 0
}
func (q *writeQueue) push(wr FrameWriteRequest) {
q.s = append(q.s, wr)
q.nextQueue = append(q.nextQueue, wr)
}
func (q *writeQueue) shift() FrameWriteRequest {
if len(q.s) == 0 {
if q.empty() {
panic("invalid use of queue")
}
wr := q.s[0]
// TODO: less copy-happy queue.
copy(q.s, q.s[1:])
q.s[len(q.s)-1] = FrameWriteRequest{}
q.s = q.s[:len(q.s)-1]
if q.currPos >= len(q.currQueue) {
q.currQueue, q.currPos, q.nextQueue = q.nextQueue, 0, q.currQueue[:0]
}
wr := q.currQueue[q.currPos]
q.currQueue[q.currPos] = FrameWriteRequest{}
q.currPos++
return wr
}
func (q *writeQueue) peek() *FrameWriteRequest {
if q.currPos < len(q.currQueue) {
return &q.currQueue[q.currPos]
}
if len(q.nextQueue) > 0 {
return &q.nextQueue[0]
}
return nil
}
// consume consumes up to n bytes from q.s[0]. If the frame is
// entirely consumed, it is removed from the queue. If the frame
// is partially consumed, the frame is kept with the consumed
// bytes removed. Returns true iff any bytes were consumed.
func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {
if len(q.s) == 0 {
if q.empty() {
return FrameWriteRequest{}, false
}
consumed, rest, numresult := q.s[0].Consume(n)
consumed, rest, numresult := q.peek().Consume(n)
switch numresult {
case 0:
return FrameWriteRequest{}, false
case 1:
q.shift()
case 2:
q.s[0] = rest
*q.peek() = rest
}
return consumed, true
}
@@ -232,10 +262,15 @@ type writeQueuePool []*writeQueue
// put inserts an unused writeQueue into the pool.
func (p *writeQueuePool) put(q *writeQueue) {
for i := range q.s {
q.s[i] = FrameWriteRequest{}
for i := range q.currQueue {
q.currQueue[i] = FrameWriteRequest{}
}
q.s = q.s[:0]
for i := range q.nextQueue {
q.nextQueue[i] = FrameWriteRequest{}
}
q.currQueue = q.currQueue[:0]
q.nextQueue = q.nextQueue[:0]
q.currPos = 0
*p = append(*p, q)
}

View File

@@ -178,3 +178,20 @@ func BenchmarkWriteSchedulerLifetimePriorityRFC9218NonIncremental(b *testing.B)
incremental: 0,
})
}
func BenchmarkWriteQueue(b *testing.B) {
var qp writeQueuePool
frameCount := 25
for b.Loop() {
q := qp.get()
for range frameCount {
q.push(FrameWriteRequest{})
}
for !q.empty() {
// Since we pushed empty frames, consuming 1 byte is enough to
// consume the entire frame.
q.consume(1)
}
qp.put(q)
}
}

View File

@@ -302,7 +302,6 @@ func (ws *priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
q := n.q
ws.queuePool.put(&q)
n.q.s = nil
if ws.maxClosedNodesInTree > 0 {
ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
} else {