inlet/bmp: after flushing peers, allocate some time for readers

By downgrading the lock, we ensure readers get some time to work.
Otherwise, writers may take the lock back. The downgrade code was stolen
from https://github.com/google/gvisor/tree/master/pkg/sync.
This commit is contained in:
Vincent Bernat
2022-11-28 15:10:13 +01:00
parent 24cfabb682
commit a5d5b14112
6 changed files with 522 additions and 4 deletions

View File

@@ -0,0 +1,17 @@
// SPDX-FileCopyrightText: 2009 The Go Authors
// SPDX-License-Identifier: BSD-3-Clause
package sync
import (
_ "unsafe" // use of go:linkname
)
// semacquire is bound through go:linkname to sync.runtime_Semacquire. It has
// no body here: the implementation is the linked symbol.
// NOTE(review): presumably waits until *addr > 0 and then decrements it, as
// the upstream helper does — confirm against the Go version in use.
//
//go:linkname semacquire sync.runtime_Semacquire
func semacquire(addr *uint32)

// semacquireMutex is bound through go:linkname to sync.runtime_SemacquireMutex.
// RLock and Lock call it to wait on readerSem and writerSem respectively.
// NOTE(review): the meaning of lifo and skipframes is defined by the linked
// runtime symbol, not by this file — confirm before changing call sites.
//
//go:linkname semacquireMutex sync.runtime_SemacquireMutex
func semacquireMutex(s *uint32, lifo bool, skipframes int)

// semrelease is bound through go:linkname to sync.runtime_Semrelease. The
// unlock paths call it to wake goroutines waiting on readerSem or writerSem.
// NOTE(review): the meaning of handoff and skipframes is defined by the linked
// runtime symbol, not by this file — confirm before changing call sites.
//
//go:linkname semrelease sync.runtime_Semrelease
func semrelease(addr *uint32, handoff bool, skipframes int)

View File

@@ -0,0 +1,187 @@
// SPDX-FileCopyrightText: 2009 The Go Authors
// SPDX-FileCopyrightText: 2009 The gVisor Authors
// SPDX-License-Identifier: BSD-3-Clause
// Package sync reimplements RWMutex from Go with downgrade for Lock.
package sync
import (
"sync"
"sync/atomic"
)
// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.
// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
//
// In the terminology of the Go memory model,
// the n'th call to Unlock “synchronizes before” the m'th call to Lock
// for any n < m, just as for Mutex.
// For any call to RLock, there exists an n such that
// the n'th call to Unlock “synchronizes before” that call to RLock,
// and the corresponding call to RUnlock “synchronizes before”
// the n+1'th call to Lock.
//
// Compared to the standard library type, this one also provides
// DowngradeLock, which turns a held write lock into a read lock.
type RWMutex struct {
w sync.Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
// readerCount is the number of pending readers. Lock and TryLock subtract
// rwmutexMaxReaders from it, so a negative value tells readers that a
// writer is pending or active.
readerCount int32
// readerWait is the number of departing readers a pending writer is still
// waiting for; the reader bringing it to zero wakes the writer.
readerWait int32
}
// rwmutexMaxReaders is the bias that Lock and TryLock subtract from
// readerCount to announce a writer, and that Unlock and DowngradeLock add
// back when the write lock is given up.
const rwmutexMaxReaders = 1 << 30
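// The notes below are inherited from the upstream Go implementation.
// NOTE: the methods in this copy contain no explicit race-detector calls, so
// the statements about disabling race synchronization events describe the
// upstream code, not this file.
//
// Happens-before relationships are indicated to the race detector via:
// - Unlock -> Lock: readerSem
// - Unlock -> RLock: readerSem
// - RUnlock -> Lock: writerSem
//
// The methods below temporarily disable handling of race synchronization
// events in order to provide the more precise model above to the race
// detector.
//
// For example, atomic.AddInt32 in RLock should not appear to provide
// acquire-release semantics, which would incorrectly synchronize racing
// readers, thus potentially missing races.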
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
// Register this reader. The result is negative when a writer has subtracted
// rwmutexMaxReaders from readerCount (see Lock and TryLock).
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it: readerSem is released by Unlock and
// DowngradeLock.
semacquireMutex(&rw.readerSem, false, 0)
}
}
// TryRLock attempts to take rw for reading without blocking and reports
// whether the read lock was obtained.
//
// Correct uses of TryRLock exist but are rare; reaching for it is often a
// symptom of a deeper problem in how the mutex is being used.
func (rw *RWMutex) TryRLock() bool {
	for {
		current := atomic.LoadInt32(&rw.readerCount)
		switch {
		case current < 0:
			// A writer has announced itself: give up immediately.
			return false
		case atomic.CompareAndSwapInt32(&rw.readerCount, current, current+1):
			// Registered as one more reader.
			return true
		}
		// The counter moved between the load and the swap: retry.
	}
}
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
//
// The read lock left behind by DowngradeLock is released with RUnlock too.
func (rw *RWMutex) RUnlock() {
// A negative result means either a writer is pending or rw was not locked
// for reading; rUnlockSlow tells the two cases apart.
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
// DowngradeLock atomically unlocks rw for writing and locks it for reading.
//
// rw must be locked for writing on entry; DowngradeLock panics otherwise.
// On return the caller holds a read lock, which has to be released with
// RUnlock (not Unlock).
func (rw *RWMutex) DowngradeLock() {
// Announce to readers there is no active writer and one additional reader.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders+1)
// If the writer bias was not present, readerCount was already >= 0 and the
// sum reaches rwmutexMaxReaders+1 or more.
if r >= rwmutexMaxReaders+1 {
panic("sync: DowngradeLock of unlocked RWMutex")
}
// Unblock blocked readers, if any. Note that this loop starts as 1 since r
// includes this goroutine.
for i := 1; i < int(r); i++ {
semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed to rw.w.Lock(). Note that they will still
// block on rw.writerSem since at least this reader exists, such that
// DowngradeLock() is atomic with the previous write lock.
rw.w.Unlock()
}
// rUnlockSlow is the slow path of RUnlock. It is called with r, the negative
// value of readerCount right after RUnlock decremented it.
func (rw *RWMutex) rUnlockSlow(r int32) {
// r+1 is readerCount before the decrement: 0 means rw was not locked at
// all, -rwmutexMaxReaders means a writer was announced but no reader was
// registered.
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
panic("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
semrelease(&rw.writerSem, false, 1)
}
}
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
// Adding the bias back yields r, the number of readers registered at the
// moment of the announcement.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
// Readers that left after the announcement have already decremented
// readerWait in rUnlockSlow, so the sum is zero when none remain.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
semacquireMutex(&rw.writerSem, false, 0)
}
}
// TryLock attempts to take rw for writing without blocking and reports
// whether the write lock was obtained.
//
// Correct uses of TryLock exist but are rare; reaching for it is often a
// symptom of a deeper problem in how the mutex is being used.
func (rw *RWMutex) TryLock() bool {
	// Another writer already owns the writer mutex: nothing to undo.
	if acquired := rw.w.TryLock(); !acquired {
		return false
	}
	// Install the writer bias only if no reader is registered.
	if atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
		return true
	}
	// Readers are present: give the writer mutex back and report failure.
	rw.w.Unlock()
	return false
}
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// Without the writer bias, readerCount was already >= 0 and the sum
// reaches rwmutexMaxReaders or more.
if r >= rwmutexMaxReaders {
panic("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
// r is the number of readers that registered while the writer was active.
for i := 0; i < int(r); i++ {
semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
//
// The returned value is a view of rw itself (a pointer conversion), not a
// copy: it operates on the same lock state.
func (rw *RWMutex) RLocker() sync.Locker {
return (*rlocker)(rw)
}
// rlocker is RWMutex seen through its read-lock operations; it is the
// concrete type behind the sync.Locker returned by RLocker.
type rlocker RWMutex
// Lock takes the underlying RWMutex for reading.
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
// Unlock releases one read lock on the underlying RWMutex.
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

View File

@@ -0,0 +1,311 @@
// SPDX-FileCopyrightText: 2009 The Go Authors
// SPDX-FileCopyrightText: 2009 The gVisor Authors
// SPDX-License-Identifier: BSD-3-Clause
// GOMAXPROCS=10 go test
package sync_test
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
. "akvorado/common/helpers/sync"
)
// There is a modified copy of this file in runtime/rwmutex_test.go.
// If you make any changes here, see if you should make them there.
// parallelReader takes m for reading, reports it on clocked, keeps the read
// lock until a value arrives on cunlock, then releases it and reports
// completion on cdone.
func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) {
m.RLock()
clocked <- true
// Keep holding the read lock until the test tells us to let go.
<-cunlock
m.RUnlock()
cdone <- true
}
// doTestParallelReaders sets GOMAXPROCS to gomaxprocs and verifies that
// numReaders goroutines can hold the read lock of one RWMutex simultaneously:
// all of them must report the lock as taken before any is told to release it.
func doTestParallelReaders(numReaders, gomaxprocs int) {
	runtime.GOMAXPROCS(gomaxprocs)
	var (
		mu       RWMutex
		acquired = make(chan bool)
		release  = make(chan bool)
		finished = make(chan bool)
	)
	for n := numReaders; n > 0; n-- {
		go parallelReader(&mu, acquired, release, finished)
	}
	// Every reader has to get through RLock() before going further.
	for n := numReaders; n > 0; n-- {
		<-acquired
	}
	// Let the readers drop their lock.
	for n := numReaders; n > 0; n-- {
		release <- true
	}
	// Collect the completion signal of each goroutine.
	for n := numReaders; n > 0; n-- {
		<-finished
	}
}
// TestParallelReaders runs doTestParallelReaders for a few reader/GOMAXPROCS
// combinations and restores the initial GOMAXPROCS value when done.
func TestParallelReaders(t *testing.T) {
	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
	scenarios := [...]struct{ readers, procs int }{
		{readers: 1, procs: 4},
		{readers: 3, procs: 4},
		{readers: 4, procs: 2},
	}
	for _, sc := range scenarios {
		doTestParallelReaders(sc.readers, sc.procs)
	}
}
// reader takes rwm for reading numIterations times and checks, through the
// shared activity counter, that the read lock is never held together with a
// write lock.
//
// Each reader adds 1 to activity while it holds the lock; a value outside
// [1, 10000) is treated as a violation, in which case the lock is released
// and the goroutine panics. A value is sent on cdone after the last
// iteration.
func reader(rwm *RWMutex, numIterations int, activity *int32, cdone chan bool) {
	for i := 0; i < numIterations; i++ {
		rwm.RLock()
		n := atomic.AddInt32(activity, 1)
		if n < 1 || n >= 10000 {
			rwm.RUnlock()
			panic(fmt.Sprintf("wlock(%d)\n", n))
		}
		// Keep the lock a little longer so that other goroutines get a
		// chance to contend for it.
		for spin := 0; spin < 100; spin++ {
		}
		atomic.AddInt32(activity, -1)
		rwm.RUnlock()
	}
	cdone <- true
}
// writer takes rwm for writing numIterations times and checks, through the
// shared activity counter, that the write lock is exclusive.
//
// Each writer adds 10000 to activity while it holds the lock; any value other
// than exactly 10000 is treated as a violation, in which case the lock is
// released and the goroutine panics. A value is sent on cdone after the last
// iteration.
func writer(rwm *RWMutex, numIterations int, activity *int32, cdone chan bool) {
	for i := 0; i < numIterations; i++ {
		rwm.Lock()
		n := atomic.AddInt32(activity, 10000)
		if n != 10000 {
			rwm.Unlock()
			panic(fmt.Sprintf("wlock(%d)\n", n))
		}
		// Keep the lock a little longer so that other goroutines get a
		// chance to contend for it.
		for spin := 0; spin < 100; spin++ {
		}
		atomic.AddInt32(activity, -10000)
		rwm.Unlock()
	}
	cdone <- true
}
// HammerRWMutex stresses one RWMutex with 2 writer goroutines and numReaders
// reader goroutines, each performing numIterations lock/unlock rounds, after
// setting GOMAXPROCS to gomaxprocs. It returns once every goroutine has
// reported completion.
func HammerRWMutex(gomaxprocs, numReaders, numIterations int) {
	runtime.GOMAXPROCS(gomaxprocs)
	// Number of active readers + 10000 * number of active writers.
	var activity int32
	var rwm RWMutex
	cdone := make(chan bool)
	go writer(&rwm, numIterations, &activity, cdone)
	// Start the second writer after the first half of the readers.
	var i int
	for i = 0; i < numReaders/2; i++ {
		go reader(&rwm, numIterations, &activity, cdone)
	}
	go writer(&rwm, numIterations, &activity, cdone)
	for ; i < numReaders; i++ {
		go reader(&rwm, numIterations, &activity, cdone)
	}
	// Wait for the 2 writers and all readers to finish.
	for done := 0; done < 2+numReaders; done++ {
		<-cdone
	}
}
// TestRWMutex first checks what TryLock and TryRLock report on a
// write-locked, an unlocked and a read-locked mutex, then stress-tests the
// lock with HammerRWMutex for several GOMAXPROCS/reader combinations.
func TestRWMutex(t *testing.T) {
	var mu RWMutex

	// Write-locked: neither try variant may succeed.
	mu.Lock()
	if got := mu.TryLock(); got {
		t.Fatalf("TryLock succeeded with mutex locked")
	}
	if got := mu.TryRLock(); got {
		t.Fatalf("TryRLock succeeded with mutex locked")
	}
	mu.Unlock()

	// Unlocked: the write lock can be taken.
	if got := mu.TryLock(); !got {
		t.Fatalf("TryLock failed with mutex unlocked")
	}
	mu.Unlock()

	// Unlocked, then read-locked: read locks stack, the write lock is refused.
	if got := mu.TryRLock(); !got {
		t.Fatalf("TryRLock failed with mutex unlocked")
	}
	if got := mu.TryRLock(); !got {
		t.Fatalf("TryRLock failed with mutex rlocked")
	}
	if got := mu.TryLock(); got {
		t.Fatalf("TryLock succeeded with mutex rlocked")
	}
	mu.RUnlock()
	mu.RUnlock()

	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
	iterations := 1000
	if testing.Short() {
		iterations = 5
	}
	for _, procs := range []int{1, 4, 10} {
		for _, readers := range []int{1, 3, 10} {
			HammerRWMutex(procs, readers, iterations)
		}
	}
	HammerRWMutex(10, 5, iterations)
}
// TestRLocker checks that the Locker returned by RLocker really takes and
// releases read locks on the underlying RWMutex.
//
// A helper goroutine takes two read locks through the Locker, signals
// rlocked, then asks for the write lock and signals wlocked once it has it.
// The test goroutine releases the read locks one at a time and verifies that
// the write lock is only obtained after the second release, and that no new
// read lock is obtained while the write lock is held.
func TestRLocker(t *testing.T) {
var wl RWMutex
var rl sync.Locker
wlocked := make(chan bool, 1)
rlocked := make(chan bool, 1)
rl = wl.RLocker()
n := 10
go func() {
for i := 0; i < n; i++ {
rl.Lock()
rl.Lock()
rlocked <- true
wl.Lock()
wlocked <- true
}
}()
for i := 0; i < n; i++ {
<-rlocked
// Drop the first of the two read locks: the writer must still be blocked.
rl.Unlock()
select {
case <-wlocked:
t.Fatal("RLocker() didn't read-lock it")
default:
}
// Drop the second read lock: the writer can now proceed.
rl.Unlock()
<-wlocked
// While the write lock is held, the helper's next read lock must not succeed.
select {
case <-rlocked:
t.Fatal("RLocker() didn't respect the write lock")
default:
}
wl.Unlock()
}
}
// BenchmarkRWMutexUncontended measures lock/unlock cost when every parallel
// benchmark goroutine works on its own RWMutex: two nested read locks
// followed by one write lock per iteration.
func BenchmarkRWMutexUncontended(b *testing.B) {
// NOTE(review): pad presumably keeps the mutexes of different goroutines
// apart in memory to avoid false sharing — confirm.
type PaddedRWMutex struct {
RWMutex
pad [32]uint32
}
b.RunParallel(func(pb *testing.PB) {
// One mutex per goroutine: no contention between goroutines.
var rwm PaddedRWMutex
for pb.Next() {
rwm.RLock()
rwm.RLock()
rwm.RUnlock()
rwm.RUnlock()
rwm.Lock()
rwm.Unlock()
}
})
}
// benchmarkRWMutex measures a single RWMutex shared by all parallel benchmark
// goroutines. Every writeRatio-th iteration of a goroutine takes the write
// lock; the other iterations take the read lock and run localWork rounds of
// arithmetic while holding it.
func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
var rwm RWMutex
b.RunParallel(func(pb *testing.PB) {
// foo is the per-goroutine iteration counter and the operand of the local work.
foo := 0
for pb.Next() {
foo++
if foo%writeRatio == 0 {
rwm.Lock()
rwm.Unlock()
} else {
rwm.RLock()
// Simulated work under the read lock.
for i := 0; i != localWork; i += 1 {
foo *= 2
foo /= 2
}
rwm.RUnlock()
}
}
_ = foo
})
}
// BenchmarkRWMutexWrite100 runs benchmarkRWMutex with no work under the read
// lock and a write lock on every 100th iteration.
func BenchmarkRWMutexWrite100(b *testing.B) {
benchmarkRWMutex(b, 0, 100)
}
// BenchmarkRWMutexWrite10 runs benchmarkRWMutex with no work under the read
// lock and a write lock on every 10th iteration.
func BenchmarkRWMutexWrite10(b *testing.B) {
benchmarkRWMutex(b, 0, 10)
}
// BenchmarkRWMutexWorkWrite100 runs benchmarkRWMutex with 100 rounds of work
// under the read lock and a write lock on every 100th iteration.
func BenchmarkRWMutexWorkWrite100(b *testing.B) {
benchmarkRWMutex(b, 100, 100)
}
// BenchmarkRWMutexWorkWrite10 runs benchmarkRWMutex with 100 rounds of work
// under the read lock and a write lock on every 10th iteration.
func BenchmarkRWMutexWorkWrite10(b *testing.B) {
benchmarkRWMutex(b, 100, 10)
}
// downgradingWriter takes rwm for writing numIterations times, downgrades
// each write lock to a read lock with DowngradeLock and finally releases it
// with RUnlock.
//
// The shared activity counter is used to check both phases: while writing it
// must be exactly 10000, and after the downgrade it must be in [1, 10000).
// As in reader and writer, the lock currently held is released before
// panicking on a violation. A value is sent on cdone after the last
// iteration.
func downgradingWriter(rwm *RWMutex, numIterations int, activity *int32, cdone chan bool) {
	for i := 0; i < numIterations; i++ {
		// Phase 1: exclusive access.
		rwm.Lock()
		n := atomic.AddInt32(activity, 10000)
		if n != 10000 {
			rwm.Unlock()
			panic(fmt.Sprintf("wlock(%d)\n", n))
		}
		for spin := 0; spin < 100; spin++ {
		}
		atomic.AddInt32(activity, -10000)

		// Phase 2: shared access, entered without ever releasing the lock.
		rwm.DowngradeLock()
		n = atomic.AddInt32(activity, 1)
		if n < 1 || n >= 10000 {
			rwm.RUnlock()
			panic(fmt.Sprintf("wlock(%d)\n", n))
		}
		for spin := 0; spin < 100; spin++ {
		}
		atomic.AddInt32(activity, -1)
		rwm.RUnlock()
	}
	cdone <- true
}
// HammerDowngradableRWMutex stresses one RWMutex with 2 plain writers,
// 2 downgrading writers and numReaders readers, each performing numIterations
// rounds, after setting GOMAXPROCS to gomaxprocs. It returns once every
// goroutine has reported completion.
func HammerDowngradableRWMutex(gomaxprocs, numReaders, numIterations int) {
	runtime.GOMAXPROCS(gomaxprocs)

	var (
		// Number of active readers + 10000 * number of active writers.
		activity int32
		rwm      RWMutex
	)
	cdone := make(chan bool)

	// startWriters launches one plain and one downgrading writer.
	startWriters := func() {
		go writer(&rwm, numIterations, &activity, cdone)
		go downgradingWriter(&rwm, numIterations, &activity, cdone)
	}
	// startReaders launches count readers (none when count is not positive).
	startReaders := func(count int) {
		for ; count > 0; count-- {
			go reader(&rwm, numIterations, &activity, cdone)
		}
	}

	firstBatch := numReaders / 2
	startWriters()
	startReaders(firstBatch)
	startWriters()
	startReaders(numReaders - firstBatch)

	// Wait for the 4 writers and all readers to finish.
	for pending := 4 + numReaders; pending > 0; pending-- {
		<-cdone
	}
}
// TestDowngradableRWMutex stress-tests the lock, including DowngradeLock,
// with HammerDowngradableRWMutex for several GOMAXPROCS/reader combinations.
// The initial GOMAXPROCS value is restored when the test returns.
func TestDowngradableRWMutex(t *testing.T) {
	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
	iterations := 1000
	if testing.Short() {
		iterations = 5
	}
	for _, procs := range []int{1, 4, 10} {
		for _, readers := range []int{1, 3, 10} {
			HammerDowngradableRWMutex(procs, readers, iterations)
		}
	}
	HammerDowngradableRWMutex(10, 5, iterations)
}

View File

@@ -38,7 +38,7 @@ func DefaultConfiguration() Configuration {
CollectASPaths: true, CollectASPaths: true,
CollectCommunities: true, CollectCommunities: true,
Keep: 5 * time.Minute, Keep: 5 * time.Minute,
PeerRemovalMaxTime: 200 * time.Millisecond, PeerRemovalMaxTime: 100 * time.Millisecond,
PeerRemovalSleepInterval: 500 * time.Millisecond, PeerRemovalSleepInterval: 500 * time.Millisecond,
PeerRemovalMaxQueue: 10000, PeerRemovalMaxQueue: 10000,
PeerRemovalMinRoutes: 5000, PeerRemovalMinRoutes: 5000,

View File

@@ -24,7 +24,7 @@ func (c *Component) peerRemovalWorker() error {
start := c.d.Clock.Now() start := c.d.Clock.Now()
c.mu.Lock() c.mu.Lock()
defer func() { defer func() {
c.mu.Unlock() c.mu.DowngradeLock()
c.metrics.locked.WithLabelValues("peer-removal").Observe( c.metrics.locked.WithLabelValues("peer-removal").Observe(
float64(c.d.Clock.Now().Sub(start).Nanoseconds()) / 1000 / 1000 / 1000) float64(c.d.Clock.Now().Sub(start).Nanoseconds()) / 1000 / 1000 / 1000)
}() }()
@@ -45,15 +45,18 @@ func (c *Component) peerRemovalWorker() error {
// Run was complete, update metrics // Run was complete, update metrics
c.metrics.peers.WithLabelValues(exporterStr).Dec() c.metrics.peers.WithLabelValues(exporterStr).Dec()
c.metrics.peerRemovalDone.WithLabelValues(exporterStr).Inc() c.metrics.peerRemovalDone.WithLabelValues(exporterStr).Inc()
c.mu.RUnlock()
break break
} }
// Run is incompletem, update metrics and sleep a bit // Run is incomplete, update metrics and sleep a bit
c.metrics.peerRemovalPartial.WithLabelValues(exporterStr).Inc() c.metrics.peerRemovalPartial.WithLabelValues(exporterStr).Inc()
select { select {
case <-c.t.Dying(): case <-c.t.Dying():
c.mu.RUnlock()
return nil return nil
case <-time.After(c.config.PeerRemovalSleepInterval): case <-time.After(c.config.PeerRemovalSleepInterval):
} }
c.mu.RUnlock()
} }
} }
} }

View File

@@ -8,13 +8,13 @@ package bmp
import ( import (
"fmt" "fmt"
"net" "net"
"sync"
"time" "time"
"github.com/benbjohnson/clock" "github.com/benbjohnson/clock"
"gopkg.in/tomb.v2" "gopkg.in/tomb.v2"
"akvorado/common/daemon" "akvorado/common/daemon"
"akvorado/common/helpers/sync"
"akvorado/common/reporter" "akvorado/common/reporter"
) )