os,internal/poll: support I/O on overlapped files not added to the poller

This fixes the support for I/O on overlapped files that are not added to
the poller. Note that CL 661795 already added support for that, but it
really only worked for pipes, not for plain files.

Additionally, this CL also makes this kind of I/O operations to not
notify the external poller to avoid confusing it.

Updates #15388.

Change-Id: I15c6ea74f3a87960aef0986598077b6eab9b9c99
Reviewed-on: https://go-review.googlesource.com/c/go/+/664415
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
Reviewed-by: Alex Brainman <alex.brainman@gmail.com>
Reviewed-by: Damien Neil <dneil@google.com>
Auto-Submit: Quim Muntal <quimmuntal@gmail.com>
This commit is contained in:
qmuntal 2025-04-10 16:03:46 +02:00 committed by Gopher Robot
parent 13b7c7d8d2
commit f414dfe4f5
2 changed files with 106 additions and 28 deletions

View File

@ -68,7 +68,7 @@ var InitWSA = sync.OnceFunc(func() {
// operation contains superset of data necessary to perform all async IO.
type operation struct {
// Used by IOCP interface, it must be first field
// of the struct, as our code rely on it.
// of the struct, as our code relies on it.
o syscall.Overlapped
// fields used by runtime.netpoll
@ -88,6 +88,16 @@ type operation struct {
bufs []syscall.WSABuf
}
func (o *operation) setEvent() {
h, err := windows.CreateEvent(nil, 0, 0, nil)
if err != nil {
// This shouldn't happen when all CreateEvent arguments are zero.
panic(err)
}
// Set the low bit so that the external IOCP doesn't receive the completion packet.
o.o.HEvent = h | 1
}
func (o *operation) overlapped() *syscall.Overlapped {
if o.fd.isBlocking {
// Don't return the overlapped object if the file handle
@ -155,11 +165,15 @@ func (o *operation) InitMsg(p []byte, oob []byte) {
// waitIO waits for the IO operation o to complete.
func waitIO(o *operation) error {
if o.fd.isBlocking {
panic("can't wait on blocking operations")
}
fd := o.fd
if !fd.pd.pollable() {
// The overlapped handle is not added to the runtime poller,
// the only way to wait for the IO to complete is block.
_, err := syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE)
// the only way to wait for the IO to complete is block until
// the overlapped event is signaled.
_, err := syscall.WaitForSingleObject(o.o.HEvent, syscall.INFINITE)
return err
}
// Wait for our request to complete.
@ -202,11 +216,19 @@ func execIO(o *operation, submit func(o *operation) error) (int, error) {
return 0, err
}
// Start IO.
if !fd.isBlocking && o.o.HEvent == 0 && !fd.pd.pollable() {
// If the handle is opened for overlapped IO but we can't
// use the runtime poller, then we need to use an
// event to wait for the IO to complete.
o.setEvent()
}
o.qty = 0
o.flags = 0
err = submit(o)
var waitErr error
if err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif) {
// Blocking operations shouldn't return ERROR_IO_PENDING.
// Continue without waiting if that happens.
if !o.fd.isBlocking && (err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif)) {
// IO started asynchronously or completed synchronously but
// a sync notification is required. Wait for it to complete.
waitErr = waitIO(o)
@ -345,11 +367,6 @@ func (fd *FD) initIO() error {
// so it is safe to add handles owned by the caller.
fd.initIOErr = fd.pd.init(fd)
if fd.initIOErr != nil {
// This can happen if the handle is already associated
// with another IOCP or if the isBlocking flag is incorrect.
// In both cases, fallback to synchronous IO.
fd.isBlocking = true
fd.skipSyncNotif = true
return
}
fd.rop.runtimeCtx = fd.pd.runtimeCtx
@ -389,7 +406,6 @@ func (fd *FD) Init(net string, pollable bool) error {
}
fd.isFile = fd.kind != kindNet
fd.isBlocking = !pollable
fd.skipSyncNotif = fd.isBlocking
fd.rop.mode = 'r'
fd.wop.mode = 'w'
fd.rop.fd = fd

View File

@ -1984,31 +1984,93 @@ func TestPipeCanceled(t *testing.T) {
}
}
func TestPipeExternalIOCP(t *testing.T) {
func iocpAssociateFile(f *os.File, iocp syscall.Handle) error {
sc, err := f.SyscallConn()
if err != nil {
return err
}
var syserr error
err = sc.Control(func(fd uintptr) {
if _, err = windows.CreateIoCompletionPort(syscall.Handle(fd), iocp, 0, 0); err != nil {
syserr = err
}
})
if err == nil {
err = syserr
}
return err
}
func TestFileAssociatedWithExternalIOCP(t *testing.T) {
// Test that a caller can associate an overlapped handle to an external IOCP
// even when the handle is also associated to a poll.FD. Also test that
// the FD can still perform I/O after the association.
// after the handle has been passed to os.NewFile.
// Also test that the File can perform I/O after it is associated with the
// external IOCP and that those operations do not post to the external IOCP.
t.Parallel()
name := pipeName()
pipe := newMessagePipe(t, name, true)
_ = newFileOverlapped(t, name, true) // Just open a pipe client
_ = newFileOverlapped(t, name, true) // just open a pipe client
sc, err := pipe.SyscallConn()
if err != nil {
t.Error(err)
return
}
if err := sc.Control(func(fd uintptr) {
_, err := windows.CreateIoCompletionPort(syscall.Handle(fd), 0, 0, 1)
if err != nil {
t.Fatal(err)
}
}); err != nil {
t.Error(err)
}
// Use a file to exercise WriteAt.
file := newFileOverlapped(t, filepath.Join(t.TempDir(), "a"), true)
_, err = pipe.Write([]byte("hello"))
iocp, err := windows.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 0)
if err != nil {
t.Fatal(err)
}
defer func() {
if iocp == syscall.InvalidHandle {
// Already closed at the end of the test.
return
}
if err := syscall.CloseHandle(iocp); err != nil {
t.Fatal(err)
}
}()
ch := make(chan error, 1)
go func() {
var bytes, key uint32
var overlapped *syscall.Overlapped
err := syscall.GetQueuedCompletionStatus(syscall.Handle(iocp), &bytes, &key, &overlapped, syscall.INFINITE)
ch <- err
}()
if err := iocpAssociateFile(pipe, iocp); err != nil {
t.Fatal(err)
}
if err := iocpAssociateFile(file, iocp); err != nil {
t.Fatal(err)
}
if _, err := pipe.Write([]byte("hello")); err != nil {
t.Fatal(err)
}
if _, err := file.Write([]byte("hello")); err != nil {
t.Fatal(err)
}
if _, err := file.WriteAt([]byte("hello"), 0); err != nil {
t.Fatal(err)
}
// Wait fot he goroutine to call GetQueuedCompletionStatus.
time.Sleep(100 * time.Millisecond)
// Trigger ERROR_ABANDONED_WAIT_0.
if err := syscall.CloseHandle(iocp); err != nil {
t.Fatal(err)
}
// Wait for the completion to be posted to the IOCP.
err = <-ch
iocp = syscall.InvalidHandle
const ERROR_ABANDONED_WAIT_0 = syscall.Errno(735)
switch err {
case ERROR_ABANDONED_WAIT_0:
// This is what we expect.
case nil:
t.Error("unexpected queued completion")
default:
t.Error(err)
}
}