Commit Graph

21 Commits

Author SHA1 Message Date
Vincent Bernat
756e4a8fbd */kafka: switch to franz-go
The concurrency of this library is easier to handle than Sarama.
Notably, it is more compatible with the new model of "almost share
nothing" we use for the inlet and the outlet. The lock for workers in
outlet is removed. We can now use sync.Pool to allocate slice of bytes
in inlet.

It may also be more performant.

In the future, we may want to commit only when pushing data to
ClickHouse. However, this does not seem easy when there is a rebalance.
In case of rebalance, we need to do something when a partition is
revoked to avoid duplicating data. For example, we could flush the
current batch to ClickHouse. Have a look at the
`example/mark_offsets/main.go` file in franz-go repository for a
possible approach. In the meantime, we rely on autocommit.

Another contender could be https://github.com/segmentio/kafka-go. Also
see https://github.com/twmb/franz-go/pull/1064.
2025-07-27 21:44:28 +02:00
Vincent Bernat
ac68c5970e inlet: split inlet into new inlet and outlet
This change split the inlet component into a simpler inlet and a new
outlet component. The new inlet component receive flows and put them in
Kafka, unparsed. The outlet component takes them from Kafka and resume
the processing from here (flow parsing, enrichment) and puts them in
ClickHouse.

The main goal is to ensure the inlet does a minimal work to not be late
when processing packets (and restart faster). It also brings some
simplification as the number of knobs to tune everything is reduced: for
inlet, we only need to tune the queue size for UDP, the number of
workers and a few Kafka parameters; for outlet, we need to tune a few
Kafka parameters, the number of workers and a few ClickHouse parameters.

The outlet component features a simple Kafka input component. The core
component becomes just a callback function. There is also a new
ClickHouse component to push data to ClickHouse using the low-level
ch-go library with batch inserts.

This processing has an impact on the internal representation of a
FlowMessage. Previously, it was tailored to dynamically build the
protobuf message to be put in Kafka. Now, it builds the batch request to
be sent to ClickHouse. This makes the FlowMessage structure hides the
content of the next batch request and therefore, it should be reused.
This also changes the way we decode flows as they don't output
FlowMessage anymore, they reuse one that is provided to each worker.

The ClickHouse tables are slightly updated. Instead of using Kafka
engine, the Null engine is used instead.

Fix #1122
2025-07-27 21:44:28 +02:00
Vincent Bernat
a449736a62 build: use Go 1.22 range over ints
Done with:

```
git grep -l 'for.*:= 0.*++' \
  | xargs sed -i -E 's/for (.*) := 0; \1 < (.*); \1\+\+/for \1 := range \2/'
```

And a few manual fixes due to unused variables. There is something fishy
in BMP rib test. Add a comment about that. This is not equivalent (as
with range, random is evaluated once, while in the original loop, it is
evaluated at each iteration). I believe the intent was to behave like
with range.
2024-08-14 10:11:35 +02:00
Vincent Bernat
8ecc6b9570 inlet: add some test coverage around MPLS parsing 2023-11-25 20:34:45 +01:00
Vincent Bernat
436e6b7f43 inlet/flow: rename pcap in netflow/testdata 2023-09-16 22:37:33 +02:00
Vincent Bernat
a912da7fa1 build: use gofumpt
Undecided if we need to use it. I think it's nice.
2023-02-11 10:03:45 +01:00
Vincent Bernat
e352202631 inlet: make use of schema for inlet
This is a huge change to make the various subcomponents of the inlet use
the schema to generate the protobuf. For it to make sense, we also
modify the way we parse flows to directly serialize non-essential fields
to Protobuf.

The performance is mostly on par with the previous commit. We are a bit
less efficient because we don't have a fixed structure, but we avoid
loosing too much performance by not relying on reflection and keeping
the production of messages as code. We use less of Goflow2: raw flow
parsing is still done by Goflow2, but we don't use the producer part
anymore. This helps a bit with the performance as we parse less.
Overall, we are 20% than the previous commit and twice faster than the
1.6.4!

