docker: replace Alloy with Vector for parsing logs

Alloy does not allow turning parsed fields into structured metadata
without enumerating each of them. Vector is also far more versatile,
and its configuration can be covered by unit tests!

Also, parse more logs. Everything should be handled now, except ClickHouse.

Fix #1907
Vincent Bernat
2025-08-30 00:02:59 +02:00
parent d9b2ee888b
commit 95e2011d0c
12 changed files with 713 additions and 239 deletions
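
The structured-metadata point in the commit message is easiest to see in the new Loki sink at the end of docker/vector.yaml: whatever a pipeline stores in ._labels and ._metadata is forwarded through wildcard templates, while the removed Alloy pipelines had to enumerate each field in a stage.structured_metadata block. Excerpt from the new configuration:

loki:
type: loki
inputs:
- combine
endpoint: http://loki:3100/loki
encoding:
codec: "text"
labels:
"*": "{{ ._labels }}"
structured_metadata:
"*": "{{ ._metadata }}"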


@@ -56,6 +56,8 @@ jobs:
run: make test-race
- name: JS tests
run: make test-js
+ - name: Vector tests
+ run: docker compose -f docker/docker-compose-dev.yml run --quiet --rm vector test
- name: Run coverage tests
run: make test-coverage
- name: Upload coverage results


@@ -1,8 +0,0 @@
discovery.docker "docker" {
host = "unix:///var/run/docker.sock"
refresh_interval = "30s"
filter {
name = "label"
values = ["com.docker.compose.project=akvorado"]
}
}


@@ -1,199 +1,3 @@
loki.write "default" {
endpoint {
url = "http://loki:3100/loki/loki/api/v1/push"
}
}
discovery.relabel "loki" {
targets = discovery.docker.docker.targets
rule {
source_labels = ["__meta_docker_container_label_com_docker_compose_service"]
regex = `(.+)(?:-\d+)?`
target_label = "service_name"
}
rule {
source_labels = ["__address__"]
regex = `(.+):\d+`
target_label = "instance"
}
}
discovery.relabel "akvorado" {
targets = discovery.relabel.loki.output
rule {
source_labels = ["service_name"]
regex = "akvorado-.*"
action = "keep"
}
}
loki.source.docker "akvorado" {
host = "unix:///var/run/docker.sock"
targets = discovery.relabel.akvorado.output
forward_to = [loki.process.akvorado.receiver]
}
loki.process "akvorado" {
forward_to = [loki.write.default.receiver]
stage.json {
expressions = {
level = "level",
time = "time",
caller = "caller",
error = "error",
module = "module",
message = "message",
}
}
stage.labels {
values = {
level = "",
module = "",
}
}
stage.structured_metadata {
values = {
caller = "",
error = "",
}
}
stage.timestamp {
source = "time"
format = "RFC3339"
}
stage.output {
source = "message"
}
}
discovery.relabel "kafka" {
targets = discovery.relabel.loki.output
rule {
source_labels = ["service_name"]
regex = "kafka"
action = "keep"
}
}
loki.source.docker "kafka" {
host = "unix:///var/run/docker.sock"
targets = discovery.relabel.kafka.output
forward_to = [loki.process.kafka.receiver]
}
loki.process "kafka" {
forward_to = [loki.write.default.receiver]
stage.multiline {
firstline = `^\[\d{4}-\d{2}-\d{2} `
max_wait_time = "3s"
max_lines = 1000
}
stage.regex {
expression = `^\[(?P<timestamp>[^\]]+)\]\s+(?P<level>\w+)\s+(?P<message>(?s:.*))$`
}
stage.timestamp {
source = "timestamp"
format = "2006-01-02 15:04:05,000"
}
stage.labels {
values = {
level = "",
}
}
stage.output {
source = "message"
}
}
discovery.relabel "redis" {
targets = discovery.relabel.loki.output
rule {
source_labels = ["service_name"]
regex = "redis"
action = "keep"
}
}
loki.source.docker "redis" {
host = "unix:///var/run/docker.sock"
targets = discovery.relabel.redis.output
forward_to = [loki.process.redis.receiver]
}
loki.process "redis" {
forward_to = [loki.write.default.receiver]
// 1:C 28 Aug 2025 04:08:22.843 # Warning: no config file specified
stage.regex {
expression = `^(?P<pid>\d+):(?P<role>[XCSM])\s+(?P<timestamp>\d+\s+\w+\s+\d{4}\s+\d{2}:\d{2}:\d{2}\.\d{3})\s+(?P<level>[*#.-])\s+(?P<message>.*)$`
}
stage.template {
source = "role_name"
template = `{{ if eq .role "X" }}sentinel{{ else if eq .role "C" }}RDB{{ else if eq .role "S" }}slave{{ else if eq .role "M" }}master{{ end }}`
}
stage.template {
source = "level_name"
template = `{{ if eq .level "." }}debug{{ else if eq .level "-" }}info{{ else if eq .level "*" }}notice{{ else if eq .level "#" }}warning{{ end }}`
}
stage.labels {
values = {
level = "level_name",
role = "role_name",
}
}
stage.timestamp {
source = "timestamp"
format = "2 Jan 2006 15:04:05.000"
}
stage.output {
source = "message"
}
}
discovery.relabel "alloy" {
targets = discovery.relabel.loki.output
rule {
source_labels = ["service_name"]
regex = "alloy"
action = "keep"
}
}
loki.source.docker "alloy" {
host = "unix:///var/run/docker.sock"
targets = discovery.relabel.alloy.output
forward_to = [loki.process.alloy.receiver]
}
loki.process "alloy" {
forward_to = [loki.write.default.receiver]
// ts=2025-08-28T09:30:45.497277819Z level=info msg="Scraped metadata watcher stopped" component_path=/ component_id=prometheus.remote_write.default subcomponent=rw remote_name=0ffafb url=http://prometheus:9090/prometheus/api/v1/write
stage.logfmt {
mapping = {
ts = "",
level = "",
msg = "",
err = "",
node = "",
component_path = "",
component_id = "",
}
}
stage.labels {
values = {
level = "",
}
}
stage.structured_metadata {
values = {
node = "",
component_path = "",
component_id = "",
}
}
stage.template {
source = "message"
template = `{{ .msg }}{{ if .err }}: {{ .err }}{{ end }}`
}
stage.timestamp {
source = "ts"
format = "RFC3339"
}
stage.output {
source = "message"
}
}
discovery.relabel "lokilogs" { discovery.relabel "lokilogs" {
targets = discovery.relabel.loki.output targets = discovery.relabel.loki.output


@@ -1,3 +1,12 @@
discovery.docker "docker" {
host = "unix:///var/run/docker.sock"
refresh_interval = "30s"
filter {
name = "label"
values = ["com.docker.compose.project=akvorado"]
}
}
prometheus.remote_write "default" { prometheus.remote_write "default" {
endpoint { endpoint {
url = "http://prometheus:9090/prometheus/api/v1/write" url = "http://prometheus:9090/prometheus/api/v1/write"


@@ -1,26 +0,0 @@
---
services:
alloy:
extends:
file: versions.yml
service: alloy
restart: unless-stopped
user: root # for access to /var/run/docker.sock
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- ./alloy/config.alloy:/etc/alloy/config.alloy
extra_hosts:
- "host.docker.internal:host-gateway"
expose:
- 12345
command:
- run
- /etc/alloy
- --storage.path=/var/lib/alloy/data
- --server.http.listen-addr=0.0.0.0:12345
- --server.http.ui-path-prefix=/alloy
labels:
- traefik.enable=true
- traefik.http.routers.alloy.rule=PathPrefix(`/alloy`)
- traefik.http.routers.alloy.entrypoints=private
- metrics.port=12345


@@ -182,3 +182,16 @@ services:
ports:
- 127.0.0.1:57400:57400/tcp
- 127.0.0.1:57401:22/tcp
+ vector:
+ extends:
+ file: versions.yml
+ service: vector
+ restart: "no"
+ profiles: [manual]
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock:ro
+ - ./vector.yaml:/etc/vector/vector.yaml:ro
+ - ./vector.tests.yaml:/etc/vector/vector.tests.yaml:ro
+ environment:
+ VECTOR_CONFIG_DIR: /etc/vector


@@ -7,12 +7,10 @@ services:
file: versions.yml
service: grafana
environment:
- GF_INSTALL_PLUGINS: marcusolsson-json-datasource 1.3.8
GF_SERVER_ROOT_URL: /grafana
GF_SERVER_SERVE_FROM_SUB_PATH: "true"
depends_on:
- prometheus
- - akvorado-console
restart: unless-stopped
volumes:
- akvorado-grafana:/var/lib/grafana


@@ -10,7 +10,7 @@ services:
restart: unless-stopped
volumes:
- akvorado-loki:/loki
- - ./loki.yaml:/etc/loki/local-config.yaml
+ - ./loki.yaml:/etc/loki/local-config.yaml:ro
expose:
- 3100/tcp
labels:
@@ -19,11 +19,24 @@ services:
- traefik.http.routers.loki.entrypoints=private
- metrics.port=3100
- alloy:
+ vector:
extends:
- file: docker-compose-alloy.yml
+ file: versions.yml
- service: alloy
+ service: vector
+ restart: unless-stopped
+ user: root # for access to /var/run/docker.sock
volumes:
- - ./alloy/loki.alloy:/etc/alloy/loki.alloy
+ - /var/run/docker.sock:/var/run/docker.sock:ro
+ - ./vector.yaml:/etc/vector/vector.yaml:ro
depends_on:
- loki
+ healthcheck:
+ interval: 20s
+ test: ["CMD",
+ "wget", "-T", "1", "--spider", "--url=http://127.0.0.1:8686/health"]
+ expose:
+ - 9598 # metrics
+ environment:
+ VECTOR_CONFIG_DIR: /etc/vector
+ labels:
+ - metrics.port=9598


@@ -35,15 +35,33 @@ services:
# Fetch metrics
alloy:
extends:
- file: docker-compose-alloy.yml
+ file: versions.yml
service: alloy
+ restart: unless-stopped
+ user: root # for access to /var/run/docker.sock
volumes:
- - ./alloy/prometheus.alloy:/etc/alloy/prometheus.alloy
+ - /var/run/docker.sock:/var/run/docker.sock:ro
+ - ./config.alloy:/etc/alloy/config.alloy
+ extra_hosts:
+ - "host.docker.internal:host-gateway"
+ expose:
+ - 12345
+ command:
+ - run
+ - /etc/alloy
+ - --storage.path=/var/lib/alloy/data
+ - --server.http.listen-addr=0.0.0.0:12345
+ - --server.http.ui-path-prefix=/alloy
depends_on:
prometheus:
condition: service_healthy
kafka:
condition: service_healthy
+ labels:
+ - traefik.enable=true
+ - traefik.http.routers.alloy.rule=PathPrefix(`/alloy`)
+ - traefik.http.routers.alloy.entrypoints=private
+ - metrics.port=12345
# Node exporter for host metrics
node-exporter:

docker/vector.tests.yaml (new file, 393 lines)

@@ -0,0 +1,393 @@
---
# docker compose -f docker/docker-compose-dev.yml run --quiet --rm vector test
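# Each test injects synthetic docker_logs events at the `base` transform and
# asserts, in VRL, on what comes out of the final `combine` transform.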
tests:
- name: "unknown application"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-something-unknown-1
label."com.docker.compose.service": something-unknown
message: >-
Hello world!
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "Hello world!")
assert!(is_timestamp(.timestamp))
assert_eq!(._labels,
{"service_name": "something-unknown",
"instance": "akvorado-something-unknown-1"})
assert_eq!(._metadata, null)
- name: "akvorado logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-akvorado-conntrack-fixer-1
label."com.docker.compose.service": akvorado-conntrack-fixer-1
message: >-
{"level":"info",
"version":"v2.0.0-beta.4-66-g0ad0128fc6cd-dirty",
"time":"2025-08-29T15:01:02Z",
"caller":"akvorado/cmd/components.go:38",
"module":"akvorado/cmd",
"message":"akvorado has started"}
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "akvorado has started")
assert_eq!(.timestamp, t'2025-08-29T15:01:02Z')
assert_eq!(._labels,
{"service_name": "akvorado-conntrack-fixer",
"instance": "akvorado-akvorado-conntrack-fixer-1",
"level": "info",
"module": "akvorado/cmd"})
assert_eq!(._metadata,
{"caller": "akvorado/cmd/components.go:38",
"version": "v2.0.0-beta.4-66-g0ad0128fc6cd-dirty"})
- name: "kafka logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-kafka-1
label."com.docker.compose.service": kafka
message: |-
[2025-08-29 15:15:48,641] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "[BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)")
assert_eq!(.timestamp, t'2025-08-29T15:15:48.641Z')
assert_eq!(._labels,
{"service_name": "kafka",
"instance": "akvorado-kafka-1",
"level": "info"})
assert_eq!(._metadata, null)
- name: "kafka logs multiline"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-kafka-1
label."com.docker.compose.service": kafka
message: |-
[2025-08-29 15:15:48,605] INFO KafkaConfig values:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-kafka-1
label."com.docker.compose.service": kafka
message: |-
add.partitions.to.txn.retry.backoff.max.ms = 100
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-kafka-1
label."com.docker.compose.service": kafka
message: |-
add.partitions.to.txn.retry.backoff.ms = 20
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "KafkaConfig values:\n\
add.partitions.to.txn.retry.backoff.max.ms = 100\n\
add.partitions.to.txn.retry.backoff.ms = 20")
assert_eq!(.timestamp, t'2025-08-29T15:15:48.605Z')
assert_eq!(._labels,
{"service_name": "kafka",
"instance": "akvorado-kafka-1",
"level": "info"})
assert_eq!(._metadata, null)
- name: "redis logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-redis-1
label."com.docker.compose.service": redis
message: |-
1:C 28 Aug 2025 04:08:22.843 # Warning: no config file specified
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "Warning: no config file specified")
assert_eq!(.timestamp, t'2025-08-28T04:08:22.843Z')
assert_eq!(._labels,
{"service_name": "redis",
"instance": "akvorado-redis-1",
"level": "warning",
"role": "RDB"})
assert_eq!(._metadata, {"pid": 1})
- name: "alloy logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-alloy-1
label."com.docker.compose.service": alloy
message: >-
ts=2025-08-28T09:30:45.497277819Z
level=info
msg="Scraped metadata watcher stopped"
component_path=/
component_id=prometheus.remote_write.default
subcomponent=rw
remote_name=0ffafb
url=http://prometheus:9090/prometheus/api/v1/write
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "Scraped metadata watcher stopped")
assert_eq!(.timestamp, t'2025-08-28T09:30:45.497277819Z')
assert_eq!(._labels,
{"service_name": "alloy",
"instance": "akvorado-alloy-1",
"level": "info"})
assert_eq!(._metadata,
{"component_path": "/",
"component_id": "prometheus.remote_write.default",
"subcomponent": "rw",
"remote_name": "0ffafb",
"url": "http://prometheus:9090/prometheus/api/v1/write"})
- name: "loki logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-loki-1
label."com.docker.compose.service": loki
message: >-
ts=2025-08-29T05:07:45.543770684Z
caller=spanlogger.go:116
middleware=QueryShard.astMapperware
org_id=fake
traceID=0dd74c5aaeb81d32
user=fake
level=warn
msg="failed mapping AST"
err="context canceled"
query="{service_name=\"alloy\"}"
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "failed mapping AST: context canceled")
assert_eq!(.timestamp, t'2025-08-29T05:07:45.543770684Z')
assert_eq!(._labels,
{"service_name": "loki",
"instance": "akvorado-loki-1",
"level": "warning"})
assert_eq!(._metadata,
{"caller": "spanlogger.go:116",
"middleware": "QueryShard.astMapperware",
"org_id": "fake",
"traceID": "0dd74c5aaeb81d32",
"user": "fake",
"query": "{service_name=\"alloy\"}"})
- name: "grafana logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-grafana-1
label."com.docker.compose.service": grafana
message: >-
logger=provisioning.alerting
t=2025-08-29T21:05:35.215005098Z
level=error
msg="can't read alerting provisioning files from directory"
path=/etc/grafana/provisioning/alerting
error="open /etc/grafana/provisioning/alerting: no such file or directory"
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "can't read alerting provisioning files from directory: \
open /etc/grafana/provisioning/alerting: no such file or directory")
assert_eq!(.timestamp, t'2025-08-29T21:05:35.215005098Z')
assert_eq!(._labels,
{"service_name": "grafana",
"instance": "akvorado-grafana-1",
"level": "error"})
assert_eq!(._metadata,
{"logger": "provisioning.alerting",
"path": "/etc/grafana/provisioning/alerting"})
- name: "prometheus logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-prometheus-1
label."com.docker.compose.service": prometheus
message: >-
time=2025-08-29T21:34:41.191Z
level=INFO
source=manager.go:540
msg="Stopping notification manager..."
component=notifier
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "Stopping notification manager...")
assert_eq!(.timestamp, t'2025-08-29T21:34:41.191Z')
assert_eq!(._labels,
{"service_name": "prometheus",
"instance": "akvorado-prometheus-1",
"level": "info"})
assert_eq!(._metadata,
{"source": "manager.go:540",
"component": "notifier"})
- name: "node-exporter logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-node-exporter-1
label."com.docker.compose.service": node-exporter
message: >-
time=2025-08-29T21:37:28.398Z
level=ERROR
source=diskstats_linux.go:264
msg="Failed to open directory, disabling udev device properties"
collector=diskstats
path=/run/udev/data
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "Failed to open directory, disabling udev device properties")
assert_eq!(.timestamp, t'2025-08-29T21:37:28.398Z')
assert_eq!(._labels,
{"service_name": "node-exporter",
"instance": "akvorado-node-exporter-1",
"level": "error"})
assert_eq!(._metadata,
{"source": "diskstats_linux.go:264",
"collector": "diskstats",
"path": "/run/udev/data"})
- name: "cadvisor logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-cadvisor-1
label."com.docker.compose.service": cadvisor
message: >-
I0829 21:38:18.192196 1 factory.go:352] Registering Docker factory
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "Registering Docker factory")
assert!(is_timestamp(.timestamp))
assert_eq!(._labels,
{"service_name": "cadvisor",
"instance": "akvorado-cadvisor-1",
"level": "info"})
assert_eq!(._metadata, {"pid": 1, "caller": "factory.go:352"})
- name: "traefik access logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-traefik-1
label."com.docker.compose.service": traefik
message: >-
240.0.2.1
-
-
[29/Aug/2025:20:40:35 +0000]
"GET /api/v0/console/widget/flow-rate?11334 HTTP/1.0"
200
46
"-"
"-"
1596365
"akvorado-console@docker"
"http://240.0.2.10:8080"
3ms
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "GET /api/v0/console/widget/flow-rate?11334 HTTP/1.0")
assert_eq!(.timestamp, t'2025-08-29T20:40:35Z')
assert_eq!(._labels,
{"service_name": "traefik",
"instance": "akvorado-traefik-1",
"status": 200})
assert_eq!(._metadata,
{"backend_url": "http://240.0.2.10:8080",
"body_bytes_sent": 46,
"duration_ms": 3,
"frontend_name": "akvorado-console@docker",
"remote_addr": "240.0.2.1",
"request_count": 1596365})
- name: "traefik logs"
inputs:
- insert_at: base
type: log
log_fields:
container_id: b8ee56469
container_name: akvorado-traefik-1
label."com.docker.compose.service": traefik
message: >-
2025-08-29T19:17:05Z ERR error="accept tcp [::]:8081: use of closed network connection" entryPointName=public
outputs:
- extract_from: combine
conditions:
- type: vrl
source: |-
assert_eq!(.message, "accept tcp [::]:8081: use of closed network connection")
assert_eq!(.timestamp, t'2025-08-29T19:17:05Z')
assert_eq!(._labels,
{"service_name": "traefik",
"instance": "akvorado-traefik-1",
"level": "error"})
assert_eq!(._metadata,
{"entryPointName": "public"})

docker/vector.yaml (new file, 256 lines)

@@ -0,0 +1,256 @@
---
api:
enabled: true
address: 0.0.0.0:8686
sources:
internal_metrics:
type: internal_metrics
scrape_interval_secs: 10
internal_logs:
type: internal_logs
docker:
type: docker_logs
include_labels:
- "com.docker.compose.project=akvorado"
transforms:
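# Derive service_name from the Compose service label (replica suffix stripped)
# and seed the ._labels map shipped to Loki.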
base:
type: remap
inputs:
- docker
source: |
.service_name = replace(string!(.label."com.docker.compose.service"), r'(.+?)(?:-\d+)?$$', "$$1")
._labels.service_name = .service_name
._labels.instance = .container_name
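# Dispatch each service to a format-specific pipeline; services without one
# fall out through routes._unmatched.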
routes:
type: route
inputs:
- base
route:
akvorado: 'starts_with(string!(.service_name), "akvorado-")'
kafka: '.service_name == "kafka"'
redis: '.service_name == "redis"'
alloy: '.service_name == "alloy"'
loki: '.service_name == "loki"'
grafana: '.service_name == "grafana"'
prometheus: '.service_name == "prometheus"'
nodeexporter: '.service_name == "node-exporter"'
cadvisor: '.service_name == "cadvisor"'
traefik: '.service_name == "traefik"'
from_akvorado:
type: remap
inputs:
- routes.akvorado
source: |
parsed = parse_json!(.message)
.timestamp = parse_timestamp!(parsed.time, format: "%+")
.message = parsed.message
._labels.level = parsed.level
._labels.module = parsed.module
._metadata = parsed
del(._metadata.message)
del(._metadata.time)
del(._metadata.level)
del(._metadata.module)
from_kafka_multiline:
type: reduce
inputs:
- routes.kafka
group_by:
- .container_id
starts_when: |
match(string!(.message), r'^\[\d{4}-\d{2}-\d{2} ')
expire_after_ms: 1000
merge_strategies:
message: concat_newline
from_kafka:
type: remap
inputs:
- from_kafka_multiline
source: |
parsed = parse_regex!(string!(.message),
r'^\[(?P<timestamp>[^\]]+)\]\s+(?P<level>\w+)\s+(?P<message>(?s:.*))$$')
.timestamp = parse_timestamp!(parsed.timestamp, format: "%Y-%m-%d %H:%M:%S,%3f")
.message = parsed.message
._labels.level = parsed.level
from_redis:
type: remap
inputs:
- routes.redis
source: |
parsed = parse_regex!(string!(.message), r'(?x)
^(?P<pid>\d+):
(?P<role>[XCSM])\s+
(?P<timestamp>\d+\s+\w+\s+\d{4}\s+\d{2}:\d{2}:\d{2}\.\d{3})\s+
(?P<level>[*\#.-])\s+
(?P<message>.*)$$')
.timestamp = parse_timestamp!(parsed.timestamp, format: "%e %b %Y %H:%M:%S%.3f")
.message = parsed.message
._labels.role = if parsed.role == "X" { "sentinel" } else if parsed.role == "C" { "RDB" } else if parsed.role == "S" { "slave" } else { "master" }
._labels.level = if parsed.level == "." { "debug" } else if parsed.level == "-" { "info" } else if parsed.level == "*" { "notice" } else { "warning" }
._metadata.pid = to_int!(parsed.pid)
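# Alloy, Loki, Grafana, Prometheus and node-exporter all emit logfmt; they only
# differ in the key carrying the timestamp (ts, t or time) and the message
# (msg, message, error or err).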
from_logfmt:
type: remap
inputs:
- routes.alloy
- routes.loki
- routes.grafana
- routes.prometheus
- routes.nodeexporter
source: |
parsed = parse_logfmt!(.message)
.timestamp = parse_timestamp!(parsed.ts || parsed.t || parsed.time, format: "%+")
.message = join!(unique(compact(
[parsed.msg || parsed.message || parsed.error || parsed.err,
parsed.err || parsed.error], recursive: false)), separator: ": ")
._labels.level = parsed.level
._metadata = parsed
del(._metadata.ts)
del(._metadata.t)
del(._metadata.time)
del(._metadata.msg)
del(._metadata.message)
del(._metadata.level)
del(._metadata.err)
del(._metadata.error)
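# Vector's own logs come from the internal_logs source and are already
# structured; only labels and metadata need to be filled in.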
from_vector:
type: remap
inputs:
- internal_logs
source: |
._labels.service_name = "vector"
._labels.instance = .host
._metadata = .metadata
._metadata.pid = .pid
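# cAdvisor logs in glog format: level letter, month/day timestamp (no year),
# PID, caller, message.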
from_cadvisor:
type: remap
inputs:
- routes.cadvisor
source: |
parsed = parse_regex!(string!(.message), r'(?x)
^(?P<level>[IWEF])
(?P<timestamp>\d{4}\s\d{2}:\d{2}:\d{2}\.\d+)\s+
(?P<pid>\d+)\s+
(?P<caller>[^]]+)\]\s+
(?P<message>.*)$$')
# Timestamp is missing the year
# .timestamp = parse_timestamp!(parsed.timestamp, format: "%m%d %H:%M:%S%.6f")
.message = parsed.message
._labels.level = if parsed.level == "I" { "info" } else if parsed.level == "W" { "warning" } else if parsed.level == "E" { "error" } else { "fatal" }
._metadata.pid = to_int!(parsed.pid)
._metadata.caller = parsed.caller
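# Traefik mixes two formats on stdout: CLF-like access logs, and application
# logs made of a timestamp, a level and logfmt fields. Try the first, fall
# back to the second.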
from_traefik:
type: remap
inputs:
- routes.traefik
source: |
parsed, err = parse_regex(.message, r'(?x)
^(?P<remote_addr>\S+)\s
-\s
(?P<remote_user>\S+)\s
\[(?P<timestamp>[^\]]+)\]\s
"(?P<method>\S+)\s(?P<path>\S+)\s(?P<protocol>[^"]+)"\s
(?P<status>\d+)\s
(?P<body_bytes_sent>\d+)\s
"(?P<http_referer>[^"]*)"\s
"(?P<http_user_agent>[^"]*)"\s
(?P<request_count>\d+)\s
"(?P<frontend_name>[^"]*)"\s
"(?P<backend_url>[^"]*)"\s
(?P<duration_ms>\d+)ms$$')
if err == null {
.timestamp = parse_timestamp!(parsed.timestamp, "%d/%b/%Y:%H:%M:%S %z")
.message = join!([parsed.method, parsed.path, parsed.protocol], " ")
._labels.status = to_int!(parsed.status)
del(parsed.timestamp)
del(parsed.method)
del(parsed.path)
del(parsed.protocol)
del(parsed.status)
parsed.body_bytes_sent = to_int!(parsed.body_bytes_sent)
parsed.request_count = to_int!(parsed.request_count)
parsed.duration_ms = to_int!(parsed.duration_ms)
parsed = filter(parsed) -> |key, val| {
val != "-"
}
._metadata = parsed
} else {
parsed, err = parse_regex(.message, r'(?x)
^(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\S+)\s
(?P<level>\S+)\s
(?P<remaining>.*)$$')
if err == null {
.timestamp = parse_timestamp!(parsed.timestamp, "%+")
._labels.level = parsed.level
parsed = parse_logfmt!(parsed.remaining)
.message = parsed.msg || parsed.message || parsed.error
._metadata = parsed
del(._metadata.msg)
del(._metadata.message)
del(._metadata.error)
}
}
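# Merge all pipelines into one stream and map the various level spellings to a
# common set (critical, error, warning, info, debug, trace).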
combine:
type: remap
inputs:
- from_akvorado
- from_kafka
- from_redis
- from_logfmt
- from_vector
- from_cadvisor
- from_traefik
- routes._unmatched
source: |
if exists(._labels.level) {
level = downcase!(._labels.level)
if starts_with(level, "em") {
level = "critical"
} else if starts_with(level, "al") {
level = "critical"
} else if starts_with(level, "cr") {
level = "critical"
} else if starts_with(level, "er") {
level = "error"
} else if starts_with(level, "wa") {
level = "warning"
} else if starts_with(level, "in") {
level = "info"
} else if starts_with(level, "no") {
level = "info"
} else if starts_with(level, "de") {
level = "debug"
} else if starts_with(level, "db") {
level = "debug"
} else if starts_with(level, "tr") {
level = "trace"
}
._labels.level = level
}
sinks:
prometheus:
type: prometheus_exporter
inputs:
- internal_metrics
loki:
type: loki
inputs:
- combine
endpoint: http://loki:3100/loki
encoding:
codec: "text"
labels:
"*": "{{ ._labels }}"
structured_metadata:
"*": "{{ ._metadata }}"


@@ -36,6 +36,8 @@ services:
image: ghcr.io/google/cadvisor:v0.53.0 # v\d+\.\d+\.\d+
kafka-ui:
image: ghcr.io/kafbat/kafka-ui:v1.3.0 # v\d+\.\d+\.\d+
+ vector:
+ image: timberio/vector:0.49.0-alpine # \d+\.\d+\.\d+-alpine
# for tests
srlinux: