From 5ce0c949f2ed51cb621014c4c9b40150e647332a Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Mon, 22 Sep 2025 18:29:17 +0200 Subject: [PATCH] inlet/flow: make some of the socket options not fatal If the kernel is too old for timestamping, it should not be fatal. I prefer to not accept SO_TIMESTAMP_OLD as the size of the timestamp is arch-dependent. Fix #1978 --- console/data/docs/99-changelog.md | 4 ++ inlet/flow/input/udp/root.go | 3 +- inlet/flow/input/udp/socket.go | 37 +++++++++++++------ inlet/flow/input/udp/socket_linux.go | 31 ++++++++++++---- inlet/flow/input/udp/socket_others.go | 16 +++++++- inlet/flow/input/udp/socket_test.go | 53 ++++++++++++++++++++++++++- 6 files changed, 123 insertions(+), 21 deletions(-) diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index a4605169..de92a6f3 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -10,6 +10,10 @@ identified with a specific icon: - 🩹: bug fix - 🌱: miscellaneous change +## Unreleased + +- 🩹 *inlet*: disable kernel timestamping on Linux kernel older than 5.1 + ## 2.0.0 - 2025-09-22 This release introduces a new component: the outlet. Previously, ClickHouse was diff --git a/inlet/flow/input/udp/root.go b/inlet/flow/input/udp/root.go index 9ac1e270..ed329f96 100644 --- a/inlet/flow/input/udp/root.go +++ b/inlet/flow/input/udp/root.go @@ -120,7 +120,8 @@ func (in *Input) Start() error { return fmt.Errorf("unable to resolve %v: %w", in.config.Listen, err) } } - pconn, err := listenConfig.ListenPacket(in.t.Context(context.Background()), "udp", listenAddr.String()) + pconn, err := listenConfig(in.r, udpSocketOptions). + ListenPacket(in.t.Context(context.Background()), "udp", listenAddr.String()) if err != nil { return fmt.Errorf("unable to listen to %v: %w", listenAddr, err) } diff --git a/inlet/flow/input/udp/socket.go b/inlet/flow/input/udp/socket.go index 88adfe01..82a3b126 100644 --- a/inlet/flow/input/udp/socket.go +++ b/inlet/flow/input/udp/socket.go @@ -4,10 +4,13 @@ package udp import ( + "fmt" "net" "syscall" "time" + "akvorado/common/reporter" + "golang.org/x/sys/unix" ) @@ -16,19 +19,31 @@ type oobMessage struct { Received time.Time } -// listenConfig configures a listening socket to reuse port and return overflows -var listenConfig = net.ListenConfig{ - Control: func(_, _ string, c syscall.RawConn) error { - var err error - c.Control(func(fd uintptr) { - opts := udpSocketOptions +// socketOption describes a socket option to be applied. +type socketOption struct { + Name string + Level int + Option int + Mandatory bool +} + +// listenConfig configures a listening socket with the udpSocketOptions. +var listenConfig = func(r *reporter.Reporter, opts []socketOption) *net.ListenConfig { + return &net.ListenConfig{ + Control: func(_, _ string, c syscall.RawConn) error { + var err error for _, opt := range opts { - err = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, opt, 1) + c.Control(func(fd uintptr) { + err = unix.SetsockoptInt(int(fd), opt.Level, opt.Option, 1) + }) if err != nil { - return + if opt.Mandatory { + return fmt.Errorf("cannot set option %s: %w", opt.Name, err) + } + r.Warn().Err(err).Msgf("cannot set option %s", opt.Name) } } - }) - return err - }, + return nil + }, + } } diff --git a/inlet/flow/input/udp/socket_linux.go b/inlet/flow/input/udp/socket_linux.go index 33b694f0..63b98ba8 100644 --- a/inlet/flow/input/udp/socket_linux.go +++ b/inlet/flow/input/udp/socket_linux.go @@ -15,13 +15,30 @@ import ( var ( oobLength = syscall.CmsgLen(4) + syscall.CmsgLen(16) // uint32 + 2*int64 - udpSocketOptions = []int{ - // Allow multiple listeners to bind to the same IP/port - unix.SO_REUSEADDR, unix.SO_REUSEPORT, - // Get the number of dropped packets - unix.SO_RXQ_OVFL, - // Ask the kernel to timestamp incoming packets - unix.SO_TIMESTAMP_NEW | unix.SOF_TIMESTAMPING_RX_SOFTWARE, + udpSocketOptions = []socketOption{ + { + // Allow multiple listeners to bind to the same IP + Name: "SO_REUSEADDR", + Level: unix.SOL_SOCKET, + Option: unix.SO_REUSEADDR, + Mandatory: true, + }, { + // Allow multiple listeners to bind to the same port + Name: "SO_REUSEPORT", + Level: unix.SOL_SOCKET, + Option: unix.SO_REUSEPORT, + Mandatory: true, + }, { + // Get the number of dropped packets + Name: "SO_RXQ_OVFL", + Level: unix.SOL_SOCKET, + Option: unix.SO_RXQ_OVFL, + }, { + // Ask the kernel to timestamp incoming packets + Name: "SO_TIMESTAMP_NEW", + Level: unix.SOL_SOCKET, + Option: unix.SO_TIMESTAMP_NEW | unix.SOF_TIMESTAMPING_RX_SOFTWARE, + }, } ) diff --git a/inlet/flow/input/udp/socket_others.go b/inlet/flow/input/udp/socket_others.go index 03325350..5d316e03 100644 --- a/inlet/flow/input/udp/socket_others.go +++ b/inlet/flow/input/udp/socket_others.go @@ -9,7 +9,21 @@ import "golang.org/x/sys/unix" var ( oobLength = 0 - udpSocketOptions = []int{unix.SO_REUSEADDR, unix.SO_REUSEPORT} + udpSocketOptions = []socketOption{ + { + // Allow multiple listeners to bind to the same IP + Name: "SO_REUSEADDR", + Level: unix.SOL_SOCKET, + Option: unix.SO_REUSEADDR, + Mandatory: true, + }, { + // Allow multiple listeners to bind to the same port + Name: "SO_REUSEPORT", + Level: unix.SOL_SOCKET, + Option: unix.SO_REUSEPORT, + Mandatory: true, + }, + } ) // parseSocketControlMessage always returns 0. diff --git a/inlet/flow/input/udp/socket_test.go b/inlet/flow/input/udp/socket_test.go index 30cb8ac6..3edd965c 100644 --- a/inlet/flow/input/udp/socket_test.go +++ b/inlet/flow/input/udp/socket_test.go @@ -11,13 +11,19 @@ import ( "runtime" "testing" "time" + + "akvorado/common/reporter" + + "golang.org/x/sys/unix" ) func TestParseSocketControlMessage(t *testing.T) { if runtime.GOOS != "linux" { t.Skip("Skip Linux-only test") } - server, err := listenConfig.ListenPacket(context.Background(), "udp", "127.0.0.1:0") + r := reporter.NewMock(t) + server, err := listenConfig(r, udpSocketOptions). + ListenPacket(context.Background(), "udp", "127.0.0.1:0") if err != nil { t.Fatalf("ListenPacket() error:\n%+v", err) } @@ -78,3 +84,48 @@ outer: t.Fatal("too many drops detected") } } + +func TestListenConfig(t *testing.T) { + r := reporter.NewMock(t) + + t.Run("one mandatory option", func(t *testing.T) { + _, err := listenConfig(r, []socketOption{ + { + Name: "SO_REUSEADDR", + Level: unix.SOL_SOCKET, + Option: unix.SO_REUSEADDR, + Mandatory: true, + }, + }).ListenPacket(t.Context(), "udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ListenPacket() error:\n%+v", err) + } + }) + + t.Run("one mandatory not implemented option", func(t *testing.T) { + _, err := listenConfig(r, []socketOption{ + { + Name: "SO_UNKNOWN", + Level: unix.SOL_SOCKET, + Option: 9999, + Mandatory: true, + }, + }).ListenPacket(t.Context(), "udp", "127.0.0.1:0") + if err == nil { + t.Fatal("ListenPacket() did not error error") + } + }) + + t.Run("one optional not implemented option", func(t *testing.T) { + _, err := listenConfig(r, []socketOption{ + { + Name: "SO_UNKNOWN", + Level: unix.SOL_SOCKET, + Option: 9999, + }, + }).ListenPacket(t.Context(), "udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ListenPacket() error:\n%+v", err) + } + }) +}