mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
inlet/routing: do not rely on custom RWMutex
It was done to add the ability to downgrade the lock. However, the number of times the RW lock is taken has been greatly reduced and it does not make sense to maintain another implementation just for that.
This commit is contained in:
@@ -1,14 +0,0 @@
|
||||
// SPDX-FileCopyrightText: 2009 The Go Authors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package sync
|
||||
|
||||
import (
|
||||
_ "unsafe" // use of go:linkname
|
||||
)
|
||||
|
||||
//go:linkname semacquireMutex sync.runtime_SemacquireMutex
|
||||
func semacquireMutex(s *uint32, lifo bool, skipframes int)
|
||||
|
||||
//go:linkname semrelease sync.runtime_Semrelease
|
||||
func semrelease(addr *uint32, handoff bool, skipframes int)
|
||||
@@ -1,259 +0,0 @@
|
||||
// SPDX-FileCopyrightText: 2009 The Go Authors
|
||||
// SPDX-FileCopyrightText: 2009 The gVisor Authors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
// Package sync reimplements RWMutex from Go with downgrade for Lock.
|
||||
package sync
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"akvorado/common/helpers/race"
|
||||
)
|
||||
|
||||
// There is a modified copy of this file in runtime/rwmutex.go.
|
||||
// If you make any changes here, see if you should make them there.
|
||||
|
||||
// A RWMutex is a reader/writer mutual exclusion lock.
|
||||
// The lock can be held by an arbitrary number of readers or a single writer.
|
||||
// The zero value for a RWMutex is an unlocked mutex.
|
||||
//
|
||||
// A RWMutex must not be copied after first use.
|
||||
//
|
||||
// If a goroutine holds a RWMutex for reading and another goroutine might
|
||||
// call Lock, no goroutine should expect to be able to acquire a read lock
|
||||
// until the initial read lock is released. In particular, this prohibits
|
||||
// recursive read locking. This is to ensure that the lock eventually becomes
|
||||
// available; a blocked Lock call excludes new readers from acquiring the
|
||||
// lock.
|
||||
//
|
||||
// In the terminology of the Go memory model,
|
||||
// the n'th call to Unlock “synchronizes before” the m'th call to Lock
|
||||
// for any n < m, just as for Mutex.
|
||||
// For any call to RLock, there exists an n such that
|
||||
// the n'th call to Unlock “synchronizes before” that call to RLock,
|
||||
// and the corresponding call to RUnlock “synchronizes before”
|
||||
// the n+1'th call to Lock.
|
||||
type RWMutex struct {
|
||||
w sync.Mutex // held if there are pending writers
|
||||
writerSem uint32 // semaphore for writers to wait for completing readers
|
||||
readerSem uint32 // semaphore for readers to wait for completing writers
|
||||
readerCount int32 // number of pending readers
|
||||
readerWait int32 // number of departing readers
|
||||
}
|
||||
|
||||
const rwmutexMaxReaders = 1 << 30
|
||||
|
||||
// Happens-before relationships are indicated to the race detector via:
|
||||
// - Unlock -> Lock: readerSem
|
||||
// - Unlock -> RLock: readerSem
|
||||
// - RUnlock -> Lock: writerSem
|
||||
//
|
||||
// The methods below temporarily disable handling of race synchronization
|
||||
// events in order to provide the more precise model above to the race
|
||||
// detector.
|
||||
//
|
||||
// For example, atomic.AddInt32 in RLock should not appear to provide
|
||||
// acquire-release semantics, which would incorrectly synchronize racing
|
||||
// readers, thus potentially missing races.
|
||||
|
||||
// RLock locks rw for reading.
|
||||
//
|
||||
// It should not be used for recursive read locking; a blocked Lock
|
||||
// call excludes new readers from acquiring the lock. See the
|
||||
// documentation on the RWMutex type.
|
||||
func (rw *RWMutex) RLock() {
|
||||
if race.Enabled {
|
||||
// _ = rw.w.state
|
||||
race.Disable()
|
||||
}
|
||||
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
|
||||
// A writer is pending, wait for it.
|
||||
semacquireMutex(&rw.readerSem, false, 0)
|
||||
}
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
race.Acquire(unsafe.Pointer(&rw.readerSem))
|
||||
}
|
||||
}
|
||||
|
||||
// TryRLock tries to lock rw for reading and reports whether it succeeded.
|
||||
//
|
||||
// Note that while correct uses of TryRLock do exist, they are rare,
|
||||
// and use of TryRLock is often a sign of a deeper problem
|
||||
// in a particular use of mutexes.
|
||||
func (rw *RWMutex) TryRLock() bool {
|
||||
if race.Enabled {
|
||||
// _ = rw.w.state
|
||||
race.Disable()
|
||||
}
|
||||
for {
|
||||
c := atomic.LoadInt32(&rw.readerCount)
|
||||
if c < 0 {
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
}
|
||||
return false
|
||||
}
|
||||
if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
race.Acquire(unsafe.Pointer(&rw.readerSem))
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RUnlock undoes a single RLock call;
|
||||
// it does not affect other simultaneous readers.
|
||||
// It is a run-time error if rw is not locked for reading
|
||||
// on entry to RUnlock.
|
||||
func (rw *RWMutex) RUnlock() {
|
||||
if race.Enabled {
|
||||
// _ = rw.w.state
|
||||
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
|
||||
race.Disable()
|
||||
}
|
||||
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
|
||||
// Outlined slow-path to allow the fast-path to be inlined
|
||||
rw.rUnlockSlow(r)
|
||||
}
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
}
|
||||
}
|
||||
|
||||
func (rw *RWMutex) rUnlockSlow(r int32) {
|
||||
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
|
||||
race.Enable()
|
||||
panic("sync: RUnlock of unlocked RWMutex")
|
||||
}
|
||||
// A writer is pending.
|
||||
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
|
||||
// The last reader unblocks the writer.
|
||||
semrelease(&rw.writerSem, false, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// Lock locks rw for writing.
|
||||
// If the lock is already locked for reading or writing,
|
||||
// Lock blocks until the lock is available.
|
||||
func (rw *RWMutex) Lock() {
|
||||
if race.Enabled {
|
||||
// _ = rw.w.state
|
||||
race.Disable()
|
||||
}
|
||||
// First, resolve competition with other writers.
|
||||
rw.w.Lock()
|
||||
// Announce to readers there is a pending writer.
|
||||
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
|
||||
// Wait for active readers.
|
||||
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
|
||||
semacquireMutex(&rw.writerSem, false, 0)
|
||||
}
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
race.Acquire(unsafe.Pointer(&rw.readerSem))
|
||||
race.Acquire(unsafe.Pointer(&rw.writerSem))
|
||||
}
|
||||
}
|
||||
|
||||
// TryLock tries to lock rw for writing and reports whether it succeeded.
|
||||
//
|
||||
// Note that while correct uses of TryLock do exist, they are rare,
|
||||
// and use of TryLock is often a sign of a deeper problem
|
||||
// in a particular use of mutexes.
|
||||
func (rw *RWMutex) TryLock() bool {
|
||||
if race.Enabled {
|
||||
// _ = rw.w.state
|
||||
race.Disable()
|
||||
}
|
||||
if !rw.w.TryLock() {
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
}
|
||||
return false
|
||||
}
|
||||
if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
|
||||
rw.w.Unlock()
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
}
|
||||
return false
|
||||
}
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
race.Acquire(unsafe.Pointer(&rw.readerSem))
|
||||
race.Acquire(unsafe.Pointer(&rw.writerSem))
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Unlock unlocks rw for writing. It is a run-time error if rw is
|
||||
// not locked for writing on entry to Unlock.
|
||||
//
|
||||
// As with Mutexes, a locked RWMutex is not associated with a particular
|
||||
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
|
||||
// arrange for another goroutine to RUnlock (Unlock) it.
|
||||
func (rw *RWMutex) Unlock() {
|
||||
if race.Enabled {
|
||||
// _ = rw.w.state
|
||||
race.Release(unsafe.Pointer(&rw.readerSem))
|
||||
race.Disable()
|
||||
}
|
||||
|
||||
// Announce to readers there is no active writer.
|
||||
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
|
||||
if r >= rwmutexMaxReaders {
|
||||
race.Enable()
|
||||
panic("sync: Unlock of unlocked RWMutex")
|
||||
}
|
||||
// Unblock blocked readers, if any.
|
||||
for i := 0; i < int(r); i++ {
|
||||
semrelease(&rw.readerSem, false, 0)
|
||||
}
|
||||
// Allow other writers to proceed.
|
||||
rw.w.Unlock()
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
}
|
||||
}
|
||||
|
||||
// DowngradeLock atomically unlocks rw for writing and locks it for reading.
|
||||
func (rw *RWMutex) DowngradeLock() {
|
||||
if race.Enabled {
|
||||
race.Release(unsafe.Pointer(&rw.readerSem))
|
||||
race.Disable()
|
||||
}
|
||||
// Announce to readers there is no active writer and one additional reader.
|
||||
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders+1)
|
||||
if r >= rwmutexMaxReaders+1 {
|
||||
panic("DowngradeLock of unlocked RWMutex")
|
||||
}
|
||||
// Unblock blocked readers, if any. Note that this loop starts as 1 since r
|
||||
// includes this goroutine.
|
||||
for i := 1; i < int(r); i++ {
|
||||
semrelease(&rw.readerSem, false, 0)
|
||||
}
|
||||
// Allow other writers to proceed to rw.w.Lock(). Note that they will still
|
||||
// block on rw.writerSem since at least this reader exists, such that
|
||||
// DowngradeLock() is atomic with the previous write lock.
|
||||
rw.w.Unlock()
|
||||
if race.Enabled {
|
||||
race.Enable()
|
||||
}
|
||||
}
|
||||
|
||||
// RLocker returns a Locker interface that implements
|
||||
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
|
||||
func (rw *RWMutex) RLocker() sync.Locker {
|
||||
return (*rlocker)(rw)
|
||||
}
|
||||
|
||||
type rlocker RWMutex
|
||||
|
||||
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
|
||||
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
|
||||
@@ -1,311 +0,0 @@
|
||||
// SPDX-FileCopyrightText: 2009 The Go Authors
|
||||
// SPDX-FileCopyrightText: 2009 The gVisor Authors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
// GOMAXPROCS=10 go test
|
||||
|
||||
package sync_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
. "akvorado/common/helpers/sync"
|
||||
)
|
||||
|
||||
// There is a modified copy of this file in runtime/rwmutex_test.go.
|
||||
// If you make any changes here, see if you should make them there.
|
||||
|
||||
func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) {
|
||||
m.RLock()
|
||||
clocked <- true
|
||||
<-cunlock
|
||||
m.RUnlock()
|
||||
cdone <- true
|
||||
}
|
||||
|
||||
func doTestParallelReaders(numReaders, gomaxprocs int) {
|
||||
runtime.GOMAXPROCS(gomaxprocs)
|
||||
var m RWMutex
|
||||
clocked := make(chan bool)
|
||||
cunlock := make(chan bool)
|
||||
cdone := make(chan bool)
|
||||
for i := 0; i < numReaders; i++ {
|
||||
go parallelReader(&m, clocked, cunlock, cdone)
|
||||
}
|
||||
// Wait for all parallel RLock()s to succeed.
|
||||
for i := 0; i < numReaders; i++ {
|
||||
<-clocked
|
||||
}
|
||||
for i := 0; i < numReaders; i++ {
|
||||
cunlock <- true
|
||||
}
|
||||
// Wait for the goroutines to finish.
|
||||
for i := 0; i < numReaders; i++ {
|
||||
<-cdone
|
||||
}
|
||||
}
|
||||
|
||||
func TestParallelReaders(t *testing.T) {
|
||||
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
|
||||
doTestParallelReaders(1, 4)
|
||||
doTestParallelReaders(3, 4)
|
||||
doTestParallelReaders(4, 2)
|
||||
}
|
||||
|
||||
func reader(rwm *RWMutex, num_iterations int, activity *int32, cdone chan bool) {
|
||||
for i := 0; i < num_iterations; i++ {
|
||||
rwm.RLock()
|
||||
n := atomic.AddInt32(activity, 1)
|
||||
if n < 1 || n >= 10000 {
|
||||
rwm.RUnlock()
|
||||
panic(fmt.Sprintf("wlock(%d)\n", n))
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
}
|
||||
atomic.AddInt32(activity, -1)
|
||||
rwm.RUnlock()
|
||||
}
|
||||
cdone <- true
|
||||
}
|
||||
|
||||
func writer(rwm *RWMutex, num_iterations int, activity *int32, cdone chan bool) {
|
||||
for i := 0; i < num_iterations; i++ {
|
||||
rwm.Lock()
|
||||
n := atomic.AddInt32(activity, 10000)
|
||||
if n != 10000 {
|
||||
rwm.Unlock()
|
||||
panic(fmt.Sprintf("wlock(%d)\n", n))
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
}
|
||||
atomic.AddInt32(activity, -10000)
|
||||
rwm.Unlock()
|
||||
}
|
||||
cdone <- true
|
||||
}
|
||||
|
||||
func HammerRWMutex(gomaxprocs, numReaders, num_iterations int) {
|
||||
runtime.GOMAXPROCS(gomaxprocs)
|
||||
// Number of active readers + 10000 * number of active writers.
|
||||
var activity int32
|
||||
var rwm RWMutex
|
||||
cdone := make(chan bool)
|
||||
go writer(&rwm, num_iterations, &activity, cdone)
|
||||
var i int
|
||||
for i = 0; i < numReaders/2; i++ {
|
||||
go reader(&rwm, num_iterations, &activity, cdone)
|
||||
}
|
||||
go writer(&rwm, num_iterations, &activity, cdone)
|
||||
for ; i < numReaders; i++ {
|
||||
go reader(&rwm, num_iterations, &activity, cdone)
|
||||
}
|
||||
// Wait for the 2 writers and all readers to finish.
|
||||
for i := 0; i < 2+numReaders; i++ {
|
||||
<-cdone
|
||||
}
|
||||
}
|
||||
|
||||
func TestRWMutex(t *testing.T) {
|
||||
var m RWMutex
|
||||
|
||||
m.Lock()
|
||||
if m.TryLock() {
|
||||
t.Fatalf("TryLock succeeded with mutex locked")
|
||||
}
|
||||
if m.TryRLock() {
|
||||
t.Fatalf("TryRLock succeeded with mutex locked")
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
if !m.TryLock() {
|
||||
t.Fatalf("TryLock failed with mutex unlocked")
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
if !m.TryRLock() {
|
||||
t.Fatalf("TryRLock failed with mutex unlocked")
|
||||
}
|
||||
if !m.TryRLock() {
|
||||
t.Fatalf("TryRLock failed with mutex rlocked")
|
||||
}
|
||||
if m.TryLock() {
|
||||
t.Fatalf("TryLock succeeded with mutex rlocked")
|
||||
}
|
||||
m.RUnlock()
|
||||
m.RUnlock()
|
||||
|
||||
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
|
||||
n := 1000
|
||||
if testing.Short() {
|
||||
n = 5
|
||||
}
|
||||
HammerRWMutex(1, 1, n)
|
||||
HammerRWMutex(1, 3, n)
|
||||
HammerRWMutex(1, 10, n)
|
||||
HammerRWMutex(4, 1, n)
|
||||
HammerRWMutex(4, 3, n)
|
||||
HammerRWMutex(4, 10, n)
|
||||
HammerRWMutex(10, 1, n)
|
||||
HammerRWMutex(10, 3, n)
|
||||
HammerRWMutex(10, 10, n)
|
||||
HammerRWMutex(10, 5, n)
|
||||
}
|
||||
|
||||
func TestRLocker(t *testing.T) {
|
||||
var wl RWMutex
|
||||
var rl sync.Locker
|
||||
wlocked := make(chan bool, 1)
|
||||
rlocked := make(chan bool, 1)
|
||||
rl = wl.RLocker()
|
||||
n := 10
|
||||
go func() {
|
||||
for i := 0; i < n; i++ {
|
||||
rl.Lock()
|
||||
rl.Lock()
|
||||
rlocked <- true
|
||||
wl.Lock()
|
||||
wlocked <- true
|
||||
}
|
||||
}()
|
||||
for i := 0; i < n; i++ {
|
||||
<-rlocked
|
||||
rl.Unlock()
|
||||
select {
|
||||
case <-wlocked:
|
||||
t.Fatal("RLocker() didn't read-lock it")
|
||||
default:
|
||||
}
|
||||
rl.Unlock()
|
||||
<-wlocked
|
||||
select {
|
||||
case <-rlocked:
|
||||
t.Fatal("RLocker() didn't respect the write lock")
|
||||
default:
|
||||
}
|
||||
wl.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRWMutexUncontended(b *testing.B) {
|
||||
type PaddedRWMutex struct {
|
||||
RWMutex
|
||||
pad [32]uint32
|
||||
}
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var rwm PaddedRWMutex
|
||||
for pb.Next() {
|
||||
rwm.RLock()
|
||||
rwm.RLock()
|
||||
rwm.RUnlock()
|
||||
rwm.RUnlock()
|
||||
rwm.Lock()
|
||||
rwm.Unlock()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
|
||||
var rwm RWMutex
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
foo := 0
|
||||
for pb.Next() {
|
||||
foo++
|
||||
if foo%writeRatio == 0 {
|
||||
rwm.Lock()
|
||||
rwm.Unlock()
|
||||
} else {
|
||||
rwm.RLock()
|
||||
for i := 0; i != localWork; i += 1 {
|
||||
foo *= 2
|
||||
foo /= 2
|
||||
}
|
||||
rwm.RUnlock()
|
||||
}
|
||||
}
|
||||
_ = foo
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkRWMutexWrite100(b *testing.B) {
|
||||
benchmarkRWMutex(b, 0, 100)
|
||||
}
|
||||
|
||||
func BenchmarkRWMutexWrite10(b *testing.B) {
|
||||
benchmarkRWMutex(b, 0, 10)
|
||||
}
|
||||
|
||||
func BenchmarkRWMutexWorkWrite100(b *testing.B) {
|
||||
benchmarkRWMutex(b, 100, 100)
|
||||
}
|
||||
|
||||
func BenchmarkRWMutexWorkWrite10(b *testing.B) {
|
||||
benchmarkRWMutex(b, 100, 10)
|
||||
}
|
||||
|
||||
func downgradingWriter(rwm *RWMutex, numIterations int, activity *int32, cdone chan bool) {
|
||||
for i := 0; i < numIterations; i++ {
|
||||
rwm.Lock()
|
||||
n := atomic.AddInt32(activity, 10000)
|
||||
if n != 10000 {
|
||||
panic(fmt.Sprintf("wlock(%d)\n", n))
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
}
|
||||
atomic.AddInt32(activity, -10000)
|
||||
rwm.DowngradeLock()
|
||||
n = atomic.AddInt32(activity, 1)
|
||||
if n < 1 || n >= 10000 {
|
||||
panic(fmt.Sprintf("wlock(%d)\n", n))
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
}
|
||||
atomic.AddInt32(activity, -1)
|
||||
rwm.RUnlock()
|
||||
}
|
||||
cdone <- true
|
||||
}
|
||||
|
||||
func HammerDowngradableRWMutex(gomaxprocs, numReaders, numIterations int) {
|
||||
runtime.GOMAXPROCS(gomaxprocs)
|
||||
// Number of active readers + 10000 * number of active writers.
|
||||
var activity int32
|
||||
var rwm RWMutex
|
||||
cdone := make(chan bool)
|
||||
go writer(&rwm, numIterations, &activity, cdone)
|
||||
go downgradingWriter(&rwm, numIterations, &activity, cdone)
|
||||
var i int
|
||||
for i = 0; i < numReaders/2; i++ {
|
||||
go reader(&rwm, numIterations, &activity, cdone)
|
||||
}
|
||||
go writer(&rwm, numIterations, &activity, cdone)
|
||||
go downgradingWriter(&rwm, numIterations, &activity, cdone)
|
||||
for ; i < numReaders; i++ {
|
||||
go reader(&rwm, numIterations, &activity, cdone)
|
||||
}
|
||||
// Wait for the 4 writers and all readers to finish.
|
||||
for i := 0; i < 4+numReaders; i++ {
|
||||
<-cdone
|
||||
}
|
||||
}
|
||||
|
||||
func TestDowngradableRWMutex(t *testing.T) {
|
||||
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
|
||||
n := 1000
|
||||
if testing.Short() {
|
||||
n = 5
|
||||
}
|
||||
HammerDowngradableRWMutex(1, 1, n)
|
||||
HammerDowngradableRWMutex(1, 3, n)
|
||||
HammerDowngradableRWMutex(1, 10, n)
|
||||
HammerDowngradableRWMutex(4, 1, n)
|
||||
HammerDowngradableRWMutex(4, 3, n)
|
||||
HammerDowngradableRWMutex(4, 10, n)
|
||||
HammerDowngradableRWMutex(10, 1, n)
|
||||
HammerDowngradableRWMutex(10, 3, n)
|
||||
HammerDowngradableRWMutex(10, 10, n)
|
||||
HammerDowngradableRWMutex(10, 5, n)
|
||||
}
|
||||
@@ -24,9 +24,10 @@ func (p *Provider) peerRemovalWorker() error {
|
||||
p.mu.Lock()
|
||||
defer func() {
|
||||
cancel()
|
||||
p.mu.DowngradeLock()
|
||||
p.mu.Unlock()
|
||||
p.metrics.locked.WithLabelValues("peer-removal").Observe(
|
||||
float64(p.d.Clock.Now().Sub(start).Nanoseconds()) / 1000 / 1000 / 1000)
|
||||
p.mu.RLock()
|
||||
}()
|
||||
pinfo := p.peers[pkey]
|
||||
if pinfo == nil {
|
||||
|
||||
@@ -8,13 +8,13 @@ package bmp
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"gopkg.in/tomb.v2"
|
||||
|
||||
"akvorado/common/helpers/sync"
|
||||
"akvorado/common/reporter"
|
||||
"akvorado/inlet/routing/provider"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user