pool: fix deadlock with --max-buffer-memory

Before this change we used an overcomplicated method of memory
reservations in the pool.RW which caused deadlocks.

This changes it to use a much simpler reservation system where we
actually reserve the memory and store it in the pool.RW. This allows
us to use the semaphore.Weighted to count the actual memory in use
(rather than the memory in use and in the cache). This in turn allows
accurate use of the semaphore by users wanting memory.
This commit is contained in:
Nick Craig-Wood
2025-08-12 12:45:10 +01:00
parent 3f60764bd4
commit 0c0fb93111
3 changed files with 170 additions and 108 deletions

View File

@@ -5,6 +5,7 @@ package pool
import (
"context"
"fmt"
"slices"
"sync"
"time"
@@ -46,7 +47,8 @@ type Pool struct {
// totalMemory is a semaphore used to control total buffer usage of
// all Pools. It may be nil in which case the total buffer usage
// will not be controlled.
// will not be controlled. It counts memory in active use, it does not
// count memory cached in the pool.
var totalMemory *semaphore.Weighted
// Make sure we initialise the totalMemory semaphore once
@@ -102,6 +104,18 @@ func (bp *Pool) get() []byte {
return buf
}
// getN removes the final n buffers from bp.cache and returns them
//
// The returned slice is a copy so it does not alias bp.cache.
//
// It will panic if n is bigger than the number of cached buffers.
//
// Call with mu held
func (bp *Pool) getN(n int) [][]byte {
	keep := len(bp.cache) - n
	tail := bp.cache[keep:]
	taken := slices.Clone(tail)
	// Zero the vacated slots so the cache's backing array no longer
	// references the buffers we are handing out.
	for j := range tail {
		tail[j] = nil
	}
	bp.cache = bp.cache[:keep]
	return taken
}
// put puts the buffer on the end of bp.cache
//
// Call with mu held
@@ -109,6 +123,20 @@ func (bp *Pool) put(buf []byte) {
bp.cache = append(bp.cache, buf)
}
// putN puts all of bufs on the end of bp.cache
//
// Call with mu held
func (bp *Pool) putN(bufs [][]byte) {
bp.cache = append(bp.cache, bufs...)
}
// buffers returns the number of buffers in bp.cache
//
// Call with mu held
func (bp *Pool) buffers() int {
return len(bp.cache)
}
// flush n entries from the entire buffer pool
// Call with mu held
func (bp *Pool) flush(n int) {
@@ -175,7 +203,7 @@ func (bp *Pool) updateMinFill() {
}
}
// acquire mem bytes of memory
// acquire mem bytes of memory for the user
func (bp *Pool) acquire(mem int64) error {
if totalMemory == nil {
return nil
@@ -184,7 +212,7 @@ func (bp *Pool) acquire(mem int64) error {
return totalMemory.Acquire(ctx, mem)
}
// release mem bytes of memory
// release mem bytes of memory from the user
func (bp *Pool) release(mem int64) {
if totalMemory == nil {
return
@@ -192,84 +220,62 @@ func (bp *Pool) release(mem int64) {
totalMemory.Release(mem)
}
// Reserve buffers for use. Blocks until they are free.
//
// Doesn't allocate any memory.
//
// Must be released by calling GetReserved() which releases 1 buffer or
// Release() to release any number of buffers.
func (bp *Pool) Reserve(buffers int) {
waitTime := time.Millisecond
for {
err := bp.acquire(int64(buffers) * int64(bp.bufferSize))
if err == nil {
break
}
fs.Logf(nil, "Failed to get reservation for buffer, waiting for %v: %v", waitTime, err)
time.Sleep(waitTime)
waitTime *= 2
}
}
// Release previously Reserved buffers.
//
// Doesn't free any memory.
func (bp *Pool) Release(buffers int) {
bp.release(int64(buffers) * int64(bp.bufferSize))
}
// Get a buffer from the pool or allocate one
func (bp *Pool) getBlock(reserved bool) []byte {
func (bp *Pool) Get() []byte {
return bp.GetN(1)[0]
}
// GetN gets n buffers atomically from the pool or allocates them
func (bp *Pool) GetN(n int) [][]byte {
bp.mu.Lock()
var buf []byte
waitTime := time.Millisecond
var (
waitTime = time.Millisecond // retry time if allocation failed
err error // allocation error
buf []byte // allocated buffer
bufs [][]byte // bufs so far
have int // have this many buffers in bp.cache
want int // want this many extra buffers
acquired bool // whether we have acquired the memory or not
)
for {
if len(bp.cache) > 0 {
buf = bp.get()
if reserved {
// If got reserved memory from the cache we
// can release the reservation of one buffer.
bp.release(int64(bp.bufferSize))
}
break
} else {
var err error
if !reserved {
acquired = false
bp.mu.Unlock()
err = bp.acquire(int64(bp.bufferSize))
err = bp.acquire(int64(bp.bufferSize) * int64(n))
bp.mu.Lock()
if err != nil {
goto FAIL
}
if err == nil {
acquired = true
have = min(bp.buffers(), n)
want = n - have
bufs = bp.getN(have) // get as many buffers as we have from the cache
for range want {
buf, err = bp.alloc(bp.bufferSize)
if err == nil {
if err != nil {
goto FAIL
}
bp.alloced++
bufs = append(bufs, buf)
}
break
}
if !reserved {
bp.release(int64(bp.bufferSize))
}
FAIL:
// Release the buffers and the allocation if it succeeded
bp.putN(bufs)
if acquired {
bp.release(int64(bp.bufferSize) * int64(n))
}
fs.Logf(nil, "Failed to get memory for buffer, waiting for %v: %v", waitTime, err)
bp.mu.Unlock()
time.Sleep(waitTime)
bp.mu.Lock()
waitTime *= 2
clear(bufs)
bufs = nil
}
}
bp.inUse++
bp.inUse += n
bp.updateMinFill()
bp.mu.Unlock()
return buf
}
// Get a buffer from the pool or allocate one
func (bp *Pool) Get() []byte {
return bp.getBlock(false)
}
// GetReserved gets a reserved buffer from the pool or allocates one.
func (bp *Pool) GetReserved() []byte {
return bp.getBlock(true)
return bufs
}
// freeBuffer returns mem to the os if required - call with lock held
@@ -277,19 +283,17 @@ func (bp *Pool) freeBuffer(mem []byte) {
err := bp.free(mem)
if err != nil {
fs.Logf(nil, "Failed to free memory: %v", err)
} else {
bp.release(int64(bp.bufferSize))
}
bp.alloced--
}
// Put returns the buffer to the buffer cache or frees it
// _put returns the buffer to the buffer cache or frees it
//
// Note that if you try to return a buffer of the wrong size to Put it
// will panic.
func (bp *Pool) Put(buf []byte) {
bp.mu.Lock()
defer bp.mu.Unlock()
// call with lock held
//
// Note that if you try to return a buffer of the wrong size it will
// panic.
func (bp *Pool) _put(buf []byte) {
buf = buf[0:cap(buf)]
if len(buf) != bp.bufferSize {
panic(fmt.Sprintf("Returning buffer sized %d but expecting %d", len(buf), bp.bufferSize))
@@ -299,11 +303,37 @@ func (bp *Pool) Put(buf []byte) {
} else {
bp.freeBuffer(buf)
}
bp.release(int64(bp.bufferSize))
}
// Put returns the buffer to the buffer cache or frees it
//
// It holds bp.mu for the duration of the call.
//
// Note that if you try to return a buffer of the wrong size to Put it
// will panic.
func (bp *Pool) Put(buf []byte) {
bp.mu.Lock()
defer bp.mu.Unlock()
// _put needs the lock held
bp._put(buf)
// One fewer buffer is out with callers
bp.inUse--
bp.updateMinFill()
bp.kickFlusher()
}
// PutN returns the buffers to the buffer cache or frees them.
//
// It holds bp.mu for the duration of the call, so all of bufs are
// returned under a single lock acquisition.
//
// Note that if you try to return a buffer of the wrong size to PutN it
// will panic.
func (bp *Pool) PutN(bufs [][]byte) {
bp.mu.Lock()
defer bp.mu.Unlock()
// _put needs the lock held
for _, buf := range bufs {
bp._put(buf)
}
// All of bufs are no longer out with callers
bp.inUse -= len(bufs)
bp.updateMinFill()
bp.kickFlusher()
}
// bufferPool is a global pool of buffers
var bufferPool *Pool
var bufferPoolOnce sync.Once

View File

@@ -53,17 +53,27 @@ func testGetPut(t *testing.T, useMmap bool, unreliable bool) {
assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 3, bp.Alloced())
bs := bp.GetN(3)
assert.Equal(t, 6, bp.InUse())
assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 6, bp.Alloced())
bp.Put(b1)
assert.Equal(t, 2, bp.InUse())
assert.Equal(t, 5, bp.InUse())
assert.Equal(t, 1, bp.InPool())
assert.Equal(t, 3, bp.Alloced())
assert.Equal(t, 6, bp.Alloced())
bp.Put(b2)
assert.Equal(t, 1, bp.InUse())
assert.Equal(t, 4, bp.InUse())
assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 3, bp.Alloced())
assert.Equal(t, 6, bp.Alloced())
bp.Put(b3)
assert.Equal(t, 3, bp.InUse())
assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 5, bp.Alloced())
bp.PutN(bs)
assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
@@ -89,6 +99,18 @@ func testGetPut(t *testing.T, useMmap bool, unreliable bool) {
assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
bsa := bp.GetN(3)
assert.Equal(t, addr(b1), addr(bsa[1]))
assert.Equal(t, addr(b2), addr(bsa[0]))
assert.Equal(t, 3, bp.InUse())
assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 3, bp.Alloced())
bp.PutN(bsa)
assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
assert.Panics(t, func() {
bp.Put(make([]byte, 1))
})
@@ -265,11 +287,19 @@ func TestPoolMaxBufferMemory(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
if i < 4 {
buf := bp.GetN(i + 1)
countBuf(i + 1)
time.Sleep(100 * time.Millisecond)
bp.PutN(buf)
countBuf(-(i + 1))
} else {
buf := bp.Get()
countBuf(1)
time.Sleep(100 * time.Millisecond)
bp.Put(buf)
countBuf(-1)
}
}()
}

View File

@@ -36,7 +36,7 @@ type RW struct {
out int // offset we are reading from
reads int // count how many times the data has been read
reserved int // number of buffers reserved
reserved [][]byte // reserved buffers
}
var (
@@ -63,15 +63,16 @@ func NewRW(pool *Pool) *RW {
// Reserve bytes of memory.
//
// Reserve, but don't allocate n bytes of memory.
// This allocates n bytes of memory for later use.
//
// This is rounded up to the nearest buffer page size.
//
// Only safe to call once.
func (rw *RW) Reserve(n int64) *RW {
rw.mu.Lock()
defer rw.mu.Unlock()
buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize))
rw.pool.Reserve(buffers)
rw.reserved += buffers
rw.reserved = rw.pool.GetN(buffers)
return rw
}
@@ -216,9 +217,12 @@ func (rw *RW) writePage() (page []byte) {
if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize {
return rw.pages[len(rw.pages)-1][rw.lastOffset:]
}
if rw.reserved > 0 {
page = rw.pool.GetReserved()
rw.reserved--
if len(rw.reserved) > 0 {
// Get reserved pages if available
i := len(rw.reserved) - 1
page = rw.reserved[i]
rw.reserved[i] = nil
rw.reserved = rw.reserved[:i]
} else {
page = rw.pool.Get()
}
@@ -338,14 +342,12 @@ func (rw *RW) Close() error {
rw.mu.Lock()
defer rw.mu.Unlock()
rw.signalWrite() // signal more data available
for _, page := range rw.pages {
rw.pool.Put(page)
}
rw.pool.PutN(rw.pages)
clear(rw.pages)
rw.pages = nil
if rw.reserved > 0 {
rw.pool.Release(rw.reserved)
rw.reserved = 0
}
rw.pool.PutN(rw.reserved)
clear(rw.reserved)
rw.reserved = nil
return nil
}