flow: add a metric when the internal queue is full

This gives a good hint when there is a problem: if the counter stays at
zero, there is probably no need to optimize further.
Vincent Bernat
2022-03-22 23:21:08 +01:00
parent cf1fe53511
commit bb1ce0fde5
7 changed files with 116 additions and 17 deletions
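For orientation, here is a minimal, self-contained sketch (not part of the commit, names simplified) of the pattern introduced below: try a non-blocking send into the queue first; if it is full, increment a counter and only then fall back to a blocking send, so no flow is lost.

package main

import (
    "fmt"
    "time"
)

// send tries a non-blocking send first; if the queue is full it bumps a
// counter, then falls back to a blocking send so the value is never lost.
func send(queue chan int, queueFull *int, v int) {
    select {
    case queue <- v: // fast path: room in the queue, no bookkeeping
    default:
        *queueFull++ // queue full: record it...
        queue <- v   // ...then wait for the consumer to make room
    }
}

func main() {
    queue := make(chan int, 1) // a one-slot queue, as in the new test below
    queueFull := 0
    done := make(chan struct{})
    go func() {
        time.Sleep(10 * time.Millisecond) // slow consumer: let the queue fill up
        for range queue {
        }
        close(done)
    }()
    send(queue, &queueFull, 1) // fits into the single slot
    send(queue, &queueFull, 2) // queue full: the counter goes to 1, then the send waits
    close(queue)
    <-done
    fmt.Println("queue full events:", queueFull)
}

The real sendFlow below does the same thing with c.outgoingFlows and the new outgoingQueueFullTotal counter, and both of its selects additionally watch c.t.Dying() so a shutdown is never stuck behind a full queue.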

View File

@@ -75,7 +75,7 @@ configuration keys:
- `listen` to specify the IP and UDP port to listen for new flows
- `workers` to specify the number of workers to spawn to handle
incoming flows
- `buffer-size` to specify the number of flows to buffer when pushing
- `queue-size` to specify the number of flows to queue when pushing
them to the core component
For example:
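(The example block itself lies outside the context of this hunk; the three keys map directly onto the Listen, Workers and QueueSize fields of the Configuration struct in the next file.)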

View File

@@ -6,15 +6,15 @@ type Configuration struct {
Listen string
// Workers define the number of workers to use for decoding.
Workers int
// BufferSize defines the size of the channel used to
// QueueSize defines the size of the channel used to
// communicate incoming flows. 0 can be used to disable
// buffering.
BufferSize uint
QueueSize uint
}
// DefaultConfiguration represents the default configuration for the flow component
var DefaultConfiguration = Configuration{
Listen: "localhost:2055",
Workers: 1,
BufferSize: 1000,
QueueSize: 1000,
}
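For illustration (not part of the commit), a caller that wants a smaller queue starts from the defaults and overrides the renamed field, exactly as the new TestOutgoingChanFull does further down; New() then uses this value as the capacity of the outgoing channel, as shown later in this commit:

configuration := DefaultConfiguration
configuration.QueueSize = 1 // a one-slot queue: the second in-flight flow already hits the "queue full" path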

View File

@@ -109,11 +109,7 @@ func (c *Component) decodeFlow(payload []byte, source *net.UDPAddr) {
Inc()
for _, fmsg := range flowMessageSet {
select {
case <-c.t.Dying():
return
case c.incomingFlows <- convert(fmsg):
}
c.sendFlow(convert(fmsg))
}
}

View File

@@ -21,6 +21,8 @@ type metrics struct {
netflowSetStatsSum *reporter.CounterVec
netflowTimeStatsSum *reporter.SummaryVec
netflowTemplatesStats *reporter.CounterVec
outgoingQueueFullTotal reporter.Counter
}
func (c *Component) initMetrics() {
@@ -128,4 +130,10 @@ func (c *Component) initMetrics() {
},
[]string{"sampler", "version", "obs_domain_id", "template_id", "type"},
)
c.metrics.outgoingQueueFullTotal = c.r.Counter(
reporter.CounterOpts{
Name: "outgoing_queue_full_total",
Help: "Number of times the outgoing queue was full when sending a flow.",
},
)
}
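Once registered, the counter surfaces under the component prefix. A short sketch (mirroring the new test below) of reading it back through the mock reporter r created by reporter.NewMock(t):

gotMetrics := r.GetMetrics(
    "akvorado_flow_",            // metric prefix used by the flow component
    "outgoing_queue_full_total", // the counter registered above
)
// gotMetrics["outgoing_queue_full_total"] then holds the value as a string, e.g. "1".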

View File

@@ -36,8 +36,8 @@ type Component struct {
// Metrics
metrics metrics
// Channel for receiving flows.
incomingFlows chan *FlowMessage
// Channel for sending flows.
outgoingFlows chan *FlowMessage
// Local address used by the Netflow server. Only valid after Start().
Address net.Addr
@@ -55,7 +55,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
r: r,
d: &dependencies,
config: configuration,
incomingFlows: make(chan *FlowMessage, configuration.BufferSize),
outgoingFlows: make(chan *FlowMessage, configuration.QueueSize),
}
c.d.Daemon.Track(&c.t, "flow")
c.initHTTP()
@@ -65,7 +65,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
// Flows returns a channel to receive flows.
func (c *Component) Flows() <-chan *FlowMessage {
return c.incomingFlows
return c.outgoingFlows
}
// Start starts the flow component.
@@ -153,9 +153,26 @@ func (c *Component) spawnWorker(workerID int) error {
return nil
}
// sendFlow transmits received flows to the next component
func (c *Component) sendFlow(fmsg *FlowMessage) {
select {
case <-c.t.Dying():
return
case c.outgoingFlows <- fmsg:
default:
// Queue full
c.metrics.outgoingQueueFullTotal.Inc()
select {
case <-c.t.Dying():
return
case c.outgoingFlows <- fmsg:
}
}
}
// Stop stops the flow component
func (c *Component) Stop() error {
defer close(c.incomingFlows)
defer close(c.outgoingFlows)
c.r.Info().Msg("stopping flow component")
defer c.r.Info().Msg("flow component stopped")
c.t.Kill(nil)
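A note on the design of sendFlow: the first select is the non-blocking fast path, so the counter is only touched when the channel is already full; the flow is then still delivered by the second, blocking select, which also honours c.t.Dying() so a shutdown is not stuck behind a full queue. The metric therefore counts flows that had to wait for the core component, not flows that were dropped. With QueueSize set to 0 (an unbuffered channel), any send without a ready receiver takes the slow path and increments the counter.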

View File

@@ -254,3 +254,81 @@ out4:
t.Fatalf("Metrics after data (-got, +want):\n%s", diff)
}
}
func TestOutgoingChanFull(t *testing.T) {
r := reporter.NewMock(t)
configuration := DefaultConfiguration
configuration.QueueSize = 1
c := NewMock(t, r, configuration)
defer func() {
if err := c.Stop(); err != nil {
t.Fatalf("Stop() error:\n%+v", err)
}
}()
conn, err := net.Dial("udp", c.Address.String())
if err != nil {
t.Fatalf("Dial() failure:\n%+v", err)
}
// Send template
template, err := ioutil.ReadFile(filepath.Join("testdata", "template-260.data"))
if err != nil {
panic(err)
}
if _, err := conn.Write(template); err != nil {
t.Fatalf("Write() failure:\n%+v", err)
}
// Send data
data, err := ioutil.ReadFile(filepath.Join("testdata", "data-260.data"))
if err != nil {
panic(err)
}
if _, err := conn.Write(data); err != nil {
t.Fatalf("Write() failure:\n%+v", err)
}
checkQueueFullMetric := func(expected string) {
gotMetrics := r.GetMetrics(
"akvorado_flow_",
"outgoing_queue_full_total",
)
expectedMetrics := map[string]string{
`outgoing_queue_full_total`: expected,
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics after data (-got, +want):\n%s", diff)
}
}
// We should receive 4 flows. The queue size is 1, so the first flow fills the queue and the second one blocks.
time.Sleep(10 * time.Millisecond)
checkQueueFullMetric("1")
// Accept the first flow and the third flow gets blocked too.
select {
case <-c.Flows():
case <-time.After(10 * time.Millisecond):
t.Fatal("First flow missing")
}
time.Sleep(10 * time.Millisecond)
checkQueueFullMetric("2")
// Accept the second flow and the fourth one gets blocked
select {
case <-c.Flows():
case <-time.After(10 * time.Millisecond):
t.Fatal("Second flow missing")
}
time.Sleep(10 * time.Millisecond)
checkQueueFullMetric("3")
// Accept the third flow; no further flow gets blocked
select {
case <-c.Flows():
case <-time.After(10 * time.Millisecond):
t.Fatal("Third flow missing")
}
time.Sleep(10 * time.Millisecond)
checkQueueFullMetric("3")
}
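To sum up the scenario: four flows are decoded into a one-slot queue, so flows 2, 3 and 4 each find the queue full (counter 1, 2, 3), and once the third flow has been read nothing else is waiting, so the counter stays at 3.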

View File

@@ -30,5 +30,5 @@ func NewMock(t *testing.T, r *reporter.Reporter, config Configuration) *Componen
// Inject injects the provided flow message, as if it had been received.
func (c *Component) Inject(t *testing.T, fmsg *FlowMessage) {
c.incomingFlows <- fmsg
c.outgoingFlows <- fmsg
}