internal/poll: don't skip empty writes on Windows

Empty writes might be important for some protocols. Let Windows decide
what do with them rather than skipping them on our side. This is inline
with the behavior of other platforms.

While here, refactor the Read/Write/Pwrite methods to reduce one
indentation level and make the code easier to read.

Fixes #73084.

Change-Id: Ic5393358e237d53b8be6097cd7359ac0ff205309
Reviewed-on: https://go-review.googlesource.com/c/go/+/661435
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Reviewed-by: Damien Neil <dneil@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
This commit is contained in:
qmuntal 2025-03-28 10:39:49 +01:00 committed by Quim Muntal
parent e6c2e12c63
commit 656b5b3abe
3 changed files with 99 additions and 75 deletions

View File

@ -434,6 +434,10 @@ func (fd *FD) Read(buf []byte) (int, error) {
return 0, err return 0, err
} }
defer fd.readUnlock() defer fd.readUnlock()
if fd.isFile {
fd.l.Lock()
defer fd.l.Unlock()
}
if len(buf) > maxRW { if len(buf) > maxRW {
buf = buf[:maxRW] buf = buf[:maxRW]
@ -441,36 +445,29 @@ func (fd *FD) Read(buf []byte) (int, error) {
var n int var n int
var err error var err error
if fd.isFile { switch fd.kind {
fd.l.Lock() case kindConsole:
defer fd.l.Unlock() n, err = fd.readConsole(buf)
switch fd.kind { case kindFile, kindPipe:
case kindConsole: o := &fd.rop
n, err = fd.readConsole(buf) o.InitBuf(buf)
default: n, err = execIO(o, func(o *operation) error {
o := &fd.rop return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
o.InitBuf(buf) })
n, err = execIO(o, func(o *operation) error { fd.addOffset(n)
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) if fd.kind == kindPipe && err != nil {
}) switch err {
fd.addOffset(n) case syscall.ERROR_BROKEN_PIPE:
if fd.kind == kindPipe && err != nil { // Returned by pipes when the other end is closed.
switch err { err = nil
case syscall.ERROR_BROKEN_PIPE: case syscall.ERROR_OPERATION_ABORTED:
// Returned by pipes when the other end is closed. // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
err = nil // If the fd is a pipe and the Read was interrupted by CancelIoEx,
case syscall.ERROR_OPERATION_ABORTED: // we assume it is interrupted by Close.
// Close uses CancelIoEx to interrupt concurrent I/O for pipes. err = ErrFileClosing
// If the fd is a pipe and the Read was interrupted by CancelIoEx,
// we assume it is interrupted by Close.
err = ErrFileClosing
}
} }
} }
if err != nil { case kindNet:
n = 0
}
} else {
o := &fd.rop o := &fd.rop
o.InitBuf(buf) o.InitBuf(buf)
n, err = execIO(o, func(o *operation) error { n, err = execIO(o, func(o *operation) error {
@ -701,36 +698,32 @@ func (fd *FD) Write(buf []byte) (int, error) {
defer fd.l.Unlock() defer fd.l.Unlock()
} }
ntotal := 0 var ntotal int
for len(buf) > 0 { for {
b := buf max := len(buf)
if len(b) > maxRW { if max-ntotal > maxRW {
b = b[:maxRW] max = ntotal + maxRW
} }
b := buf[ntotal:max]
var n int var n int
var err error var err error
if fd.isFile { switch fd.kind {
switch fd.kind { case kindConsole:
case kindConsole: n, err = fd.writeConsole(b)
n, err = fd.writeConsole(b) case kindPipe, kindFile:
default: o := &fd.wop
o := &fd.wop o.InitBuf(b)
o.InitBuf(b) n, err = execIO(o, func(o *operation) error {
n, err = execIO(o, func(o *operation) error { return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) })
}) fd.addOffset(n)
fd.addOffset(n) if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED { // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
// Close uses CancelIoEx to interrupt concurrent I/O for pipes. // If the fd is a pipe and the Write was interrupted by CancelIoEx,
// If the fd is a pipe and the Write was interrupted by CancelIoEx, // we assume it is interrupted by Close.
// we assume it is interrupted by Close. err = ErrFileClosing
err = ErrFileClosing
}
} }
if err != nil { case kindNet:
n = 0
}
} else {
if race.Enabled { if race.Enabled {
race.ReleaseMerge(unsafe.Pointer(&ioSync)) race.ReleaseMerge(unsafe.Pointer(&ioSync))
} }
@ -741,12 +734,13 @@ func (fd *FD) Write(buf []byte) (int, error) {
}) })
} }
ntotal += n ntotal += n
if err != nil { if ntotal == len(buf) || err != nil {
return ntotal, err return ntotal, err
} }
buf = buf[n:] if n == 0 {
return ntotal, io.ErrUnexpectedEOF
}
} }
return ntotal, nil
} }
// writeConsole writes len(b) bytes to the console File. // writeConsole writes len(b) bytes to the console File.
@ -814,26 +808,29 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart) defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart)
defer fd.setOffset(curoffset) defer fd.setOffset(curoffset)
ntotal := 0 var ntotal int
for len(buf) > 0 { for {
b := buf max := len(buf)
if len(b) > maxRW { if max-ntotal > maxRW {
b = b[:maxRW] max = ntotal + maxRW
} }
b := buf[ntotal:max]
o := &fd.wop o := &fd.wop
o.InitBuf(b) o.InitBuf(b)
fd.setOffset(off) fd.setOffset(off + int64(ntotal))
n, err := execIO(o, func(o *operation) error { n, err := execIO(o, func(o *operation) error {
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o) return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o)
}) })
ntotal += int(n) if n > 0 {
if err != nil { ntotal += n
}
if ntotal == len(buf) || err != nil {
return ntotal, err return ntotal, err
} }
buf = buf[n:] if n == 0 {
off += int64(n) return ntotal, io.ErrUnexpectedEOF
}
} }
return ntotal, nil
} }
// Writev emulates the Unix writev system call. // Writev emulates the Unix writev system call.

