common/httpserver: listen on an abstract Unix socket

And make healthcheck command use it by default. This makes the
healthcheck command works whatever port the user has configured for the
HTTP service.
This commit is contained in:
Vincent Bernat
2025-11-24 08:30:41 +01:00
parent 9d8547116f
commit bd37c1d553
16 changed files with 246 additions and 67 deletions

8
.github/e2e.sh vendored
View File

@@ -102,14 +102,6 @@ EOF
nix run nixpkgs#hurl -- --test --error-format=long .github/e2e.hurl
}
echo ::endgroup::
# Docker Compose
echo ::group::Docker healthcheck test
! docker compose ps --format json \
| jq -se 'map(select(.State != "running" or .Health == "unhealthy"))
| map({Service, State, Status, Health})
| if length > 0 then . else false end'
echo ::endgroup::
;;
coverage)

View File

@@ -177,7 +177,7 @@ jobs:
timeout-minutes: 3
run: |
./.github/e2e.sh compose-setup
COMPOSE_PROFILES=demo,prometheus,loki docker compose up --detach --quiet-pull
COMPOSE_PROFILES=demo,prometheus,loki docker compose up --wait --quiet-pull
- name: Run tests
timeout-minutes: 3
run: |

View File

@@ -33,8 +33,8 @@ containers started with the label "akvorado.conntrack.fix=1".`,
return fmt.Errorf("unable to initialize daemon component: %w", err)
}
httpConfiguration := httpserver.DefaultConfiguration()
httpConfiguration.Listen = "127.0.0.1:0" // Run inside host network namespace, can't use 8080
httpComponent, err := httpserver.New(r, httpConfiguration, httpserver.Dependencies{
httpConfiguration.Listen = ""
httpComponent, err := httpserver.New(r, "conntrack-fixer", httpConfiguration, httpserver.Dependencies{
Daemon: daemonComponent,
})
if err != nil {

View File

@@ -85,7 +85,7 @@ func consoleStart(r *reporter.Reporter, config ConsoleConfiguration, checkOnly b
if err != nil {
return fmt.Errorf("unable to initialize daemon component: %w", err)
}
httpComponent, err := httpserver.New(r, config.HTTP, httpserver.Dependencies{
httpComponent, err := httpserver.New(r, "console", config.HTTP, httpserver.Dependencies{
Daemon: daemonComponent,
})
if err != nil {

View File

@@ -82,7 +82,7 @@ func demoExporterStart(r *reporter.Reporter, config DemoExporterConfiguration, c
if err != nil {
return fmt.Errorf("unable to initialize daemon component: %w", err)
}
httpComponent, err := httpserver.New(r, config.HTTP, httpserver.Dependencies{
httpComponent, err := httpserver.New(r, "demo-exporter", config.HTTP, httpserver.Dependencies{
Daemon: daemonComponent,
})
if err != nil {

View File

@@ -4,15 +4,19 @@
package cmd
import (
"cmp"
"context"
"fmt"
"net"
"net/http"
"runtime"
"github.com/spf13/cobra"
)
type healthcheckOptions struct {
Host string
Port uint16
HTTP string
UnixService string
}
// HealthcheckOptions stores the command-line option values for the healthcheck
@@ -21,20 +25,39 @@ var HealthcheckOptions healthcheckOptions
func init() {
RootCmd.AddCommand(healthcheckCmd)
healthcheckCmd.Flags().Uint16VarP(&HealthcheckOptions.Port, "port", "p", 8080,
"HTTP port for health check")
healthcheckCmd.Flags().StringVarP(&HealthcheckOptions.Host, "host", "", "localhost",
"HTTP host for health check")
if runtime.GOOS == "linux" {
// On Linux, use Unix sockets
healthcheckCmd.Flags().StringVarP(&HealthcheckOptions.HTTP, "http", "", "",
"HTTP host:port for health check")
healthcheckCmd.Flags().StringVarP(&HealthcheckOptions.UnixService, "service", "", "",
"Service to query over Unix socket")
} else {
// On other OS, use HTTP
healthcheckCmd.Flags().StringVarP(&HealthcheckOptions.HTTP, "http", "", "localhost:8080",
"HTTP host:port for health check")
}
}
var healthcheckCmd = &cobra.Command{
Use: "healthcheck",
Short: "Check healthness",
Long: `Check if Akvorado is alive using the builtin HTTP endpoint.`,
Long: `Check if Akvorado is alive using the builtin HTTP endpoint.
The service can be checked over Unix socket (by default), or over HTTP`,
RunE: func(cmd *cobra.Command, _ []string) error {
resp, err := http.Get(fmt.Sprintf("http://%s:%d/api/v0/healthcheck",
HealthcheckOptions.Host,
HealthcheckOptions.Port))
httpc := http.Client{}
if HealthcheckOptions.HTTP == "" {
unixSocket := "@akvorado"
if HealthcheckOptions.UnixService != "" {
unixSocket = fmt.Sprintf("%s/%s", unixSocket, HealthcheckOptions.UnixService)
}
httpc.Transport = &http.Transport{
DialContext: func(context.Context, string, string) (net.Conn, error) {
return net.Dial("unix", unixSocket)
},
}
}
resp, err := httpc.Get(fmt.Sprintf("http://%s/api/v0/healthcheck",
cmp.Or(HealthcheckOptions.HTTP, "unix")))
if err != nil {
return err
}

76
cmd/healthcheck_test.go Normal file
View File

@@ -0,0 +1,76 @@
// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package cmd
import (
"bytes"
"fmt"
"runtime"
"strings"
"testing"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/httpserver"
"akvorado/common/reporter"
)
func TestHealthcheck(t *testing.T) {
// Setup a fake service
r := reporter.NewMock(t)
config := httpserver.DefaultConfiguration()
config.Listen = "127.0.0.1:0"
h, err := httpserver.New(r, "mock-healthcheck-test", config, httpserver.Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, h)
h.GinRouter.GET("/api/v0/healthcheck", r.HealthcheckHTTPHandler)
for _, tc := range []struct {
description string
args string
ok bool
}{
// We can't really know if it works with no args, because other tests may be running in parallel.
{
description: "HTTP test",
args: fmt.Sprintf("--http %s", h.LocalAddr().String()),
ok: true,
}, {
description: "failing HTTP test",
args: "--http 127.0.0.1:0",
ok: false,
}, {
description: "unix test",
args: "--service mock-healthcheck-test",
ok: true,
}, {
description: "failing unix test",
args: "--service not-mock-healthcheck-test",
ok: false,
},
} {
t.Run(tc.description, func(t *testing.T) {
if strings.HasPrefix(tc.args, "--service") && runtime.GOOS != "linux" {
t.Skip("unsupported OS")
}
args := []string{"healthcheck"}
args = append(args, strings.Split(tc.args, " ")...)
root := RootCmd
buf := new(bytes.Buffer)
root.SetOut(buf)
root.SetArgs(args)
HealthcheckOptions.HTTP = ""
HealthcheckOptions.UnixService = ""
t.Logf("args: %s", args)
err := root.Execute()
if err != nil && tc.ok {
t.Errorf("`healthcheck` error:\n%+v", err)
} else if err == nil && !tc.ok {
t.Error("`healthcheck` did not error")
}
})
}
}

View File

@@ -77,7 +77,7 @@ func inletStart(r *reporter.Reporter, config InletConfiguration, checkOnly bool)
if err != nil {
return fmt.Errorf("unable to initialize daemon component: %w", err)
}
httpComponent, err := httpserver.New(r, config.HTTP, httpserver.Dependencies{
httpComponent, err := httpserver.New(r, "inlet", config.HTTP, httpserver.Dependencies{
Daemon: daemonComponent,
})
if err != nil {

View File

@@ -163,7 +163,7 @@ func init() {
}
func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, daemonComponent daemon.Component, checkOnly bool) error {
httpComponent, err := httpserver.New(r, config.HTTP, httpserver.Dependencies{
httpComponent, err := httpserver.New(r, "orchestrator", config.HTTP, httpserver.Dependencies{
Daemon: daemonComponent,
})
if err != nil {

View File

@@ -103,7 +103,7 @@ func outletStart(r *reporter.Reporter, config OutletConfiguration, checkOnly boo
if err != nil {
return fmt.Errorf("unable to initialize daemon component: %w", err)
}
httpComponent, err := httpserver.New(r, config.HTTP, httpserver.Dependencies{
httpComponent, err := httpserver.New(r, "outlet", config.HTTP, httpserver.Dependencies{
Daemon: daemonComponent,
})
if err != nil {

View File

@@ -139,7 +139,7 @@ func TestRedis(t *testing.T) {
Server: server,
DB: 10,
}
h, err := httpserver.New(r, config, httpserver.Dependencies{Daemon: daemon.NewMock(t)})
h, err := httpserver.New(r, "cache-test", config, httpserver.Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -35,6 +35,7 @@ type Component struct {
mux *http.ServeMux
metrics metrics
address net.Addr
serviceName string
// GinRouter is the router exposed for /api
GinRouter *gin.Engine
@@ -47,13 +48,14 @@ type Dependencies struct {
}
// New creates a new HTTP component.
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
func New(r *reporter.Reporter, serviceName string, configuration Configuration, dependencies Dependencies) (*Component, error) {
c := Component{
r: r,
d: &dependencies,
config: configuration,
mux: http.NewServeMux(),
serviceName: serviceName,
GinRouter: gin.New(),
}
c.initMetrics()
@@ -104,11 +106,6 @@ func (c *Component) AddHandler(location string, handler http.Handler) {
// Start starts the HTTP component.
func (c *Component) Start() error {
if c.config.Listen == "" {
return nil
}
c.r.Info().Msg("starting HTTP component")
var err error
c.cacheStore, err = c.config.Cache.Config.New()
if err != nil {
@@ -116,24 +113,59 @@ func (c *Component) Start() error {
}
server := &http.Server{Handler: c.mux}
for _, lc := range []struct {
network string
address string
fatal bool
}{
// The regular one.
{
network: "tcp",
address: c.config.Listen,
fatal: true,
},
// Using abstract unix sockets for healthchecking
{
network: "unix",
address: "@akvorado",
fatal: false,
},
{
network: "unix",
address: fmt.Sprintf("@akvorado/%s", c.serviceName),
fatal: false,
},
} {
if lc.address == "" {
continue
}
if lc.network == "unix" && runtime.GOOS != "linux" {
continue
}
// Most of the time, if we have an error, it's here!
c.r.Info().Str("listen", c.config.Listen).Msg("starting HTTP server")
listener, err := net.Listen("tcp", c.config.Listen)
c.r.Info().Str("listen", lc.address).Msg("starting HTTP server")
listener, err := net.Listen(lc.network, lc.address)
if err != nil {
if lc.fatal {
return fmt.Errorf("unable to listen to %v: %w", c.config.Listen, err)
}
c.r.Info().Err(err).Msg("cannot start HTTP server")
continue
}
if lc.network == "tcp" {
c.address = listener.Addr()
}
server.Addr = listener.Addr().String()
// Start serving requests
// Serve requests
c.t.Go(func() error {
if err := server.Serve(listener); err != http.ErrServerClosed {
c.r.Err(err).Str("listen", c.config.Listen).Msg("unable to start HTTP server")
c.r.Err(err).Str("listen", lc.address).Msg("unable to start HTTP server")
return fmt.Errorf("unable to start HTTP server: %w", err)
}
return nil
})
// Gracefully stop when asked to
c.t.Go(func() error {
<-c.t.Dying()
@@ -145,14 +177,19 @@ func (c *Component) Start() error {
}
return nil
})
}
// In case we have no server
c.t.Go(func() error {
<-c.t.Dying()
return nil
})
return nil
}
// Stop stops the HTTP component
func (c *Component) Stop() error {
if c.config.Listen == "" {
return nil
}
c.r.Info().Msg("stopping HTTP component")
defer c.r.Info().Msg("HTTP component stopped")
c.t.Kill(nil)

View File

@@ -4,10 +4,15 @@
package httpserver_test
import (
"context"
"fmt"
"io"
"net"
"net/http"
"runtime"
"testing"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/httpserver"
"akvorado/common/reporter"
@@ -91,3 +96,49 @@ func TestGinRouterPanic(t *testing.T) {
},
})
}
func TestUnixSocket(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("unsupported OS")
}
r := reporter.NewMock(t)
config := httpserver.DefaultConfiguration()
config.Listen = ""
h, err := httpserver.New(r, "mock-unix-test", config, httpserver.Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, h)
h.AddHandler("/test",
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintf(w, "Hello !")
}))
// We should listen to both @akvorado and @akvorado/mock-unix-test. However,
// we may have some parallel tests and @akvorado may not be the handler we
// configured. Let's just test the second one.
httpc := http.Client{
Transport: &http.Transport{
DialContext: func(context.Context, string, string) (net.Conn, error) {
return net.Dial("unix", "@akvorado/mock-unix-test")
},
},
}
response, err := httpc.Get("http://unix/test")
if err != nil {
t.Fatalf("Get() error:\n%+v", err)
}
defer response.Body.Close()
if response.StatusCode != 200 {
t.Errorf("Get() status = %d instead of %d", response.StatusCode, 200)
}
body, err := io.ReadAll(response.Body)
if err != nil {
t.Fatalf("ReadAll() error:\n%+v", err)
}
expected := "Hello !"
if diff := helpers.Diff(string(body), expected); diff != "" {
t.Fatalf("Get() body (-got, +want):\n%s", diff)
}
}

View File

@@ -18,7 +18,7 @@ func NewMock(t testing.TB, r *reporter.Reporter) *Component {
t.Helper()
config := DefaultConfiguration()
config.Listen = "0.0.0.0:0"
c, err := New(r, config, Dependencies{Daemon: daemon.NewMock(t)})
c, err := New(r, "mock", config, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -14,7 +14,7 @@ identified with a specific icon:
- 🩹 *docker*: restart geoip container on boot
- 🌱 *orchestrator*: add `kafka``manage-topic` flag to enable or disable topic management
- 🌱 *cmd*: add `--host` and `--port` flags to `akvorado healthcheck`
- 🌱 *cmd*: make `akvorado healthcheck` use an abstract Unix socket to check service liveness
## 2.0.3 - 2025-11-19

View File

@@ -213,7 +213,7 @@ services:
restart: unless-stopped
network_mode: host
healthcheck:
disable: true
test: ["CMD", "/usr/local/bin/akvorado", "healthcheck", "--service", "conntrack-fixer"]
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro