common/schema: add ability to reverse flow direction

This commit is contained in:
Vincent Bernat
2025-09-28 22:28:35 +02:00
parent b05706e92a
commit a1e29071a9
4 changed files with 76 additions and 8 deletions

View File

@@ -143,6 +143,7 @@ func (schema Schema) ClickHouseHash() string {
// AppendDateTime adds a DateTime value to the provided column
func (bf *FlowMessage) AppendDateTime(columnKey ColumnKey, value uint32) {
columnKey = reverse(bf, columnKey)
col := bf.batch.columns[columnKey]
if value == 0 || col == nil || bf.batch.columnSet.Test(uint(columnKey)) {
return
@@ -154,6 +155,7 @@ func (bf *FlowMessage) AppendDateTime(columnKey ColumnKey, value uint32) {
// AppendUint adds an UInt64/32/16/8 or Enum8 value to the provided column
func (bf *FlowMessage) AppendUint(columnKey ColumnKey, value uint64) {
columnKey = reverse(bf, columnKey)
col := bf.batch.columns[columnKey]
if value == 0 || col == nil || bf.batch.columnSet.Test(uint(columnKey)) {
return
@@ -182,6 +184,7 @@ func (bf *FlowMessage) AppendUint(columnKey ColumnKey, value uint64) {
// AppendString adds a String value to the provided column
func (bf *FlowMessage) AppendString(columnKey ColumnKey, value string) {
columnKey = reverse(bf, columnKey)
col := bf.batch.columns[columnKey]
if value == "" || col == nil || bf.batch.columnSet.Test(uint(columnKey)) {
return
@@ -198,6 +201,7 @@ func (bf *FlowMessage) AppendString(columnKey ColumnKey, value string) {
// AppendIPv6 adds an IPv6 value to the provided column
func (bf *FlowMessage) AppendIPv6(columnKey ColumnKey, value netip.Addr) {
columnKey = reverse(bf, columnKey)
col := bf.batch.columns[columnKey]
if !value.IsValid() || col == nil || bf.batch.columnSet.Test(uint(columnKey)) {
return
@@ -216,6 +220,7 @@ func (bf *FlowMessage) AppendIPv6(columnKey ColumnKey, value netip.Addr) {
// AppendArrayUInt32 adds an Array(UInt32) value to the provided column
func (bf *FlowMessage) AppendArrayUInt32(columnKey ColumnKey, value []uint32) {
columnKey = reverse(bf, columnKey)
col := bf.batch.columns[columnKey]
if len(value) == 0 || col == nil || bf.batch.columnSet.Test(uint(columnKey)) {
return
@@ -227,6 +232,7 @@ func (bf *FlowMessage) AppendArrayUInt32(columnKey ColumnKey, value []uint32) {
// AppendArrayUInt128 adds an Array(UInt128) value to the provided column
func (bf *FlowMessage) AppendArrayUInt128(columnKey ColumnKey, value []UInt128) {
columnKey = reverse(bf, columnKey)
col := bf.batch.columns[columnKey]
if len(value) == 0 || col == nil || bf.batch.columnSet.Test(uint(columnKey)) {
return
@@ -388,3 +394,10 @@ func (bf *FlowMessage) Finalize() {
bf.reset()
bf.check()
}
// reverse translates columnKey to the key of the opposite direction when the
// flow message is currently reversed. When the message is not reversed, the
// key is returned unchanged.
//
// A key that falls outside columnReverseTable is returned as is instead of
// causing an index-out-of-range panic.
func reverse(bf *FlowMessage, columnKey ColumnKey) ColumnKey {
	if !bf.reversed {
		return columnKey
	}
	// columnReverseTable is generated from the statically declared column
	// keys only, so it cannot answer for a key past its end.
	// NOTE(review): presumably keys at or after ColumnLast (columns added at
	// runtime) can reach this function; confirm against the callers.
	if uint(columnKey) >= uint(len(columnReverseTable)) {
		return columnKey
	}
	return columnReverseTable[columnKey]
}

View File

@@ -618,3 +618,36 @@ func TestBuildProtoInput(t *testing.T) {
t.Fatalf("ClickHouseProtoInput() (-got, +want):\n%s", diff)
}
}
// TestReverse appends columns and sets AS fields while the flow message is
// reversed, then reverses it back and checks both the recorded columns and the
// swapped AS fields.
func TestReverse(t *testing.T) {
	const (
		firstAS  = 65000
		secondAS = 65002
	)

	mock := NewMock(t)
	msg := mock.NewFlowMessage()

	// Everything between the two Reverse() calls happens in reversed mode.
	msg.Reverse()
	msg.AppendDateTime(ColumnTimeReceived, 1000)
	msg.AppendUint(ColumnSamplingRate, 20000)
	msg.AppendString(ColumnInIfName, "input")
	msg.AppendString(ColumnOutIfName, "output")
	// This column is appended but is not part of the expected result below.
	msg.AppendArrayUInt32(ColumnDstCommunities, []uint32{10, 11, 12})
	msg.SrcAS = firstAS
	msg.DstAS = secondAS
	msg.Reverse()

	// Non-directional columns keep their key, In/Out names are exchanged.
	want := map[ColumnKey]any{
		ColumnTimeReceived: uint32(1000),
		ColumnSamplingRate: uint64(20000),
		ColumnInIfName:     "output",
		ColumnOutIfName:    "input",
	}
	if diff := helpers.Diff(msg.OtherColumns, want); diff != "" {
		t.Errorf("Append() (-got, +want):\n%s", diff)
	}

	// The second Reverse() must have exchanged the two AS numbers.
	if msg.SrcAS != secondAS {
		t.Errorf("SrcAS == %d, should be %d", msg.SrcAS, secondAS)
	}
	if msg.DstAS != firstAS {
		t.Errorf("DstAS == %d, should be %d", msg.DstAS, firstAS)
	}
}

View File

@@ -12,12 +12,23 @@ package schema
import "akvorado/common/helpers/bimap"
var columnNameMap = bimap.New(map[ColumnKey]string{
$(grep -oEw 'Column[a-zA-Z0-9]+' common/schema/definition.go \
| sed 's/^Column//' \
| grep -vFx Last \
| grep -vFx Key \
| grep -v '^Group' \
| sort | uniq \
$(sed -En '/.*ColumnKey = iota/,/^\s+ColumnLast/p' common/schema/definition.go \
| head -n-1 \
| sed -En 's/^\s+Column//p' \
| awk '{ print "Column"$1": \""$1"\","}')
})
var columnReverseTable = [...]ColumnKey{
$(sed -En '/.*ColumnKey = iota/,/^\s+ColumnLast/p' common/schema/definition.go \
| head -n-1 \
| sed -En 's/^\s+(Column\w+).*/\1/p' \
| sed -E \
-e 's/(ColumnDst(ASPath|Communities|LargeCommunities|[123]..AS))/\1: 0,/;t' \
-e 's/(ColumnIn)([A-Z0-9].*)/ColumnOut\2: \1\2,/;t' \
-e 's/(ColumnOut)([A-Z0-9].*)/ColumnIn\2: \1\2,/;t' \
-e 's/(ColumnSrc)([A-Z0-9].*)/ColumnDst\2: \1\2,/;t' \
-e 's/(ColumnDst)([A-Z0-9].*)/ColumnSrc\2: \1\2,/;t' \
-e 's/(.*)/\1: \1,/')
}
EOF

View File

@@ -38,8 +38,9 @@ type FlowMessage struct {
// Only for tests
OtherColumns map[ColumnKey]any
batch clickhouseBatch
schema *Schema
reversed bool
batch clickhouseBatch
schema *Schema
}
// clickhouseBatch stores columns for efficient streaming. It is embedded
@@ -108,3 +109,13 @@ func (schema *Schema) NewFlowMessage() *FlowMessage {
func (bf *FlowMessage) FlowCount() int {
	// The count is read straight from the batch's row counter.
	rows := bf.batch.rowCount
	return rows
}
// Reverse flips the direction of the flow message. It exchanges the
// source/destination and input/output values already stored in the struct
// fields, and toggles the flag consulted by the Append*() methods so that
// subsequent calls target the opposite column. Calling it twice restores the
// original direction. The batch itself is not modified here.
func (bf *FlowMessage) Reverse() {
	// Each pair is independent of the others, so the order does not matter.
	bf.DstAddr, bf.SrcAddr = bf.SrcAddr, bf.DstAddr
	bf.DstAS, bf.SrcAS = bf.SrcAS, bf.DstAS
	bf.DstNetMask, bf.SrcNetMask = bf.SrcNetMask, bf.DstNetMask
	bf.DstVlan, bf.SrcVlan = bf.SrcVlan, bf.DstVlan
	bf.OutIf, bf.InIf = bf.InIf, bf.OutIf

	bf.reversed = !bf.reversed
}