View File

@ -239,7 +239,7 @@ var currentProces = sync.OnceValue(func() string {
var pipeCounter atomic.Uint64 var pipeCounter atomic.Uint64
func newPipe(t testing.TB, overlapped bool) (string, *poll.FD) { func newPipe(t testing.TB, overlapped, message bool) (string, *poll.FD) {
name := `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10) name := `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
wname, err := syscall.UTF16PtrFromString(name) wname, err := syscall.UTF16PtrFromString(name)
if err != nil { if err != nil {
@ -250,7 +250,11 @@ func newPipe(t testing.TB, overlapped bool) (string, *poll.FD) {
if overlapped { if overlapped {
flags |= syscall.FILE_FLAG_OVERLAPPED flags |= syscall.FILE_FLAG_OVERLAPPED
} }
h, err := windows.CreateNamedPipe(wname, uint32(flags), windows.PIPE_TYPE_BYTE, 1, 4096, 4096, 0, nil) typ := windows.PIPE_TYPE_BYTE
if message {
typ = windows.PIPE_TYPE_MESSAGE
}
h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -358,22 +362,22 @@ func TestFile(t *testing.T) {
func TestPipe(t *testing.T) { func TestPipe(t *testing.T) {
t.Run("overlapped", func(t *testing.T) { t.Run("overlapped", func(t *testing.T) {
name, pipe := newPipe(t, true) name, pipe := newPipe(t, true, false)
file := newFile(t, name, true) file := newFile(t, name, true)
testReadWrite(t, pipe, file) testReadWrite(t, pipe, file)
}) })
t.Run("overlapped-write", func(t *testing.T) { t.Run("overlapped-write", func(t *testing.T) {
name, pipe := newPipe(t, true) name, pipe := newPipe(t, true, false)
file := newFile(t, name, false) file := newFile(t, name, false)
testReadWrite(t, file, pipe) testReadWrite(t, file, pipe)
}) })
t.Run("overlapped-read", func(t *testing.T) { t.Run("overlapped-read", func(t *testing.T) {
name, pipe := newPipe(t, false) name, pipe := newPipe(t, false, false)
file := newFile(t, name, true) file := newFile(t, name, true)
testReadWrite(t, file, pipe) testReadWrite(t, file, pipe)
}) })
t.Run("sync", func(t *testing.T) { t.Run("sync", func(t *testing.T) {
name, pipe := newPipe(t, false) name, pipe := newPipe(t, false, false)
file := newFile(t, name, false) file := newFile(t, name, false)
testReadWrite(t, file, pipe) testReadWrite(t, file, pipe)
}) })
@ -397,6 +401,28 @@ func TestPipe(t *testing.T) {
}) })
} }
func TestPipeWriteEOF(t *testing.T) {
name, pipe := newPipe(t, false, true)
file := newFile(t, name, false)
read := make(chan struct{}, 1)
go func() {
_, err := pipe.Write(nil)
read <- struct{}{}
if err != nil {
t.Error(err)
}
}()
<-read
var buf [10]byte
n, err := file.Read(buf[:])
if err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func BenchmarkReadOverlapped(b *testing.B) { func BenchmarkReadOverlapped(b *testing.B) {
benchmarkRead(b, true) benchmarkRead(b, true)
} }

View File

@ -509,7 +509,8 @@ const (
PIPE_ACCESS_OUTBOUND = 0x00000002 PIPE_ACCESS_OUTBOUND = 0x00000002
PIPE_ACCESS_DUPLEX = 0x00000003 PIPE_ACCESS_DUPLEX = 0x00000003
PIPE_TYPE_BYTE = 0x00000000 PIPE_TYPE_BYTE = 0x00000000
PIPE_TYPE_MESSAGE = 0x00000004
) )
//sys GetOverlappedResult(handle syscall.Handle, overlapped *syscall.Overlapped, done *uint32, wait bool) (err error) //sys GetOverlappedResult(handle syscall.Handle, overlapped *syscall.Overlapped, done *uint32, wait bool) (err error)