diff --git a/src/runtime/mcache.go b/src/runtime/mcache.go
index 5564e4a47d..e27a1c9ec0 100644
--- a/src/runtime/mcache.go
+++ b/src/runtime/mcache.go
@@ -61,6 +61,10 @@ type mcache struct {
 	// in this mcache are stale and need to be flushed so they
 	// can be swept. This is done in acquirep.
 	flushGen uint32
+
+	// statsSeq is a counter indicating whether this P is currently
+	// writing any stats. Its value is even when not, odd when it is.
+	statsSeq uint32
 }
 
 // A gclink is a node in a linked list of blocks, like mlink,
diff --git a/src/runtime/mgcscavenge.go b/src/runtime/mgcscavenge.go
index 8b1a0be353..5843ada981 100644
--- a/src/runtime/mgcscavenge.go
+++ b/src/runtime/mgcscavenge.go
@@ -711,7 +711,16 @@ func (p *pageAlloc) scavengeRangeLocked(ci chunkIdx, base, npages uint) uintptr
 	// Update global accounting only when not in test, otherwise
 	// the runtime's accounting will be wrong.
-	atomic.Xadd64(&memstats.heap_released, int64(npages)*pageSize)
+	nbytes := int64(npages) * pageSize
+	atomic.Xadd64(&memstats.heap_released, nbytes)
+
+	// Update consistent accounting too.
+	c := getMCache()
+	stats := memstats.heapStats.acquire(c)
+	atomic.Xaddint64(&stats.committed, -nbytes)
+	atomic.Xaddint64(&stats.released, nbytes)
+	memstats.heapStats.release(c)
+
 	return addr
 }
diff --git a/src/runtime/mheap.go b/src/runtime/mheap.go
index 87d2fd495b..d17b6fa284 100644
--- a/src/runtime/mheap.go
+++ b/src/runtime/mheap.go
@@ -1239,6 +1239,22 @@ HaveSpan:
 		// Manually managed memory doesn't count toward heap_sys.
 		memstats.heap_sys.add(-int64(nbytes))
 	}
+	// Update consistent stats.
+	c := getMCache()
+	stats := memstats.heapStats.acquire(c)
+	atomic.Xaddint64(&stats.committed, int64(scav))
+	atomic.Xaddint64(&stats.released, -int64(scav))
+	switch typ {
+	case spanAllocHeap:
+		atomic.Xaddint64(&stats.inHeap, int64(nbytes))
+	case spanAllocStack:
+		atomic.Xaddint64(&stats.inStacks, int64(nbytes))
+	case spanAllocPtrScalarBits:
+		atomic.Xaddint64(&stats.inPtrScalarBits, int64(nbytes))
+	case spanAllocWorkBuf:
+		atomic.Xaddint64(&stats.inWorkBufs, int64(nbytes))
+	}
+	memstats.heapStats.release(c)
 
 	// Publish the span in various locations.
@@ -1316,6 +1332,10 @@ func (h *mheap) grow(npage uintptr) bool {
 		// size which is always > physPageSize, so it's safe to
 		// just add directly to heap_released.
 		atomic.Xadd64(&memstats.heap_released, int64(asize))
+		c := getMCache()
+		stats := memstats.heapStats.acquire(c)
+		atomic.Xaddint64(&stats.released, int64(asize))
+		memstats.heapStats.release(c)
 
 		// Recalculate nBase.
 		// We know this won't overflow, because sysAlloc returned
@@ -1415,6 +1435,20 @@ func (h *mheap) freeSpanLocked(s *mspan, typ spanAllocType) {
 		// Manually managed memory doesn't count toward heap_sys, so add it back.
 		memstats.heap_sys.add(int64(nbytes))
 	}
+	// Update consistent stats.
+	c := getMCache()
+	stats := memstats.heapStats.acquire(c)
+	switch typ {
+	case spanAllocHeap:
+		atomic.Xaddint64(&stats.inHeap, -int64(nbytes))
+	case spanAllocStack:
+		atomic.Xaddint64(&stats.inStacks, -int64(nbytes))
+	case spanAllocPtrScalarBits:
+		atomic.Xaddint64(&stats.inPtrScalarBits, -int64(nbytes))
+	case spanAllocWorkBuf:
+		atomic.Xaddint64(&stats.inWorkBufs, -int64(nbytes))
+	}
+	memstats.heapStats.release(c)
 
 	// Mark the space as free.
 	h.pages.free(s.base(), s.npages)
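Every hunk above follows the same writer-side discipline: a batch of atomic delta updates is bracketed by acquire and release, so a reader never observes only half of a logically paired update (e.g. committed and released moving together during a scavenge). Below is a minimal, self-contained sketch of that discipline; the names (shard, writerSeq, scavenge) are invented for illustration, and the real acquire/release machinery is defined in mstats.go below.

package main

import (
	"fmt"
	"sync/atomic"
)

// shard stands in for a per-P stats delta; writerSeq mirrors
// mcache.statsSeq: even means idle, odd means a write is in flight.
type shard struct {
	committed int64
	released  int64
	writerSeq uint32
}

// scavenge updates two counters that must be seen to move together,
// bracketing them with the odd/even sequence number.
func (s *shard) scavenge(nbytes int64) {
	atomic.AddUint32(&s.writerSeq, 1) // acquire: seq becomes odd
	atomic.AddInt64(&s.committed, -nbytes)
	atomic.AddInt64(&s.released, nbytes)
	atomic.AddUint32(&s.writerSeq, 1) // release: seq becomes even
}

func main() {
	var s shard
	s.scavenge(4096)
	fmt.Println(s.committed, s.released) // -4096 4096
}

A reader that waits for writerSeq to become even (as read does further down) knows no such pair is mid-update.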
diff --git a/src/runtime/mstats.go b/src/runtime/mstats.go
index a6e38d1c1b..76546c0f0c 100644
--- a/src/runtime/mstats.go
+++ b/src/runtime/mstats.go
@@ -148,6 +148,9 @@ type mstats struct {
 	// unlike heap_live, heap_marked does not change until the
 	// next mark termination.
 	heap_marked uint64
+
+	// heapStats is a set of consistently updated memory statistics; see consistentHeapStats.
+	heapStats consistentHeapStats
 }
 
 var memstats mstats
@@ -426,10 +429,20 @@ type MemStats struct {
 }
 
 func init() {
-	if unsafe.Offsetof(memstats.heap_live)%8 != 0 {
-		println(unsafe.Offsetof(memstats.heap_live))
+	if offset := unsafe.Offsetof(memstats.heap_live); offset%8 != 0 {
+		println(offset)
 		throw("memstats.heap_live not aligned to 8 bytes")
 	}
+	if offset := unsafe.Offsetof(memstats.heapStats); offset%8 != 0 {
+		println(offset)
+		throw("memstats.heapStats not aligned to 8 bytes")
+	}
+	// Ensure the size of heapStatsDelta causes adjacent fields/slots (e.g.
+	// [3]heapStatsDelta) to be 8-byte aligned.
+	if size := unsafe.Sizeof(heapStatsDelta{}); size%8 != 0 {
+		println(size)
+		throw("heapStatsDelta not a multiple of 8 bytes in size")
+	}
 }
 
 // ReadMemStats populates m with memory allocator statistics.
@@ -687,3 +700,170 @@ func (s *sysMemStat) add(n int64) {
 		throw("sysMemStat overflow")
 	}
 }
+
+// heapStatsDelta contains deltas of various runtime memory statistics
+// that need to be updated together in order for them to be kept
+// consistent with one another.
+type heapStatsDelta struct {
+	committed       int64 // byte delta of memory committed
+	released        int64 // byte delta of memory released
+	inHeap          int64 // byte delta of memory placed in the heap
+	inStacks        int64 // byte delta of memory reserved for stacks
+	inWorkBufs      int64 // byte delta of memory reserved for work bufs
+	inPtrScalarBits int64 // byte delta of memory reserved for unrolled GC prog bits
+}
+
+// merge adds in the deltas from b into a.
+func (a *heapStatsDelta) merge(b *heapStatsDelta) {
+	a.committed += b.committed
+	a.released += b.released
+	a.inHeap += b.inHeap
+	a.inStacks += b.inStacks
+	a.inWorkBufs += b.inWorkBufs
+	a.inPtrScalarBits += b.inPtrScalarBits
+}
+
+// consistentHeapStats represents a set of various memory statistics
+// whose updates must be viewed completely to get a consistent
+// state of the world.
+//
+// To write updates to memory stats use the acquire and release
+// methods. To obtain a consistent global snapshot of these statistics,
+// use read.
+type consistentHeapStats struct {
+	// stats is a ring buffer of heapStatsDelta values.
+	// Writers always atomically update the delta at index gen.
+	//
+	// Readers operate by rotating gen (0 -> 1 -> 2 -> 0 -> ...)
+	// and synchronizing with writers by observing each mcache's
+	// statsSeq field. If the reader observes a P (to which the
+	// mcache is bound) not writing, it can be sure that it will
+	// pick up the new gen value the next time it writes.
+	// The reader then takes responsibility by clearing space
+	// in the ring buffer for the next reader to rotate gen to
+	// that space (i.e. it merges in values from index (gen-2) mod 3
+	// to index (gen-1) mod 3, then clears the former).
+	//
+	// Note that this means only one reader can be reading at a time.
+	// There is no way for readers to synchronize.
+	//
+	// This process is why we need a ring buffer of size 3 instead
+	// of 2: one is for the writers, one contains the most recent
+	// data, and the last one is clear so writers can begin writing
+	// to it the moment gen is updated.
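+	//
+	// For example, immediately after a reader rotates gen from 0 to 1:
+	//
+	//	stats[0]: the complete, most recent snapshot (just read)
+	//	stats[1]: now being written to by Ps
+	//	stats[2]: cleared, ready for the next rotation (gen 1 -> 2)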
+	stats [3]heapStatsDelta
+
+	// gen represents the current index into which writers
+	// are writing, and can take on the value of 0, 1, or 2.
+	// This value is updated atomically.
+	gen uint32
+}
+
+// acquire returns a heapStatsDelta to be updated. In effect,
+// it acquires the shard for writing. release must be called
+// as soon as the relevant deltas are updated. c must be
+// a valid mcache not being used by any other thread.
+//
+// The returned heapStatsDelta must be updated atomically.
+//
+// Note, however, that this is unsafe to call concurrently
+// with other writers: there must be only one writer at a time.
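+//
+// A typical writer therefore looks like the call sites in
+// mheap.go and mgcscavenge.go above:
+//
+//	stats := memstats.heapStats.acquire(c)
+//	atomic.Xaddint64(&stats.inHeap, int64(nbytes))
+//	memstats.heapStats.release(c)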
+func (m *consistentHeapStats) acquire(c *mcache) *heapStatsDelta {
+	seq := atomic.Xadd(&c.statsSeq, 1)
+	if seq%2 == 0 {
+		// Should have been incremented to odd.
+		print("runtime: seq=", seq, "\n")
+		throw("bad sequence number")
+	}
+	gen := atomic.Load(&m.gen) % 3
+	return &m.stats[gen]
+}
+
+// release indicates that the writer is done modifying
+// the delta. The value returned by the corresponding
+// acquire must no longer be accessed or modified after
+// release is called.
+//
+// The mcache passed here must be the same as the one
+// passed to acquire.
+func (m *consistentHeapStats) release(c *mcache) {
+	seq := atomic.Xadd(&c.statsSeq, 1)
+	if seq%2 != 0 {
+		// Should have been incremented to even.
+		print("runtime: seq=", seq, "\n")
+		throw("bad sequence number")
+	}
+}
+
+// unsafeRead aggregates the delta for this shard into out.
+//
+// Unsafe because it does so without any synchronization. The
+// only safe time to call this is if the world is stopped or
+// we're freezing the world or going down anyway (and we just
+// want _some_ estimate).
+func (m *consistentHeapStats) unsafeRead(out *heapStatsDelta) {
+	for i := range m.stats {
+		out.merge(&m.stats[i])
+	}
+}
+
+// unsafeClear clears the shard.
+//
+// Unsafe because the world must be stopped and values should
+// be donated elsewhere before clearing.
+func (m *consistentHeapStats) unsafeClear() {
+	for i := range m.stats {
+		m.stats[i] = heapStatsDelta{}
+	}
+}
+
+// read takes a globally consistent snapshot of m
+// and puts the aggregated value in out. Even though out is a
+// heapStatsDelta, the resulting values should be complete and
+// valid statistic values.
+//
+// Not safe to call concurrently.
+func (m *consistentHeapStats) read(out *heapStatsDelta) {
+	// Getting preempted after this point is not safe because
+	// we read allp. We need to make sure a STW can't happen
+	// so it doesn't change out from under us.
+	mp := acquirem()
+
+	// Rotate gen, effectively taking a snapshot of the state of
+	// these statistics at the point of the exchange by moving
+	// writers to the next set of deltas.
+	//
+	// This exchange is safe to do because we won't race
+	// with anyone else trying to update this value.
+	currGen := atomic.Load(&m.gen)
+	atomic.Xchg(&m.gen, (currGen+1)%3)
+	prevGen := currGen - 1
+	if currGen == 0 {
+		prevGen = 2
+	}
+	for _, p := range allp {
+		c := p.mcache
+		if c == nil {
+			continue
+		}
+		// Spin until there are no more writers.
+		for atomic.Load(&c.statsSeq)%2 != 0 {
+		}
+	}
+
+	// At this point we've observed that each sequence
+	// number is even, so any future writers will observe
+	// the new gen value. That means it's safe to read from
+	// the other deltas in the stats buffer.
+
+	// Perform our responsibilities and free up
+	// stats[prevGen] for the next time we want to take
+	// a snapshot.
+	m.stats[currGen].merge(&m.stats[prevGen])
+	m.stats[prevGen] = heapStatsDelta{}
+
+	// Finally, copy out the complete delta.
+	*out = m.stats[currGen]
+	releasem(mp)
+}
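To see the reader protocol end to end, here is a self-contained toy model of read()'s rotation; everything in it (ring, delta, writer, add) is invented for illustration, assumes a single reader and a single writer shard, and omits the runtime machinery (Ps, mcaches, acquirem):

package main

import (
	"fmt"
	"sync/atomic"
)

type delta struct{ committed, released int64 }

type ring struct {
	stats [3]delta
	gen   uint32
}

type writer struct{ seq uint32 }

// add publishes a pair of deltas to the slot at gen, bracketed by the
// odd/even sequence number, mirroring acquire/release above.
func (w *writer) add(r *ring, committed, released int64) {
	atomic.AddUint32(&w.seq, 1) // odd: writing
	d := &r.stats[atomic.LoadUint32(&r.gen)%3]
	atomic.AddInt64(&d.committed, committed)
	atomic.AddInt64(&d.released, released)
	atomic.AddUint32(&w.seq, 1) // even: done
}

// read rotates gen, waits for the writer to go idle, then merges the
// stale slot into the snapshot slot and clears it.
func (r *ring) read(w *writer, out *delta) {
	currGen := atomic.LoadUint32(&r.gen)
	atomic.StoreUint32(&r.gen, (currGen+1)%3) // move writers to the next slot
	prevGen := (currGen + 2) % 3              // (currGen-1) mod 3
	for atomic.LoadUint32(&w.seq)%2 != 0 {
		// Spin until the writer is idle.
	}
	r.stats[currGen].committed += r.stats[prevGen].committed
	r.stats[currGen].released += r.stats[prevGen].released
	r.stats[prevGen] = delta{}
	*out = r.stats[currGen]
}

func main() {
	var r ring
	var w writer
	var snap delta
	w.add(&r, 100, 0)
	r.read(&w, &snap) // snap = {100 0}
	w.add(&r, -25, 25)
	r.read(&w, &snap)
	fmt.Println(snap) // {75 25}
}

After each read, one slot holds the freshly merged snapshot, one is the active write slot, and one is clear: exactly the three states the consistentHeapStats doc comment describes, and why the ring needs three slots rather than two.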