flow: add IPFIX support

This commit is contained in:
Vincent Bernat
2022-03-26 21:23:58 +01:00
parent 1b4ba973a7
commit b086c9478a
5 changed files with 64 additions and 52 deletions

View File

@@ -1,6 +1,6 @@
# Akvorado: flow collector, hydrater and exporter.
This program receives flows (currently Netflow), hydrates them with
This program receives flows (currently Netflow/IPFIX), hydrates them with
interface names (using SNMP), geo information (using MaxMind), and
exports them to Kafka.

View File

@@ -65,7 +65,7 @@ var ServeOptions serveOptions
var serveCmd = &cobra.Command{
Use: "serve",
Short: "Start akvorado",
Long: `Akvorado is a Netflow collector. It hydrates flows with information from SNMP and GeoIP
Long: `Akvorado is a Netflow/IPFIX collector. It hydrates flows with information from SNMP and GeoIP
and exports them to Kafka.`,
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {

View File

@@ -83,8 +83,8 @@ design to scale is a bit different as *Akvorado* will create a socket
for each worker instead of distributing incoming flows using message
passing.
Only Netflow v9 is currently handled. However, as *GoFlow2* also
supports sFlow and IPFIX, support for them can be added later.
Only Netflow v9 and IPFIX are currently handled. However, as *GoFlow2*
also supports sFlow, support can be added later.
The design of this component is modular as it is possible to "plug"
new decoders and new inputs easily. It is expected that most buffering

View File

@@ -1,4 +1,4 @@
// Package netflow handles NetFlow v9 decoding.
// Package netflow handles NetFlow v9 and IPFIX decoding.
package netflow
import (
@@ -7,7 +7,6 @@ import (
"sync"
"github.com/netsampler/goflow2/decoders/netflow"
goflowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
"akvorado/flow/decoder"
@@ -98,13 +97,17 @@ type templateSystem struct {
func (s *templateSystem) AddTemplate(version uint16, obsDomainID uint32, template interface{}) {
s.templates.AddTemplate(version, obsDomainID, template)
typeStr := "options_template"
var templateID uint16
var (
templateID uint16
typeStr string
)
switch templateIDConv := template.(type) {
case netflow.IPFIXOptionsTemplateRecord:
templateID = templateIDConv.TemplateId
typeStr = "options_template"
case netflow.NFv9OptionsTemplateRecord:
templateID = templateIDConv.TemplateId
typeStr = "options_template"
case netflow.TemplateRecord:
templateID = templateIDConv.TemplateId
typeStr = "template"
@@ -116,8 +119,7 @@ func (s *templateSystem) AddTemplate(version uint16, obsDomainID uint32, templat
strconv.Itoa(int(obsDomainID)),
strconv.Itoa(int(templateID)),
typeStr,
).
Inc()
).Inc()
}
func (s *templateSystem) GetTemplate(version uint16, obsDomainID uint32, templateID uint16) (interface{}, error) {
@@ -157,60 +159,70 @@ func (nd *Decoder) Decode(in decoder.RawFlow) []*decoder.FlowMessage {
if err != nil {
switch err.(type) {
case *netflow.ErrorTemplateNotFound:
nd.metrics.errors.WithLabelValues(key, "template_not_found").
Inc()
nd.metrics.errors.WithLabelValues(key, "template_not_found").Inc()
default:
nd.metrics.errors.WithLabelValues(key, "error_decoding").
Inc()
nd.metrics.errors.WithLabelValues(key, "error_decoding").Inc()
}
return nil
}
var flowMessageSet []*goflowmessage.FlowMessage
var (
version string
flowSets []interface{}
)
// Update some stats
switch msgDecConv := msgDec.(type) {
case netflow.IPFIXPacket:
version = "10"
flowSets = msgDecConv.FlowSets
case netflow.NFv9Packet:
nd.metrics.stats.WithLabelValues(key, "9").
Inc()
for _, fs := range msgDecConv.FlowSets {
switch fsConv := fs.(type) {
case netflow.TemplateFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, "9", "TemplateFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, "9", "TemplateFlowSet").
Add(float64(len(fsConv.Records)))
case netflow.NFv9OptionsTemplateFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, "9", "OptionsTemplateFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, "9", "OptionsTemplateFlowSet").
Add(float64(len(fsConv.Records)))
case netflow.OptionsDataFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, "9", "OptionsDataFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, "9", "OptionsDataFlowSet").
Add(float64(len(fsConv.Records)))
case netflow.DataFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, "9", "DataFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, "9", "DataFlowSet").
Add(float64(len(fsConv.Records)))
}
}
flowMessageSet, err = producer.ProcessMessageNetFlow(msgDecConv, sampling)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
fmsg.SamplerAddress = in.Source
timeDiff := fmsg.TimeReceived - fmsg.TimeFlowEnd
nd.metrics.timeStatsSum.WithLabelValues(key, "9").
Observe(float64(timeDiff))
}
version = "9"
flowSets = msgDecConv.FlowSets
default:
nd.metrics.stats.WithLabelValues(key, "unknown").
Inc()
return nil
}
nd.metrics.stats.WithLabelValues(key, version).Inc()
for _, fs := range flowSets {
switch fsConv := fs.(type) {
case netflow.TemplateFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, version, "TemplateFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, version, "TemplateFlowSet").
Add(float64(len(fsConv.Records)))
case netflow.IPFIXOptionsTemplateFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, version, "OptionsTemplateFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, version, "OptionsTemplateFlowSet").
Add(float64(len(fsConv.Records)))
case netflow.NFv9OptionsTemplateFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, version, "OptionsTemplateFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, version, "OptionsTemplateFlowSet").
Add(float64(len(fsConv.Records)))
case netflow.OptionsDataFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, version, "OptionsDataFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, version, "OptionsDataFlowSet").
Add(float64(len(fsConv.Records)))
case netflow.DataFlowSet:
nd.metrics.setStatsSum.WithLabelValues(key, version, "DataFlowSet").
Inc()
nd.metrics.setRecordsStatsSum.WithLabelValues(key, version, "DataFlowSet").
Add(float64(len(fsConv.Records)))
}
}
flowMessageSet, err := producer.ProcessMessageNetFlow(msgDec, sampling)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
fmsg.SamplerAddress = in.Source
timeDiff := fmsg.TimeReceived - fmsg.TimeFlowEnd
nd.metrics.timeStatsSum.WithLabelValues(key, version).
Observe(float64(timeDiff))
}
results := make([]*decoder.FlowMessage, len(flowMessageSet))
for idx, fmsg := range flowMessageSet {

View File

@@ -1,4 +1,4 @@
// Package flow handle incoming flows (currently Netflow v9).
// Package flow handle incoming flows (currently Netflow v9 and IPFIX).
package flow
import (