os,internal/poll: support I/O on overlapped handles not added to the poller

Calling syscall.ReadFile and syscall.WriteFile on overlapped handles
always need to be passed a valid *syscall.Overlapped structure, even if
the handle is not added to a IOCP (like the Go runtime poller). Else,
the syscall will fail with ERROR_INVALID_PARAMETER.

We also need to handle ERROR_IO_PENDING errors when the overlapped
handle is not added to the poller, in which case we need to block until
the operation completes.

Previous CLs already added support for overlapped handles to the poller,
mostly to keep track of the file offset independently of the file
pointer (which is not supported for overlapped handles).

Fixed #15388.
Updates #19098.

Change-Id: I2103ab892a37d0e326752ae8c2771a43c13ba42e
Reviewed-on: https://go-review.googlesource.com/c/go/+/661795
Auto-Submit: Quim Muntal <quimmuntal@gmail.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Carlos Amedee <carlos@golang.org>
Reviewed-by: Damien Neil <dneil@google.com>
This commit is contained in:
qmuntal 2025-03-31 11:43:12 +02:00 committed by Gopher Robot
parent eec3745bd7
commit b9cbb65384
6 changed files with 278 additions and 81 deletions

View File

@ -0,0 +1,6 @@
On Windows, [NewFile] supports overlapped (a.k.a non-blocking) file handles even
when the handle can't be added to the Go runtime I/O Completion Port (IOCP), normally
because it is already attached to another IOCP. The I/O operations will be performed in
a blocking manner instead of using the Go runtime IOCP.
Particularly, this means that is now possible to reliably pass overlapped named pipes and
sockets to a Go process standard input, output, and error.

View File

