core: move augmentation function outside of root.go

Also enrich → augment. Not sure. Or hydrate?
This commit is contained in:
Vincent Bernat
2022-03-18 07:47:49 +01:00
parent ad14826f6f
commit 7c5a7bdbef
5 changed files with 73 additions and 55 deletions

View File

@@ -1,6 +1,6 @@
# Akvorado: flow collector, enricher and exporter.
# Akvorado: flow collector, augmenter and exporter.
This program receives flows (currently Netflow), enriches them with
This program receives flows (currently Netflow), augments them with
interface names (using SNMP), geo information (using MaxMind), and
exports them to Kafka.

View File

@@ -59,7 +59,7 @@ var ServeOptions serveOptions
var serveCmd = &cobra.Command{
Use: "serve",
Short: "Start akvorado",
Long: `Akvorado is a Netflow collector. It enriches flows with information from SNMP and GeoIP
Long: `Akvorado is a Netflow collector. It augments flows with information from SNMP and GeoIP
and exports them to Kafka.`,
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {

68
core/augment.go Normal file
View File

@@ -0,0 +1,68 @@
package core
import (
"net"
"time"
"golang.org/x/time/rate"
"akvorado/flow"
"akvorado/snmp"
)
// AugmentFlow adds more data to a flow.
func (c *Component) AugmentFlow(sampler string, flow *flow.FlowMessage) (skip bool) {
errLimiter := rate.NewLimiter(rate.Every(time.Minute), 10)
if flow.InIf != 0 {
samplerName, iface, err := c.d.Snmp.Lookup(sampler, uint(flow.InIf))
if err != nil {
if err != snmp.ErrCacheMiss && errLimiter.Allow() {
c.r.Err(err).Str("sampler", sampler).Msg("unable to query SNMP cache")
}
c.metrics.flowsErrors.WithLabelValues(sampler, err.Error()).Inc()
skip = true
} else {
flow.SamplerName = samplerName
flow.InIfName = iface.Name
flow.InIfDescription = iface.Description
flow.InIfSpeed = uint32(iface.Speed)
}
}
if flow.OutIf != 0 {
samplerName, iface, err := c.d.Snmp.Lookup(sampler, uint(flow.OutIf))
if err != nil {
// Only register a cache miss if we don't have one.
// TODO: maybe we could do one SNMP query for both interfaces.
if !skip {
if err != snmp.ErrCacheMiss && errLimiter.Allow() {
c.r.Err(err).Str("sampler", sampler).Msg("unable to query SNMP cache")
}
c.metrics.flowsErrors.WithLabelValues(sampler, err.Error()).Inc()
skip = true
}
} else {
flow.SamplerName = samplerName
flow.OutIfName = iface.Name
flow.OutIfDescription = iface.Description
flow.OutIfSpeed = uint32(iface.Speed)
}
}
if flow.SamplingRate == 0 {
c.metrics.flowsErrors.WithLabelValues(sampler, "sampling rate missing").Inc()
skip = true
}
if skip {
return
}
// Add GeoIP
if flow.SrcAS == 0 {
flow.SrcAS = c.d.GeoIP.LookupASN(net.IP(flow.SrcAddr))
}
if flow.DstAS == 0 {
flow.DstAS = c.d.GeoIP.LookupASN(net.IP(flow.DstAddr))
}
flow.SrcCountry = c.d.GeoIP.LookupCountry(net.IP(flow.SrcAddr))
flow.DstCountry = c.d.GeoIP.LookupCountry(net.IP(flow.DstAddr))
return
}

View File

@@ -98,60 +98,10 @@ func (c *Component) runWorker(workerID int) error {
sampler := net.IP(flow.SamplerAddress).String()
c.metrics.flowsReceived.WithLabelValues(sampler).Inc()
// Add interface and sampler names
skip := false
if flow.InIf != 0 {
samplerName, iface, err := c.d.Snmp.Lookup(sampler, uint(flow.InIf))
if err != nil {
if err != snmp.ErrCacheMiss && errLimiter.Allow() {
c.r.Err(err).Str("sampler", sampler).Msg("unable to query SNMP cache")
}
c.metrics.flowsErrors.WithLabelValues(sampler, err.Error()).Inc()
skip = true
} else {
flow.SamplerName = samplerName
flow.InIfName = iface.Name
flow.InIfDescription = iface.Description
flow.InIfSpeed = uint32(iface.Speed)
}
}
if flow.OutIf != 0 {
samplerName, iface, err := c.d.Snmp.Lookup(sampler, uint(flow.OutIf))
if err != nil {
// Only register a cache miss if we don't have one.
// TODO: maybe we could do one SNMP query for both interfaces.
if !skip {
if err != snmp.ErrCacheMiss && errLimiter.Allow() {
c.r.Err(err).Str("sampler", sampler).Msg("unable to query SNMP cache")
}
c.metrics.flowsErrors.WithLabelValues(sampler, err.Error()).Inc()
skip = true
}
} else {
flow.SamplerName = samplerName
flow.OutIfName = iface.Name
flow.OutIfDescription = iface.Description
flow.OutIfSpeed = uint32(iface.Speed)
}
}
if flow.SamplingRate == 0 {
c.metrics.flowsErrors.WithLabelValues(sampler, "sampling rate missing").Inc()
skip = true
}
if skip {
if skip := c.AugmentFlow(sampler, flow); skip {
continue
}
// Add GeoIP
if flow.SrcAS == 0 {
flow.SrcAS = c.d.GeoIP.LookupASN(net.IP(flow.SrcAddr))
}
if flow.DstAS == 0 {
flow.DstAS = c.d.GeoIP.LookupASN(net.IP(flow.DstAddr))
}
flow.SrcCountry = c.d.GeoIP.LookupCountry(net.IP(flow.SrcAddr))
flow.DstCountry = c.d.GeoIP.LookupCountry(net.IP(flow.DstAddr))
// Serialize flow (use length-prefixed protobuf)
buf := proto.NewBuffer([]byte{})
err := buf.EncodeMessage(flow)

View File

@@ -8,7 +8,7 @@ hide:
![](assets/akvorado.svg){ .akvorado-logo }
*Akvorado* is a flow collector, enricher and exporter. It receives
*Akvorado* is a flow collector, augmenter and exporter. It receives
flows, adds some data like interface names and geo information, and
exports them to Kafka. [Akvorado][] means "water wheel" in Esperanto.