diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go index 14b1febbf4..81c8293911 100644 --- a/src/internal/poll/fd_windows.go +++ b/src/internal/poll/fd_windows.go @@ -434,6 +434,10 @@ func (fd *FD) Read(buf []byte) (int, error) { return 0, err } defer fd.readUnlock() + if fd.isFile { + fd.l.Lock() + defer fd.l.Unlock() + } if len(buf) > maxRW { buf = buf[:maxRW] @@ -441,36 +445,29 @@ func (fd *FD) Read(buf []byte) (int, error) { var n int var err error - if fd.isFile { - fd.l.Lock() - defer fd.l.Unlock() - switch fd.kind { - case kindConsole: - n, err = fd.readConsole(buf) - default: - o := &fd.rop - o.InitBuf(buf) - n, err = execIO(o, func(o *operation) error { - return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) - }) - fd.addOffset(n) - if fd.kind == kindPipe && err != nil { - switch err { - case syscall.ERROR_BROKEN_PIPE: - // Returned by pipes when the other end is closed. - err = nil - case syscall.ERROR_OPERATION_ABORTED: - // Close uses CancelIoEx to interrupt concurrent I/O for pipes. - // If the fd is a pipe and the Read was interrupted by CancelIoEx, - // we assume it is interrupted by Close. - err = ErrFileClosing - } + switch fd.kind { + case kindConsole: + n, err = fd.readConsole(buf) + case kindFile, kindPipe: + o := &fd.rop + o.InitBuf(buf) + n, err = execIO(o, func(o *operation) error { + return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) + }) + fd.addOffset(n) + if fd.kind == kindPipe && err != nil { + switch err { + case syscall.ERROR_BROKEN_PIPE: + // Returned by pipes when the other end is closed. + err = nil + case syscall.ERROR_OPERATION_ABORTED: + // Close uses CancelIoEx to interrupt concurrent I/O for pipes. + // If the fd is a pipe and the Read was interrupted by CancelIoEx, + // we assume it is interrupted by Close. + err = ErrFileClosing } } - if err != nil { - n = 0 - } - } else { + case kindNet: o := &fd.rop o.InitBuf(buf) n, err = execIO(o, func(o *operation) error { @@ -701,36 +698,32 @@ func (fd *FD) Write(buf []byte) (int, error) { defer fd.l.Unlock() } - ntotal := 0 - for len(buf) > 0 { - b := buf - if len(b) > maxRW { - b = b[:maxRW] + var ntotal int + for { + max := len(buf) + if max-ntotal > maxRW { + max = ntotal + maxRW } + b := buf[ntotal:max] var n int var err error - if fd.isFile { - switch fd.kind { - case kindConsole: - n, err = fd.writeConsole(b) - default: - o := &fd.wop - o.InitBuf(b) - n, err = execIO(o, func(o *operation) error { - return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) - }) - fd.addOffset(n) - if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED { - // Close uses CancelIoEx to interrupt concurrent I/O for pipes. - // If the fd is a pipe and the Write was interrupted by CancelIoEx, - // we assume it is interrupted by Close. - err = ErrFileClosing - } + switch fd.kind { + case kindConsole: + n, err = fd.writeConsole(b) + case kindPipe, kindFile: + o := &fd.wop + o.InitBuf(b) + n, err = execIO(o, func(o *operation) error { + return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) + }) + fd.addOffset(n) + if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED { + // Close uses CancelIoEx to interrupt concurrent I/O for pipes. + // If the fd is a pipe and the Write was interrupted by CancelIoEx, + // we assume it is interrupted by Close. + err = ErrFileClosing } - if err != nil { - n = 0 - } - } else { + case kindNet: if race.Enabled { race.ReleaseMerge(unsafe.Pointer(&ioSync)) } @@ -741,12 +734,13 @@ func (fd *FD) Write(buf []byte) (int, error) { }) } ntotal += n - if err != nil { + if ntotal == len(buf) || err != nil { return ntotal, err } - buf = buf[n:] + if n == 0 { + return ntotal, io.ErrUnexpectedEOF + } } - return ntotal, nil } // writeConsole writes len(b) bytes to the console File. @@ -814,26 +808,29 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) { defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart) defer fd.setOffset(curoffset) - ntotal := 0 - for len(buf) > 0 { - b := buf - if len(b) > maxRW { - b = b[:maxRW] + var ntotal int + for { + max := len(buf) + if max-ntotal > maxRW { + max = ntotal + maxRW } + b := buf[ntotal:max] o := &fd.wop o.InitBuf(b) - fd.setOffset(off) + fd.setOffset(off + int64(ntotal)) n, err := execIO(o, func(o *operation) error { return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o) }) - ntotal += int(n) - if err != nil { + if n > 0 { + ntotal += n + } + if ntotal == len(buf) || err != nil { return ntotal, err } - buf = buf[n:] - off += int64(n) + if n == 0 { + return ntotal, io.ErrUnexpectedEOF + } } - return ntotal, nil } // Writev emulates the Unix writev system call. diff --git a/src/internal/poll/fd_windows_test.go b/src/internal/poll/fd_windows_test.go index f2bc9b2a21..f5fa4a26e3 100644 --- a/src/internal/poll/fd_windows_test.go +++ b/src/internal/poll/fd_windows_test.go @@ -239,7 +239,7 @@ var currentProces = sync.OnceValue(func() string { var pipeCounter atomic.Uint64 -func newPipe(t testing.TB, overlapped bool) (string, *poll.FD) { +func newPipe(t testing.TB, overlapped, message bool) (string, *poll.FD) { name := `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10) wname, err := syscall.UTF16PtrFromString(name) if err != nil { @@ -250,7 +250,11 @@ func newPipe(t testing.TB, overlapped bool) (string, *poll.FD) { if overlapped { flags |= syscall.FILE_FLAG_OVERLAPPED } - h, err := windows.CreateNamedPipe(wname, uint32(flags), windows.PIPE_TYPE_BYTE, 1, 4096, 4096, 0, nil) + typ := windows.PIPE_TYPE_BYTE + if message { + typ = windows.PIPE_TYPE_MESSAGE + } + h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil) if err != nil { t.Fatal(err) } @@ -358,22 +362,22 @@ func TestFile(t *testing.T) { func TestPipe(t *testing.T) { t.Run("overlapped", func(t *testing.T) { - name, pipe := newPipe(t, true) + name, pipe := newPipe(t, true, false) file := newFile(t, name, true) testReadWrite(t, pipe, file) }) t.Run("overlapped-write", func(t *testing.T) { - name, pipe := newPipe(t, true) + name, pipe := newPipe(t, true, false) file := newFile(t, name, false) testReadWrite(t, file, pipe) }) t.Run("overlapped-read", func(t *testing.T) { - name, pipe := newPipe(t, false) + name, pipe := newPipe(t, false, false) file := newFile(t, name, true) testReadWrite(t, file, pipe) }) t.Run("sync", func(t *testing.T) { - name, pipe := newPipe(t, false) + name, pipe := newPipe(t, false, false) file := newFile(t, name, false) testReadWrite(t, file, pipe) }) @@ -397,6 +401,28 @@ func TestPipe(t *testing.T) { }) } +func TestPipeWriteEOF(t *testing.T) { + name, pipe := newPipe(t, false, true) + file := newFile(t, name, false) + read := make(chan struct{}, 1) + go func() { + _, err := pipe.Write(nil) + read <- struct{}{} + if err != nil { + t.Error(err) + } + }() + <-read + var buf [10]byte + n, err := file.Read(buf[:]) + if err != io.EOF { + t.Errorf("expected EOF, got %v", err) + } + if n != 0 { + t.Errorf("expected 0 bytes, got %d", n) + } +} + func BenchmarkReadOverlapped(b *testing.B) { benchmarkRead(b, true) } diff --git a/src/internal/syscall/windows/syscall_windows.go b/src/internal/syscall/windows/syscall_windows.go index af542c8003..3a197f1c26 100644 --- a/src/internal/syscall/windows/syscall_windows.go +++ b/src/internal/syscall/windows/syscall_windows.go @@ -509,7 +509,8 @@ const ( PIPE_ACCESS_OUTBOUND = 0x00000002 PIPE_ACCESS_DUPLEX = 0x00000003 - PIPE_TYPE_BYTE = 0x00000000 + PIPE_TYPE_BYTE = 0x00000000 + PIPE_TYPE_MESSAGE = 0x00000004 ) //sys GetOverlappedResult(handle syscall.Handle, overlapped *syscall.Overlapped, done *uint32, wait bool) (err error)