@ -89,7 +89,7 @@ type operation struct {
}
func (o *operation) overlapped() *syscall.Overlapped {
if o.runtimeCtx == 0 {
if o.fd.isBlocking {
// Don't return the overlapped object if the file handle
// doesn't use overlapped I/O. It could be used, but
// that would then use the file pointer stored in the
@ -162,9 +162,36 @@ func execIO(o *operation, submit func(o *operation) error) (int, error) {
if err != nil {
return 0, err
}
getOverlappedResult := func() (int, error) {
if fd.isFile {
err = windows.GetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false)
} else {
err = windows.WSAGetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false, &o.flags)
}
switch err {
case nil:
return int(o.qty), nil
case syscall.ERROR_HANDLE_EOF:
// EOF reached.
return int(o.qty), io.EOF
case syscall.ERROR_MORE_DATA, windows.WSAEMSGSIZE:
// More data available. Return back the size of received data.
return int(o.qty), err
default:
return 0, err
}
}
// Start IO.
err = submit(o)
if !fd.pd.pollable() {
if err == syscall.ERROR_IO_PENDING {
// The overlapped handle is not added to the runtime poller,
// the only way to wait for the IO to complete is block.
_, err = syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE)
if err == nil {
return getOverlappedResult()
}
}
if err != nil {
return 0, err
}
@ -187,20 +214,8 @@ func execIO(o *operation, submit func(o *operation) error) (int, error) {
// Wait for our request to complete.
err = fd.pd.wait(int(o.mode), fd.isFile)
if err == nil {
if fd.isFile {
err = windows.GetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false)
} else {
err = windows.WSAGetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false, &o.flags)
}
// All is good. Extract our IO results and return.
if err != nil {
// More data available. Return back the size of received data.
if err == syscall.ERROR_MORE_DATA || err == windows.WSAEMSGSIZE {
return int(o.qty), err
}
return 0, err
}
return int(o.qty), nil
return getOverlappedResult()
}
// IO is interrupted by "close" or "timeout"
netpollErr := err
@ -219,21 +234,17 @@ func execIO(o *operation, submit func(o *operation) error) (int, error) {
}
// Wait for cancellation to complete.
fd.pd.waitCanceled(int(o.mode))
if fd.isFile {
err = windows.GetOverlappedResult(fd.Sysfd, &o.o, &o.qty, true)
} else {
err = windows.WSAGetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false, &o.flags)
}
n, err := getOverlappedResult()
if err != nil {
if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
err = netpollErr
}
return 0, err
return n, err
}
// We issued a cancellation request. But, it seems, IO operation succeeded
// before the cancellation request run. We need to treat the IO operation as
// succeeded (the bytes are actually sent/recv from network).
return int(o.qty), nil
return n, nil
}
// FD is a file descriptor. The net and os packages embed this type in
@ -285,6 +296,9 @@ type FD struct {
// The kind of this file.
kind fileKind
// Whether FILE_FLAG_OVERLAPPED was not set when opening the file
isBlocking bool
}
// setOffset sets the offset fields of the overlapped object
@ -364,11 +378,21 @@ func (fd *FD) Init(net string, pollable bool) error {
// If we could not add the handle to the runtime poller,
// assume the handle hasn't been opened for overlapped I/O.
err = fd.pd.init(fd)
pollable = err == nil
}
if logInitFD != nil {
logInitFD(net, fd, err)
}
if !pollable || err != nil {
if !pollable {
// Handle opened for overlapped I/O (aka non-blocking) that are not added
// to the runtime poller need special handling when reading and writing.
var info windows.FILE_MODE_INFORMATION
if err := windows.NtQueryInformationFile(fd.Sysfd, &windows.IO_STATUS_BLOCK{}, uintptr(unsafe.Pointer(&info)), uint32(unsafe.Sizeof(info)), windows.FileModeInformation); err == nil {
fd.isBlocking = info.Mode&(windows.FILE_SYNCHRONOUS_IO_ALERT|windows.FILE_SYNCHRONOUS_IO_NONALERT) != 0
} else {
// If we fail to get the file mode information, assume the file is blocking.
fd.isBlocking = true
}
return err
}
if fd.kind != kindNet || socketCanUseSetFileCompletionNotificationModes {
@ -455,6 +479,9 @@ func (fd *FD) Read(buf []byte) (int, error) {
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
})
fd.addOffset(n)
if err == syscall.ERROR_HANDLE_EOF {
err = io.EOF
}
if fd.kind == kindPipe && err != nil {
switch err {
case syscall.ERROR_BROKEN_PIPE:
@ -591,7 +618,6 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) {
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o)
})
if err != nil {
n = 0
if err == syscall.ERROR_HANDLE_EOF {
err = io.EOF
}

View File

@ -191,17 +191,17 @@ type _TCP_INFO_v0 struct {
SynRetrans uint8
}
func newFD(t testing.TB, h syscall.Handle, kind string, overlapped bool) *poll.FD {
func newFD(t testing.TB, h syscall.Handle, kind string, overlapped, pollable bool) *poll.FD {
fd := poll.FD{
Sysfd: h,
IsStream: true,
ZeroReadIsEOF: true,
}
err := fd.Init(kind, true)
err := fd.Init(kind, pollable)
if overlapped && err != nil {
// Overlapped file handles should not error.
t.Fatal(err)
} else if !overlapped && err == nil {
} else if !overlapped && pollable && err == nil {
// Non-overlapped file handles should return an error but still
// be usable as sync handles.
t.Fatal("expected error for non-overlapped file handle")
@ -209,7 +209,7 @@ func newFD(t testing.TB, h syscall.Handle, kind string, overlapped bool) *poll.F
return &fd
}
func newFile(t testing.TB, name string, overlapped bool) *poll.FD {
func newFile(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
namep, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
@ -230,7 +230,15 @@ func newFile(t testing.TB, name string, overlapped bool) *poll.FD {
t.Fatal(err)
}
})
return newFD(t, h, "file", overlapped)
typ, err := syscall.GetFileType(h)
if err != nil {
t.Fatal(err)
}
kind := "file"
if typ == syscall.FILE_TYPE_PIPE {
kind = "pipe"
}
return newFD(t, h, kind, overlapped, pollable)
}
var currentProces = sync.OnceValue(func() string {
@ -240,8 +248,19 @@ var currentProces = sync.OnceValue(func() string {
var pipeCounter atomic.Uint64
func newPipe(t testing.TB, overlapped, message bool) (string, *poll.FD) {
name := `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
func newBytePipe(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
return newPipe(t, name, false, overlapped, pollable)
}
func newMessagePipe(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
return newPipe(t, name, true, overlapped, pollable)
}
func pipeName() string {
return `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
}
func newPipe(t testing.TB, name string, message, overlapped, pollable bool) *poll.FD {
wname, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
@ -264,7 +283,7 @@ func newPipe(t testing.TB, overlapped, message bool) (string, *poll.FD) {
t.Fatal(err)
}
})
return name, newFD(t, h, "pipe", overlapped)
return newFD(t, h, "pipe", overlapped, pollable)
}
func testReadWrite(t *testing.T, fdr, fdw *poll.FD) {
@ -341,54 +360,55 @@ func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) {
func TestFile(t *testing.T) {
t.Parallel()
test := func(t *testing.T, r, w bool) {
tests := []struct {
name string
overlappedRead bool
overlappedWrite bool
pollable bool
}{
{"overlapped", true, true, true},
{"overlapped-nonpollable", true, true, false},
{"overlapped-read", true, false, true},
{"overlapped-write", false, true, true},
{"sync", false, false, false},
{"sync-pollable", false, false, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
name := filepath.Join(t.TempDir(), "foo")
rh := newFile(t, name, r)
wh := newFile(t, name, w)
rh := newFile(t, name, tt.overlappedRead, tt.pollable)
wh := newFile(t, name, tt.overlappedWrite, tt.pollable)
testReadWrite(t, rh, wh)
testPreadPwrite(t, rh, wh)
})
}
t.Run("overlapped", func(t *testing.T) {
test(t, true, true)
})
t.Run("overlapped-read", func(t *testing.T) {
test(t, true, false)
})
t.Run("overlapped-write", func(t *testing.T) {
test(t, false, true)
})
t.Run("sync", func(t *testing.T) {
test(t, false, false)
})
}
func TestPipe(t *testing.T) {
t.Parallel()
t.Run("overlapped", func(t *testing.T) {
tests := []struct {
name string
overlappedRead bool
overlappedWrite bool
pollable bool
}{
{"overlapped", true, true, true},
{"overlapped-nonpollable", true, true, false},
{"overlapped-write", false, true, true},
{"overlapped-read", true, false, true},
{"sync", false, false, false},
{"sync-pollable", false, false, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, true, false)
file := newFile(t, name, true)
name := pipeName()
pipe := newBytePipe(t, name, tt.overlappedWrite, tt.pollable)
file := newFile(t, name, tt.overlappedRead, tt.pollable)
testReadWrite(t, pipe, file)
})
t.Run("overlapped-write", func(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, true, false)
file := newFile(t, name, false)
testReadWrite(t, file, pipe)
})
t.Run("overlapped-read", func(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, false, false)
file := newFile(t, name, true)
testReadWrite(t, file, pipe)
})
t.Run("sync", func(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, false, false)
file := newFile(t, name, false)
testReadWrite(t, file, pipe)
})
}
t.Run("anonymous", func(t *testing.T) {
t.Parallel()
var r, w syscall.Handle
@ -404,16 +424,17 @@ func TestPipe(t *testing.T) {
}
}()
// CreatePipe always returns sync handles.
fdr := newFD(t, r, "pipe", false)
fdw := newFD(t, w, "file", false)
fdr := newFD(t, r, "pipe", false, false)
fdw := newFD(t, w, "file", false, false)
testReadWrite(t, fdr, fdw)
})
}
func TestPipeWriteEOF(t *testing.T) {
t.Parallel()
name, pipe := newPipe(t, false, true)
file := newFile(t, name, false)
name := pipeName()
pipe := newMessagePipe(t, name, false, true)
file := newFile(t, name, false, true)
read := make(chan struct{}, 1)
go func() {
_, err := pipe.Write(nil)
@ -435,8 +456,9 @@ func TestPipeWriteEOF(t *testing.T) {
func TestPipeCanceled(t *testing.T) {
t.Parallel()
name, _ := newPipe(t, true, false)
file := newFile(t, name, true)
name := pipeName()
_ = newBytePipe(t, name, true, true)
file := newFile(t, name, true, true)
ch := make(chan struct{}, 1)
go func() {
for {
@ -481,7 +503,7 @@ func benchmarkRead(b *testing.B, overlapped bool) {
if err != nil {
b.Fatal(err)
}
file := newFile(b, name, overlapped)
file := newFile(b, name, overlapped, true)
var buf [len(content)]byte
for b.Loop() {
_, err := io.ReadFull(file, buf[:])

View File

@ -542,9 +542,19 @@ const (
STATUS_REPARSE_POINT_ENCOUNTERED NTStatus = 0xC000050B
)
const (
FileModeInformation = 16
)
// https://learn.microsoft.com/en-us/windows-hardware/drivers/ddi/ntifs/ns-ntifs-_file_mode_information
type FILE_MODE_INFORMATION struct {
Mode uint32
}
// NT Native APIs
//sys NtCreateFile(handle *syscall.Handle, access uint32, oa *OBJECT_ATTRIBUTES, iosb *IO_STATUS_BLOCK, allocationSize *int64, attributes uint32, share uint32, disposition uint32, options uint32, eabuffer uintptr, ealength uint32) (ntstatus error) = ntdll.NtCreateFile
//sys NtOpenFile(handle *syscall.Handle, access uint32, oa *OBJECT_ATTRIBUTES, iosb *IO_STATUS_BLOCK, share uint32, options uint32) (ntstatus error) = ntdll.NtOpenFile
//sys rtlNtStatusToDosErrorNoTeb(ntstatus NTStatus) (ret syscall.Errno) = ntdll.RtlNtStatusToDosErrorNoTeb
//sys NtSetInformationFile(handle syscall.Handle, iosb *IO_STATUS_BLOCK, inBuffer uintptr, inBufferLen uint32, class uint32) (ntstatus error) = ntdll.NtSetInformationFile
//sys RtlIsDosDeviceName_U(name *uint16) (ret uint32) = ntdll.RtlIsDosDeviceName_U
//sys NtQueryInformationFile(handle syscall.Handle, iosb *IO_STATUS_BLOCK, inBuffer uintptr, inBufferLen uint32, class uint32) (ntstatus error) = ntdll.NtQueryInformationFile

View File

@ -96,6 +96,7 @@ var (
procNetUserGetLocalGroups = modnetapi32.NewProc("NetUserGetLocalGroups")
procNtCreateFile = modntdll.NewProc("NtCreateFile")
procNtOpenFile = modntdll.NewProc("NtOpenFile")
procNtQueryInformationFile = modntdll.NewProc("NtQueryInformationFile")
procNtSetInformationFile = modntdll.NewProc("NtSetInformationFile")
procRtlGetVersion = modntdll.NewProc("RtlGetVersion")
procRtlIsDosDeviceName_U = modntdll.NewProc("RtlIsDosDeviceName_U")
@ -511,6 +512,14 @@ func NtOpenFile(handle *syscall.Handle, access uint32, oa *OBJECT_ATTRIBUTES, io
return
}
func NtQueryInformationFile(handle syscall.Handle, iosb *IO_STATUS_BLOCK, inBuffer uintptr, inBufferLen uint32, class uint32) (ntstatus error) {
r0, _, _ := syscall.Syscall6(procNtQueryInformationFile.Addr(), 5, uintptr(handle), uintptr(unsafe.Pointer(iosb)), uintptr(inBuffer), uintptr(inBufferLen), uintptr(class), 0)
if r0 != 0 {
ntstatus = NTStatus(r0)
}
return
}
func NtSetInformationFile(handle syscall.Handle, iosb *IO_STATUS_BLOCK, inBuffer uintptr, inBufferLen uint32, class uint32) (ntstatus error) {
r0, _, _ := syscall.Syscall6(procNtSetInformationFile.Addr(), 5, uintptr(handle), uintptr(unsafe.Pointer(iosb)), uintptr(inBuffer), uintptr(inBufferLen), uintptr(class), 0)
if r0 != 0 {

View File

@ -5,6 +5,7 @@
package os_test
import (
"bytes"
"errors"
"fmt"
"internal/godebug"
@ -19,7 +20,10 @@ import (
"path/filepath"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"unicode/utf16"
@ -1563,3 +1567,123 @@ func TestReadDirNoFileID(t *testing.T) {
t.Errorf("SameFile(%v, %v) = false; want true", f2, f2s)
}
}
func TestReadWriteFileOverlapped(t *testing.T) {
// See https://go.dev/issue/15388.
t.Parallel()
name := filepath.Join(t.TempDir(), "test.txt")
wname, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
}
h, err := syscall.CreateFile(wname, syscall.GENERIC_ALL, 0, nil, syscall.CREATE_NEW, syscall.FILE_ATTRIBUTE_NORMAL|syscall.FILE_FLAG_OVERLAPPED, 0)
if err != nil {
t.Fatal(err)
}
f := os.NewFile(uintptr(h), name)
defer f.Close()
data := []byte("test")
n, err := f.Write(data)
if err != nil {
t.Fatal(err)
}
if n != len(data) {
t.Fatalf("Write = %d; want %d", n, len(data))
}
if _, err := f.Seek(0, io.SeekStart); err != nil {
t.Fatal(err)
}
got, err := io.ReadAll(f)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got, data) {
t.Fatalf("Read = %q; want %q", got, data)
}
}
var currentProces = sync.OnceValue(func() string {
// Convert the process ID to a string.
return strconv.FormatUint(uint64(os.Getpid()), 10)
})
var pipeCounter atomic.Uint64
func pipeName() string {
return `\\.\pipe\go-os-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
}
func createPipe(t *testing.T, name string, inherit bool) *os.File {
t.Helper()
wname, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
}
flags := windows.PIPE_ACCESS_DUPLEX | syscall.FILE_FLAG_OVERLAPPED
typ := windows.PIPE_TYPE_BYTE
sa := &syscall.SecurityAttributes{
Length: uint32(unsafe.Sizeof(syscall.SecurityAttributes{})),
}
if inherit {
sa.InheritHandle = 1
}
rh, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, sa)
if err != nil {
t.Fatal(err)
}
return os.NewFile(uintptr(rh), name)
}
func TestStdinOverlappedPipe(t *testing.T) {
// Test that we can read from a named pipe open with FILE_FLAG_OVERLAPPED.
// See https://go.dev/issue/15388.
if os.Getenv("GO_WANT_HELPER_PROCESS") == "1" {
var buf string
_, err := fmt.Scanln(&buf)
if err != nil {
fmt.Print(err)
os.Exit(1)
}
fmt.Println(buf)
os.Exit(0)
}
t.Parallel()
name := pipeName()
// Create the read handle inherited by the child process.
r := createPipe(t, name, true)
defer r.Close()
// Create a write handle.
w, err := os.OpenFile(name, os.O_WRONLY, 0666)
if err != nil {
t.Fatal(err)
}
defer w.Close()
// Write some data to the pipe. The child process will read it.
want := []byte("test\n")
if _, err := w.Write(want); err != nil {
t.Fatal(err)
}
// Create a child process that will read from the pipe
// and write the data to stdout.
cmd := testenv.Command(t, testenv.Executable(t), fmt.Sprintf("-test.run=^%s$", t.Name()), "-test.v")
cmd = testenv.CleanCmdEnv(cmd)
cmd.Env = append(cmd.Env, "GO_WANT_HELPER_PROCESS=1")
cmd.Stdin = r
got, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("running %q failed: %v\n%s", cmd, err, got)
}
if !bytes.Contains(got, want) {
t.Fatalf("output %q does not contain %q", got, want)
}
}