```
goos: linux
goarch: amd64
pkg: akvorado/inlet/flow
cpu: AMD Ryzen 5 5600X 6-Core Processor
BenchmarkDecodeEncodeNetflow
BenchmarkDecodeEncodeNetflow/with_encoding
BenchmarkDecodeEncodeNetflow/with_encoding-12             151484              7789 ns/op            8272 B/op        143 allocs/op
BenchmarkDecodeEncodeNetflow/without_encoding
BenchmarkDecodeEncodeNetflow/without_encoding-12          162550              7133 ns/op            8272 B/op        143 allocs/op
BenchmarkDecodeEncodeSflow
BenchmarkDecodeEncodeSflow/with_encoding
BenchmarkDecodeEncodeSflow/with_encoding-12                94844             13193 ns/op            9816 B/op        295 allocs/op
BenchmarkDecodeEncodeSflow/without_encoding
BenchmarkDecodeEncodeSflow/without_encoding-12             92569             12456 ns/op            9816 B/op        295 allocs/op
```

There was a tentative to parse sFlow packets with gopackets, but the
adhoc parser used here is more performant.
2023-01-17 20:53:00 +01:00
Vincent Bernat
f3f7e0fdc8 inlet/flow: increase a bit how much to wait for flows 2022-12-06 08:14:32 +01:00
Vincent Bernat
2f39a34263 inlet/flow: restrict rate-limit test to Linux
It often fails on MacOS on GitHub
2022-10-24 23:56:59 +02:00
Vincent Bernat
f8a795282e inlet/flow: replace raw data test files with PCAP files
Raw data files can be converted with Scapy:

```python
from scapy.all import *
wrpcap("data-1140.pcap",
  Ether(src="00:53:00:11:22:33",dst="00:53:00:44:55:66")/
  IP(src="192.0.2.100", dst="192.0.2.101")/
  UDP(sport=47873,dport=6343)/
  open("data-1140.data", "rb").read())
```
2022-09-09 13:55:09 +02:00
Vincent Bernat
9b7891429b inlet/flow: fix retry logic for sampling rate test 2022-08-30 14:59:04 +02:00
Vincent Bernat
a8d2d88df4 inlet/flow: for sampling rate test, estimate again on retry 2022-08-30 08:34:12 +02:00
Vincent Bernat
ff5d50db43 inlet/flow: retry rate limiting test a few times 2022-08-30 08:02:06 +02:00
Vincent Bernat
41744ff7ad inlet/flow: be more lenient on the test about sampling rate 2022-08-28 00:11:04 +02:00
Vincent Bernat
8552465bac inlet/flow: make "max flows/s" test more lenient 2022-08-24 10:10:37 +02:00
Vincent Bernat
506c15800a inlet/flow: add a per-exporter flow rate-limiter
This is useful if we cannot tune the sampling rate of the source
equipment and it is too high for us. The sampling rate is adapted.
This is difficult to test, so hopefully, this is correct!
2022-08-23 20:47:19 +02:00
Vincent Bernat
844396a9d9 inlet/flow: remove unused code in test
We do not test received flows here because flow parsing testing is
done otherwhere.
2022-08-23 08:32:03 +02:00
Vincent Bernat
8be1bca4fd license: AGPL-3.0-only
```
git ls-files \*.js \*.go \
  | xargs sed -i '1i // SPDX-FileCopyrightText: 2022 Free Mobile\n// SPDX-License-Identifier: AGPL-3.0-only\n'
git ls-files \*.vue \
  | xargs sed -i '1i <!-- SPDX-FileCopyrightText: 2022 Free Mobile -->\n<!-- SPDX-License-Identifier: AGPL-3.0-only -->\n'
```
2022-06-29 11:42:28 +02:00
Vincent Bernat
ce9bd6a4da tests: add an helper to start/stop components 2022-04-13 17:02:14 +02:00
Vincent Bernat
93da599adf cmd: take configuration as a mandatory argument (+ other changes)
The other changes are:
 - rename configure service to orchestrator service
 - turn DefaultConfiguration variables into functions
2022-04-10 15:14:39 +02:00
Vincent Bernat
1dc253764d global: split Akvorado into 3 services 2022-04-01 20:21:53 +02:00