internal/poll: honor ERROR_OPERATION_ABORTED if pipe is not closed

FD.Read converts a syscall.ERROR_OPERATION_ABORTED error to
ErrFileClosing. It does that in case the pipe operation was aborted by
a CancelIoEx call in FD.Close.

It doesn't take into account that the operation might have been
aborted by a CancelIoEx call in external code. In that case, the
operation should return the error as is.

Change-Id: I75dcf0edaace8b57dc47b398ea591ca9f116112b
Reviewed-on: https://go-review.googlesource.com/c/go/+/661555
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
This commit is contained in:
qmuntal 2025-03-28 20:38:34 +01:00 committed by Quim Muntal
parent 5ec76ae5aa
commit b9934d855c
3 changed files with 55 additions and 5 deletions

View File

@ -250,3 +250,8 @@ func (fd *FD) writeUnlock() {
fd.destroy() fd.destroy()
} }
} }
// closing returns true if fd is closing.
func (fd *FD) closing() bool {
return atomic.LoadUint64(&fd.fdmu.state)&mutexClosed != 0
}

View File

@ -461,10 +461,12 @@ func (fd *FD) Read(buf []byte) (int, error) {
// Returned by pipes when the other end is closed. // Returned by pipes when the other end is closed.
err = nil err = nil
case syscall.ERROR_OPERATION_ABORTED: case syscall.ERROR_OPERATION_ABORTED:
// Close uses CancelIoEx to interrupt concurrent I/O for pipes. if fd.closing() {
// If the fd is a pipe and the Read was interrupted by CancelIoEx, // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
// we assume it is interrupted by Close. // If the fd is a pipe and the Read was interrupted by CancelIoEx,
err = ErrFileClosing // we assume it is interrupted by Close.
err = ErrFileClosing
}
} }
} }
case kindNet: case kindNet:
@ -717,7 +719,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
}) })
fd.addOffset(n) fd.addOffset(n)
if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED { if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED && fd.closing() {
// Close uses CancelIoEx to interrupt concurrent I/O for pipes. // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
// If the fd is a pipe and the Write was interrupted by CancelIoEx, // If the fd is a pipe and the Write was interrupted by CancelIoEx,
// we assume it is interrupted by Close. // we assume it is interrupted by Close.

View File

@ -18,6 +18,7 @@ import (
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"testing" "testing"
"time"
"unsafe" "unsafe"
) )
@ -339,7 +340,9 @@ func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) {
} }
func TestFile(t *testing.T) { func TestFile(t *testing.T) {
t.Parallel()
test := func(t *testing.T, r, w bool) { test := func(t *testing.T, r, w bool) {
t.Parallel()
name := filepath.Join(t.TempDir(), "foo") name := filepath.Join(t.TempDir(), "foo")
rh := newFile(t, name, r) rh := newFile(t, name, r)
wh := newFile(t, name, w) wh := newFile(t, name, w)
@ -361,27 +364,33 @@ func TestFile(t *testing.T) {
} }
func TestPipe(t *testing.T) { func TestPipe(t *testing.T) {
t.Parallel()
t.Run("overlapped", func(t *testing.T) { t.Run("overlapped", func(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, true, false) name, pipe := newPipe(t, true, false)
file := newFile(t, name, true) file := newFile(t, name, true)
testReadWrite(t, pipe, file) testReadWrite(t, pipe, file)
}) })
t.Run("overlapped-write", func(t *testing.T) { t.Run("overlapped-write", func(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, true, false) name, pipe := newPipe(t, true, false)
file := newFile(t, name, false) file := newFile(t, name, false)
testReadWrite(t, file, pipe) testReadWrite(t, file, pipe)
}) })
t.Run("overlapped-read", func(t *testing.T) { t.Run("overlapped-read", func(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, false, false) name, pipe := newPipe(t, false, false)
file := newFile(t, name, true) file := newFile(t, name, true)
testReadWrite(t, file, pipe) testReadWrite(t, file, pipe)
}) })
t.Run("sync", func(t *testing.T) { t.Run("sync", func(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, false, false) name, pipe := newPipe(t, false, false)
file := newFile(t, name, false) file := newFile(t, name, false)
testReadWrite(t, file, pipe) testReadWrite(t, file, pipe)
}) })
t.Run("anonymous", func(t *testing.T) { t.Run("anonymous", func(t *testing.T) {
t.Parallel()
var r, w syscall.Handle var r, w syscall.Handle
if err := syscall.CreatePipe(&r, &w, nil, 0); err != nil { if err := syscall.CreatePipe(&r, &w, nil, 0); err != nil {
t.Fatal(err) t.Fatal(err)
@ -402,6 +411,7 @@ func TestPipe(t *testing.T) {
} }
func TestPipeWriteEOF(t *testing.T) { func TestPipeWriteEOF(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, false, true) name, pipe := newPipe(t, false, true)
file := newFile(t, name, false) file := newFile(t, name, false)
read := make(chan struct{}, 1) read := make(chan struct{}, 1)
@ -423,6 +433,39 @@ func TestPipeWriteEOF(t *testing.T) {
} }
} }
func TestPipeCanceled(t *testing.T) {
t.Parallel()
name, _ := newPipe(t, true, false)
file := newFile(t, name, true)
ch := make(chan struct{}, 1)
go func() {
for {
select {
case <-ch:
return
default:
syscall.CancelIo(syscall.Handle(file.Sysfd))
time.Sleep(100 * time.Millisecond)
}
}
}()
// Try to cancel for max 1 second.
// Canceling is normally really fast, but it can take an
// arbitrary amount of time on busy systems.
// If it takes too long, we skip the test.
file.SetReadDeadline(time.Now().Add(1 * time.Second))
var tmp [1]byte
// Read will block until the cancel is complete.
_, err := file.Read(tmp[:])
ch <- struct{}{}
if err == poll.ErrDeadlineExceeded {
t.Skip("took too long to cancel")
}
if err != syscall.ERROR_OPERATION_ABORTED {
t.Errorf("expected ERROR_OPERATION_ABORTED, got %v", err)
}
}
func BenchmarkReadOverlapped(b *testing.B) { func BenchmarkReadOverlapped(b *testing.B) {
benchmarkRead(b, true) benchmarkRead(b, true)
} }