internal/poll: simplify execIO

execIO has multiple return paths and multiple places where error is
mangled. This CL simplifies the function by just having one return
path.

Some more tests have been added to ensure that the error handling
is done correctly.

Updates #19098.

Change-Id: Ida0b1e85d4d123914054306e5bef8da94408b91c
Reviewed-on: https://go-review.googlesource.com/c/go/+/662215
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Auto-Submit: Carlos Amedee <carlos@golang.org>
Reviewed-by: Carlos Amedee <carlos@golang.org>
This commit is contained in:
qmuntal 2025-04-02 10:43:47 +02:00 committed by Gopher Robot
parent 8969771cc3
commit d164776615
6 changed files with 192 additions and 216 deletions

View File

@ -1,17 +0,0 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Export guts for testing on windows.
// Since testing imports os and os imports internal/poll,
// the internal/poll tests can not be in package poll.
package poll
var (
LogInitFD = &logInitFD
)
func (fd *FD) IsPartOfNetpoll() bool {
return fd.pd.runtimeCtx != 0
}

View File

@ -153,8 +153,46 @@ func (o *operation) InitMsg(p []byte, oob []byte) {
}
}
// waitIO waits for the IO operation o to complete.
func waitIO(o *operation) error {
fd := o.fd
if !fd.pd.pollable() {
// The overlapped handle is not added to the runtime poller,
// the only way to wait for the IO to complete is block.
_, err := syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE)
return err
}
// Wait for our request to complete.
err := fd.pd.wait(int(o.mode), fd.isFile)
switch err {
case nil, ErrNetClosing, ErrFileClosing, ErrDeadlineExceeded:
// No other error is expected.
default:
panic("unexpected runtime.netpoll error: " + err.Error())
}
return err
}
// cancelIO cancels the IO operation o and waits for it to complete.
func cancelIO(o *operation) {
fd := o.fd
if !fd.pd.pollable() {
return
}
// Cancel our request.
err := syscall.CancelIoEx(fd.Sysfd, &o.o)
// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
if err != nil && err != syscall.ERROR_NOT_FOUND {
// TODO(brainman): maybe do something else, but panic.
panic(err)
}
fd.pd.waitCanceled(int(o.mode))
}
// execIO executes a single IO operation o.
// It supports both synchronous and asynchronous IO.
// o.qty and o.flags are set to zero before calling submit
// to avoid reusing the values from a previous call.
func execIO(o *operation, submit func(o *operation) error) (int, error) {
fd := o.fd
fd.initIO()
@ -163,89 +201,42 @@ func execIO(o *operation, submit func(o *operation) error) (int, error) {
if err != nil {
return 0, err
}
getOverlappedResult := func() (int, error) {
// Start IO.
o.qty = 0
o.flags = 0
err = submit(o)
var waitErr error
if err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif) {
// IO started asynchronously or completed synchronously but
// a sync notification is required. Wait for it to complete.
waitErr = waitIO(o)
if waitErr != nil {
// IO interrupted by "close" or "timeout".
cancelIO(o)
// We issued a cancellation request, but the IO operation may still succeeded
// before the cancellation request runs.
}
if fd.isFile {
err = windows.GetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false)
} else {
err = windows.WSAGetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false, &o.flags)
}
switch err {
case nil:
return int(o.qty), nil
case syscall.ERROR_HANDLE_EOF:
// EOF reached.
return int(o.qty), io.EOF
case syscall.ERROR_MORE_DATA, windows.WSAEMSGSIZE:
// More data available. Return back the size of received data.
return int(o.qty), err
default:
return 0, err
}
// ERROR_OPERATION_ABORTED may have been caused by us. In that case,
// map it to our own error. Don't do more than that, each submitted
// function may have its own meaning for each error.
if err == syscall.ERROR_OPERATION_ABORTED {
if waitErr != nil {
// IO canceled by the poller while waiting for completion.
err = waitErr
} else if fd.kind == kindPipe && fd.closing() {
// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
// If the fd is a pipe and the Write was interrupted by CancelIoEx,
// we assume it is interrupted by Close.
err = errClosing(fd.isFile)
}
}
// Start IO.
err = submit(o)
if !fd.pd.pollable() {
if err == syscall.ERROR_IO_PENDING {
// The overlapped handle is not added to the runtime poller,
// the only way to wait for the IO to complete is block.
_, err = syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE)
if err == nil {
return getOverlappedResult()
}
}
if err != nil {
return 0, err
}
return int(o.qty), nil
}
switch err {
case nil:
// IO completed immediately
if o.fd.skipSyncNotif {
// No completion message will follow, so return immediately.
return int(o.qty), nil
}
// Need to get our completion message anyway.
case syscall.ERROR_IO_PENDING:
// IO started, and we have to wait for its completion.
err = nil
default:
return 0, err
}
// Wait for our request to complete.
err = fd.pd.wait(int(o.mode), fd.isFile)
if err == nil {
// All is good. Extract our IO results and return.
return getOverlappedResult()
}
// IO is interrupted by "close" or "timeout"
netpollErr := err
switch netpollErr {
case ErrNetClosing, ErrFileClosing, ErrDeadlineExceeded:
// will deal with those.
default:
panic("unexpected runtime.netpoll error: " + netpollErr.Error())
}
// Cancel our request.
err = syscall.CancelIoEx(fd.Sysfd, &o.o)
// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
if err != nil && err != syscall.ERROR_NOT_FOUND {
// TODO(brainman): maybe do something else, but panic.
panic(err)
}
// Wait for cancellation to complete.
fd.pd.waitCanceled(int(o.mode))
n, err := getOverlappedResult()
if err != nil {
if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
err = netpollErr
}
return n, err
}
// We issued a cancellation request. But, it seems, IO operation succeeded
// before the cancellation request run. We need to treat the IO operation as
// succeeded (the bytes are actually sent/recv from network).
return n, nil
return int(o.qty), err
}
// FD is a file descriptor. The net and os packages embed this type in
@ -341,9 +332,6 @@ const (
kindPipe
)
// logInitFD is set by tests to enable file descriptor initialization logging.
var logInitFD func(net int, fd *FD, err error)
func (fd *FD) initIO() error {
fd.initIOOnce.Do(func() {
if fd.initPollable {
@ -358,19 +346,13 @@ func (fd *FD) initIO() error {
fd.initPollable = false
}
}
if logInitFD != nil {
logInitFD(int(fd.kind), fd, fd.initIOErr)
}
if !fd.initPollable {
// Handle opened for overlapped I/O (aka non-blocking) that are not added
// to the runtime poller need special handling when reading and writing.
var info windows.FILE_MODE_INFORMATION
if err := windows.NtQueryInformationFile(fd.Sysfd, &windows.IO_STATUS_BLOCK{}, unsafe.Pointer(&info), uint32(unsafe.Sizeof(info)), windows.FileModeInformation); err == nil {
fd.isBlocking = info.Mode&(windows.FILE_SYNCHRONOUS_IO_ALERT|windows.FILE_SYNCHRONOUS_IO_NONALERT) != 0
} else {
// If we fail to get the file mode information, assume the file is blocking.
fd.isBlocking = true
}
// If we fail to get the file mode information, assume the file is blocking.
overlapped, _ := windows.IsNonblock(fd.Sysfd)
fd.isBlocking = !overlapped
fd.skipSyncNotif = true
} else {
fd.rop.runtimeCtx = fd.pd.runtimeCtx
fd.wop.runtimeCtx = fd.pd.runtimeCtx
@ -379,9 +361,7 @@ func (fd *FD) initIO() error {
err := syscall.SetFileCompletionNotificationModes(fd.Sysfd,
syscall.FILE_SKIP_SET_EVENT_ON_HANDLE|syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
)
if err == nil {
fd.skipSyncNotif = true
}
fd.skipSyncNotif = err == nil
}
}
})
@ -429,11 +409,6 @@ func (fd *FD) Init(net string, pollable bool) error {
// handles and that cares about handle IOCP association errors.
// We can should do the IOCP association here.
return fd.initIO()
} else {
if logInitFD != nil {
// For testing.
logInitFD(int(fd.kind), fd, nil)
}
}
return nil
}
@ -508,21 +483,13 @@ func (fd *FD) Read(buf []byte) (int, error) {
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
})
fd.addOffset(n)
if err == syscall.ERROR_HANDLE_EOF {
switch err {
case syscall.ERROR_HANDLE_EOF:
err = io.EOF
}
if fd.kind == kindPipe && err != nil {
switch err {
case syscall.ERROR_BROKEN_PIPE:
// Returned by pipes when the other end is closed.
err = nil
case syscall.ERROR_OPERATION_ABORTED:
if fd.closing() {
// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
// If the fd is a pipe and the Read was interrupted by CancelIoEx,
// we assume it is interrupted by Close.
err = ErrFileClosing
}
case syscall.ERROR_BROKEN_PIPE:
// ReadFile only documents ERROR_BROKEN_PIPE for pipes.
if fd.kind == kindPipe {
err = io.EOF
}
}
case kindNet:
@ -646,10 +613,8 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) {
n, err := execIO(o, func(o *operation) error {
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o)
})
if err != nil {
if err == syscall.ERROR_HANDLE_EOF {
err = io.EOF
}
if err == syscall.ERROR_HANDLE_EOF {
err = io.EOF
}
if len(b) != 0 {
err = fd.eofError(n, err)
@ -774,12 +739,6 @@ func (fd *FD) Write(buf []byte) (int, error) {
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
})
fd.addOffset(n)
if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED && fd.closing() {
// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
// If the fd is a pipe and the Write was interrupted by CancelIoEx,
// we assume it is interrupted by Close.
err = ErrFileClosing
}
case kindNet:
if race.Enabled {
race.ReleaseMerge(unsafe.Pointer(&ioSync))
@ -1185,10 +1144,10 @@ func (fd *FD) RawRead(f func(uintptr) bool) error {
// socket is readable. h/t https://stackoverflow.com/a/42019668/332798
o := &fd.rop
o.InitBuf(nil)
if !fd.IsStream {
o.flags |= windows.MSG_PEEK
}
_, err := execIO(o, func(o *operation) error {
if !fd.IsStream {
o.flags |= windows.MSG_PEEK
}
return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
})
if err == windows.WSAEMSGSIZE {

View File

@ -7,7 +7,6 @@ package poll_test
import (
"bytes"
"errors"
"fmt"
"internal/poll"
"internal/syscall/windows"
"io"
@ -22,57 +21,37 @@ import (
"unsafe"
)
type loggedFD struct {
Net int
FD *poll.FD
Err error
}
var (
logMu sync.Mutex
loggedFDs map[syscall.Handle]*loggedFD
)
func logFD(net int, fd *poll.FD, err error) {
logMu.Lock()
defer logMu.Unlock()
loggedFDs[fd.Sysfd] = &loggedFD{
Net: net,
FD: fd,
Err: err,
}
}
func init() {
loggedFDs = make(map[syscall.Handle]*loggedFD)
*poll.LogInitFD = logFD
poll.InitWSA()
}
func findLoggedFD(h syscall.Handle) (lfd *loggedFD, found bool) {
logMu.Lock()
defer logMu.Unlock()
lfd, found = loggedFDs[h]
return lfd, found
}
// checkFileIsNotPartOfNetpoll verifies that f is not managed by netpoll.
// It returns error, if check fails.
func checkFileIsNotPartOfNetpoll(f *os.File) error {
lfd, found := findLoggedFD(syscall.Handle(f.Fd()))
if !found {
return fmt.Errorf("%v fd=%v: is not found in the log", f.Name(), f.Fd())
func checkFileIsNotPartOfNetpoll(t *testing.T, f *os.File) {
t.Helper()
sc, err := f.SyscallConn()
if err != nil {
t.Fatal(err)
}
if lfd.FD.IsPartOfNetpoll() {
return fmt.Errorf("%v fd=%v: is part of netpoll, but should not be (logged: net=%v err=%v)", f.Name(), f.Fd(), lfd.Net, lfd.Err)
if err := sc.Control(func(fd uintptr) {
// Only try to associate the file with an IOCP if the handle is opened for overlapped I/O,
// else the association will always fail.
overlapped, err := windows.IsNonblock(syscall.Handle(fd))
if err != nil {
t.Fatalf("%v fd=%v: %v", f.Name(), fd, err)
}
if overlapped {
// If the file is part of netpoll, then associating it with another IOCP should fail.
if _, err := windows.CreateIoCompletionPort(syscall.Handle(fd), 0, 0, 1); err != nil {
t.Fatalf("%v fd=%v: is part of netpoll, but should not be: %v", f.Name(), fd, err)
}
}
}); err != nil {
t.Fatalf("%v fd=%v: is not initialized", f.Name(), f.Fd())
}
return nil
}
func TestFileFdsAreInitialised(t *testing.T) {
t.Parallel()
exe, err := os.Executable()
if err != nil {
t.Fatal(err)
@ -83,15 +62,14 @@ func TestFileFdsAreInitialised(t *testing.T) {
}
defer f.Close()
err = checkFileIsNotPartOfNetpoll(f)
if err != nil {
t.Fatal(err)
}
checkFileIsNotPartOfNetpoll(t, f)
}
func TestSerialFdsAreInitialised(t *testing.T) {
t.Parallel()
for _, name := range []string{"COM1", "COM2", "COM3", "COM4"} {
t.Run(name, func(t *testing.T) {
t.Parallel()
h, err := syscall.CreateFile(syscall.StringToUTF16Ptr(name),
syscall.GENERIC_READ|syscall.GENERIC_WRITE,
0,
@ -113,15 +91,13 @@ func TestSerialFdsAreInitialised(t *testing.T) {
f := os.NewFile(uintptr(h), name)
defer f.Close()
err = checkFileIsNotPartOfNetpoll(f)
if err != nil {
t.Fatal(err)
}
checkFileIsNotPartOfNetpoll(t, f)
})
}
}
func TestWSASocketConflict(t *testing.T) {
t.Parallel()
s, err := windows.WSASocket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP, nil, 0, windows.WSA_FLAG_OVERLAPPED)
if err != nil {
t.Fatal(err)
@ -200,8 +176,12 @@ func newFD(t testing.TB, h syscall.Handle, kind string, overlapped, pollable boo
err := fd.Init(kind, pollable)
if overlapped && err != nil {
// Overlapped file handles should not error.
fd.Close()
t.Fatal(err)
}
t.Cleanup(func() {
fd.Close()
})
return &fd
}
@ -221,13 +201,9 @@ func newFile(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := syscall.CloseHandle(h); err != nil {
t.Fatal(err)
}
})
typ, err := syscall.GetFileType(h)
if err != nil {
syscall.CloseHandle(h)
t.Fatal(err)
}
kind := "file"
@ -266,19 +242,14 @@ func newPipe(t testing.TB, name string, message, overlapped, pollable bool) *pol
if overlapped {
flags |= syscall.FILE_FLAG_OVERLAPPED
}
typ := windows.PIPE_TYPE_BYTE
typ := windows.PIPE_TYPE_BYTE | windows.PIPE_READMODE_BYTE
if message {
typ = windows.PIPE_TYPE_MESSAGE
typ = windows.PIPE_TYPE_MESSAGE | windows.PIPE_READMODE_MESSAGE
}
h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := syscall.CloseHandle(h); err != nil {
t.Fatal(err)
}
})
return newFD(t, h, "pipe", overlapped, pollable)
}
@ -354,6 +325,29 @@ func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) {
close(write)
}
func testFileReadEOF(t *testing.T, f *poll.FD) {
end, err := f.Seek(0, io.SeekEnd)
if err != nil {
t.Fatal(err)
}
var buf [1]byte
n, err := f.Read(buf[:])
if err != nil && err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
n, err = f.Pread(buf[:], end)
if err != nil && err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func TestFile(t *testing.T) {
t.Parallel()
tests := []struct {
@ -377,6 +371,7 @@ func TestFile(t *testing.T) {
wh := newFile(t, name, tt.overlappedWrite, tt.pollable)
testReadWrite(t, rh, wh)
testPreadPwrite(t, rh, wh)
testFileReadEOF(t, rh)
})
}
}
@ -426,20 +421,35 @@ func TestPipe(t *testing.T) {
})
}
func TestPipeWriteEOF(t *testing.T) {
func TestPipeMessageReadEOF(t *testing.T) {
t.Parallel()
name := pipeName()
pipe := newMessagePipe(t, name, false, true)
file := newFile(t, name, false, true)
read := make(chan struct{}, 1)
go func() {
_, err := pipe.Write(nil)
read <- struct{}{}
if err != nil {
t.Error(err)
}
}()
<-read
pipe := newMessagePipe(t, name, true, true)
file := newFile(t, name, true, true)
_, err := pipe.Write(nil)
if err != nil {
t.Error(err)
}
var buf [10]byte
n, err := file.Read(buf[:])
if err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func TestPipeClosedEOF(t *testing.T) {
t.Parallel()
name := pipeName()
pipe := newBytePipe(t, name, true, false)
file := newFile(t, name, true, true)
pipe.Close()
var buf [10]byte
n, err := file.Read(buf[:])
if err != io.EOF {

View File

@ -59,11 +59,11 @@ func SendFile(fd *FD, src syscall.Handle, n int64) (written int64, err error) {
chunkSize = n
}
o.qty = uint32(chunkSize)
o.o.Offset = uint32(curpos)
o.o.OffsetHigh = uint32(curpos >> 32)
nw, err := execIO(o, func(o *operation) error {
o.qty = uint32(chunkSize)
return syscall.TransmitFile(o.fd.Sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
})
if err != nil {

View File

@ -0,0 +1,21 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package windows
import (
"syscall"
"unsafe"
)
// IsNonblock returns whether the file descriptor fd is opened
// in non-blocking mode, that is, the [syscall.FILE_FLAG_OVERLAPPED] flag
// was set when the file was opened.
func IsNonblock(fd syscall.Handle) (nonblocking bool, err error) {
var info FILE_MODE_INFORMATION
if err := NtQueryInformationFile(syscall.Handle(fd), &IO_STATUS_BLOCK{}, unsafe.Pointer(&info), uint32(unsafe.Sizeof(info)), FileModeInformation); err != nil {
return false, err
}
return info.Mode&(FILE_SYNCHRONOUS_IO_ALERT|FILE_SYNCHRONOUS_IO_NONALERT) == 0, nil
}

View File

@ -511,6 +511,9 @@ const (
PIPE_TYPE_BYTE = 0x00000000
PIPE_TYPE_MESSAGE = 0x00000004
PIPE_READMODE_BYTE = 0x00000000
PIPE_READMODE_MESSAGE = 0x00000002
)
//sys CreateIoCompletionPort(filehandle syscall.Handle, cphandle syscall.Handle, key uintptr, threadcnt uint32) (handle syscall.Handle, err error)