quic: surface connection errors in stream methods

When reading from or writing to a stream associated with a closed
connection, return the connection-level error.

Change-Id: I7f0a4b7f531d06ad1daa46d1942d37bca8ca5698
Reviewed-on: https://go-review.googlesource.com/c/net/+/640797
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Auto-Submit: Damien Neil <dneil@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
This commit is contained in:
Damien Neil
2024-12-17 09:44:09 -08:00
committed by Gopher Robot
parent dc3b8a8dce
commit 4428704833
3 changed files with 48 additions and 12 deletions

View File

@@ -230,6 +230,17 @@ func (c *Conn) setFinalError(err error) {
close(c.lifetime.donec)
}
// finalError returns the final connection status reported to the user,
// or nil if a final status has not yet been set.
func (c *Conn) finalError() error {
select {
case <-c.lifetime.donec:
return c.lifetime.finalErr
default:
}
return nil
}
func (c *Conn) waitReady(ctx context.Context) error {
select {
case <-c.lifetime.readyc:

View File

@@ -254,6 +254,11 @@ func (s *Stream) Read(b []byte) (n int, err error) {
s.conn.handleStreamBytesReadOffLoop(bytesRead) // must be done with ingate unlocked
}()
if s.inresetcode != -1 {
if s.inresetcode == streamResetByConnClose {
if err := s.conn.finalError(); err != nil {
return 0, err
}
}
return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
}
if s.inclosed.isSet() {
@@ -352,13 +357,9 @@ func (s *Stream) Write(b []byte) (n int, err error) {
// write blocked. (Unlike traditional condition variables, gates do not
// have spurious wakeups.)
}
if s.outreset.isSet() {
if err := s.writeErrorLocked(); err != nil {
s.outUnlock()
return n, errors.New("write to reset stream")
}
if s.outclosed.isSet() {
s.outUnlock()
return n, errors.New("write to closed stream")
return n, err
}
if len(b) == 0 {
break
@@ -451,13 +452,27 @@ func (s *Stream) Flush() error {
}
s.outgate.lock()
defer s.outUnlock()
if err := s.writeErrorLocked(); err != nil {
return err
}
s.flushLocked()
return nil
}
// writeErrorLocked returns the error (if any) which should be returned by write operations
// due to the stream being reset or closed.
func (s *Stream) writeErrorLocked() error {
if s.outreset.isSet() {
if s.outresetcode == streamResetByConnClose {
if err := s.conn.finalError(); err != nil {
return err
}
}
return errors.New("write to reset stream")
}
if s.outclosed.isSet() {
return errors.New("write to closed stream")
}
s.flushLocked()
return nil
}
@@ -605,8 +620,11 @@ func (s *Stream) connHasClosed() {
s.outgate.lock()
if localClose {
s.outclosed.set()
s.outreset.set()
} else {
s.outresetcode = streamResetByConnClose
s.outreset.setReceived()
}
s.outreset.set()
s.outUnlock()
}

View File

@@ -1361,14 +1361,21 @@ func TestStreamFlushStreamAfterPeerStopSending(t *testing.T) {
}
}
func TestStreamFlushStreamAfterConnectionClosed(t *testing.T) {
func TestStreamErrorsAfterConnectionClosed(t *testing.T) {
tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream,
permissiveTransportParameters)
wantErr := &ApplicationError{Code: 42}
tc.writeFrames(packetType1RTT, debugFrameConnectionCloseApplication{
code: 0,
code: wantErr.Code,
})
if err := s.Flush(); err == nil {
t.Errorf("s.Flush of stream on closed connection = nil, want error")
if _, err := s.Read(make([]byte, 1)); !errors.Is(err, wantErr) {
t.Errorf("s.Read on closed connection = %v, want %v", err, wantErr)
}
if _, err := s.Write(make([]byte, 1)); !errors.Is(err, wantErr) {
t.Errorf("s.Write on closed connection = %v, want %v", err, wantErr)
}
if err := s.Flush(); !errors.Is(err, wantErr) {
t.Errorf("s.Flush on closed connection = %v, want %v", err, wantErr)
}
}