flow: add option to set UDP receive buffer size

This commit is contained in:
Vincent Bernat
2022-03-27 14:52:28 +02:00
parent 120d9d3bd3
commit ba99a50b43
4 changed files with 28 additions and 10 deletions

View File

@@ -63,11 +63,12 @@ core:
}
want(t, got["flow"], map[string]interface{}{
"inputs": []map[string]interface{}{{
"type": "udp",
"decoder": "netflow",
"listen": "0.0.0.0:2055",
"queuesize": 100000,
"workers": 5,
"type": "udp",
"decoder": "netflow",
"listen": "0.0.0.0:2055",
"queuesize": 100000,
"receivebuffer": 0,
"workers": 5,
}},
})
want(t, got["snmp"]["workers"], 2)
@@ -136,11 +137,12 @@ core:
want(t, got["flow"], map[string]interface{}{
"inputs": []map[string]interface{}{
{
"type": "udp",
"decoder": "netflow",
"listen": "0.0.0.0:2056",
"queuesize": 100000,
"workers": 5,
"type": "udp",
"decoder": "netflow",
"listen": "0.0.0.0:2056",
"queuesize": 100000,
"receivebuffer": 0,
"workers": 5,
}, {
"type": "file",
"decoder": "netflow",

View File

@@ -184,6 +184,7 @@ func TestMarshalYAML(t *testing.T) {
- decoder: netflow
listen: 192.0.2.11:2055
queuesize: 1000
receivebuffer: 0
type: udp
workers: 3
`

View File

@@ -10,6 +10,12 @@ type Configuration struct {
// communicate incoming flows. 0 can be used to disable
// buffering.
QueueSize uint
// ReceiveBuffer is the value of the requested buffer size for
// each listening socket. When 0, the value is left to the
// default value set by the kernel (net.core.wmem_default).
// The value cannot exceed the kernel max value
// (net.core.wmem_max).
ReceiveBuffer uint
}
// DefaultConfiguration is the default configuration for this input

View File

@@ -107,6 +107,7 @@ func (in *Input) Start() (<-chan []*decoder.FlowMessage, error) {
return nil, fmt.Errorf("unable to resolve %v: %w", in.config.Listen, err)
}
}
// Enable SO_REUSEPORT to bind several routines to the same port.
var listenConfig = net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
var err error
@@ -127,6 +128,14 @@ func (in *Input) Start() (<-chan []*decoder.FlowMessage, error) {
}
udpConn := pconn.(*net.UDPConn)
in.address = udpConn.LocalAddr()
if in.config.ReceiveBuffer > 0 {
if err := udpConn.SetReadBuffer(int(in.config.ReceiveBuffer)); err != nil {
in.r.Warn().
Str("error", err.Error()).
Str("listen", in.config.Listen).
Msgf("unable to set requested buffer size (%d bytes)", in.config.ReceiveBuffer)
}
}
conns = append(conns, udpConn)
}