From b9934d855c4635edf02092a72802017676abd8eb Mon Sep 17 00:00:00 2001 From: qmuntal Date: Fri, 28 Mar 2025 20:38:34 +0100 Subject: [PATCH] internal/poll: honor ERROR_OPERATION_ABORTED if pipe is not closed FD.Read converts a syscall.ERROR_OPERATION_ABORTED error to ErrFileClosing. It does that in case the pipe operation was aborted by a CancelIoEx call in FD.Close. It doesn't take into account that the operation might have been aborted by a CancelIoEx call in external code. In that case, the operation should return the error as is. Change-Id: I75dcf0edaace8b57dc47b398ea591ca9f116112b Reviewed-on: https://go-review.googlesource.com/c/go/+/661555 LUCI-TryBot-Result: Go LUCI Reviewed-by: Damien Neil Reviewed-by: Dmitri Shuralyov --- src/internal/poll/fd_mutex.go | 5 ++++ src/internal/poll/fd_windows.go | 12 ++++---- src/internal/poll/fd_windows_test.go | 43 ++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/src/internal/poll/fd_mutex.go b/src/internal/poll/fd_mutex.go index 0a8ee6f0d4..4d194df186 100644 --- a/src/internal/poll/fd_mutex.go +++ b/src/internal/poll/fd_mutex.go @@ -250,3 +250,8 @@ func (fd *FD) writeUnlock() { fd.destroy() } } + +// closing returns true if fd is closing. +func (fd *FD) closing() bool { + return atomic.LoadUint64(&fd.fdmu.state)&mutexClosed != 0 +} diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go index 81c8293911..1caa760349 100644 --- a/src/internal/poll/fd_windows.go +++ b/src/internal/poll/fd_windows.go @@ -461,10 +461,12 @@ func (fd *FD) Read(buf []byte) (int, error) { // Returned by pipes when the other end is closed. err = nil case syscall.ERROR_OPERATION_ABORTED: - // Close uses CancelIoEx to interrupt concurrent I/O for pipes. - // If the fd is a pipe and the Read was interrupted by CancelIoEx, - // we assume it is interrupted by Close. - err = ErrFileClosing + if fd.closing() { + // Close uses CancelIoEx to interrupt concurrent I/O for pipes. + // If the fd is a pipe and the Read was interrupted by CancelIoEx, + // we assume it is interrupted by Close. + err = ErrFileClosing + } } } case kindNet: @@ -717,7 +719,7 @@ func (fd *FD) Write(buf []byte) (int, error) { return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) }) fd.addOffset(n) - if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED { + if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED && fd.closing() { // Close uses CancelIoEx to interrupt concurrent I/O for pipes. // If the fd is a pipe and the Write was interrupted by CancelIoEx, // we assume it is interrupted by Close. diff --git a/src/internal/poll/fd_windows_test.go b/src/internal/poll/fd_windows_test.go index f5fa4a26e3..042bdf8bed 100644 --- a/src/internal/poll/fd_windows_test.go +++ b/src/internal/poll/fd_windows_test.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "syscall" "testing" + "time" "unsafe" ) @@ -339,7 +340,9 @@ func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) { } func TestFile(t *testing.T) { + t.Parallel() test := func(t *testing.T, r, w bool) { + t.Parallel() name := filepath.Join(t.TempDir(), "foo") rh := newFile(t, name, r) wh := newFile(t, name, w) @@ -361,27 +364,33 @@ func TestFile(t *testing.T) { } func TestPipe(t *testing.T) { + t.Parallel() t.Run("overlapped", func(t *testing.T) { + t.Parallel() name, pipe := newPipe(t, true, false) file := newFile(t, name, true) testReadWrite(t, pipe, file) }) t.Run("overlapped-write", func(t *testing.T) { + t.Parallel() name, pipe := newPipe(t, true, false) file := newFile(t, name, false) testReadWrite(t, file, pipe) }) t.Run("overlapped-read", func(t *testing.T) { + t.Parallel() name, pipe := newPipe(t, false, false) file := newFile(t, name, true) testReadWrite(t, file, pipe) }) t.Run("sync", func(t *testing.T) { + t.Parallel() name, pipe := newPipe(t, false, false) file := newFile(t, name, false) testReadWrite(t, file, pipe) }) t.Run("anonymous", func(t *testing.T) { + t.Parallel() var r, w syscall.Handle if err := syscall.CreatePipe(&r, &w, nil, 0); err != nil { t.Fatal(err) @@ -402,6 +411,7 @@ func TestPipe(t *testing.T) { } func TestPipeWriteEOF(t *testing.T) { + t.Parallel() name, pipe := newPipe(t, false, true) file := newFile(t, name, false) read := make(chan struct{}, 1) @@ -423,6 +433,39 @@ func TestPipeWriteEOF(t *testing.T) { } } +func TestPipeCanceled(t *testing.T) { + t.Parallel() + name, _ := newPipe(t, true, false) + file := newFile(t, name, true) + ch := make(chan struct{}, 1) + go func() { + for { + select { + case <-ch: + return + default: + syscall.CancelIo(syscall.Handle(file.Sysfd)) + time.Sleep(100 * time.Millisecond) + } + } + }() + // Try to cancel for max 1 second. + // Canceling is normally really fast, but it can take an + // arbitrary amount of time on busy systems. + // If it takes too long, we skip the test. + file.SetReadDeadline(time.Now().Add(1 * time.Second)) + var tmp [1]byte + // Read will block until the cancel is complete. + _, err := file.Read(tmp[:]) + ch <- struct{}{} + if err == poll.ErrDeadlineExceeded { + t.Skip("took too long to cancel") + } + if err != syscall.ERROR_OPERATION_ABORTED { + t.Errorf("expected ERROR_OPERATION_ABORTED, got %v", err) + } +} + func BenchmarkReadOverlapped(b *testing.B) { benchmarkRead(b, true) }