console: also use dimensions to tell if we need to use main table

This commit is contained in:
Vincent Bernat
2022-08-08 11:35:18 +02:00
parent b745cfd356
commit b5e48e8cfd
4 changed files with 69 additions and 23 deletions

View File

@@ -150,7 +150,7 @@ func (c *Component) graphHandlerFunc(gc *gin.Context) {
if resolution < time.Second {
resolution = time.Second
}
sqlQuery = c.queryFlowsTable(sqlQuery, input.Filter.MainTableRequired,
sqlQuery = c.queryFlowsTable(sqlQuery, requireMainTable(input.Dimensions, input.Filter),
input.Start, input.End, resolution)
gc.Header("X-SQL-Query", strings.ReplaceAll(sqlQuery, "\n", " "))

View File

@@ -14,41 +14,62 @@ import (
type queryColumn int
func (gc queryColumn) MarshalText() ([]byte, error) {
got, ok := queryColumnMap.LoadValue(gc)
func (qc queryColumn) MarshalText() ([]byte, error) {
got, ok := queryColumnMap.LoadValue(qc)
if ok {
return []byte(got), nil
}
return nil, errors.New("unknown field")
}
func (gc queryColumn) String() string {
got, _ := queryColumnMap.LoadValue(gc)
func (qc queryColumn) String() string {
got, _ := queryColumnMap.LoadValue(qc)
return got
}
func (gc *queryColumn) UnmarshalText(input []byte) error {
func (qc *queryColumn) UnmarshalText(input []byte) error {
got, ok := queryColumnMap.LoadKey(string(input))
if ok {
*gc = got
*qc = got
return nil
}
return errors.New("unknown field")
}
// queryColumnsRequiringMainTable lists query columns only present in
// the main table. Also check filter/parser.peg.
var queryColumnsRequiringMainTable = map[queryColumn]struct{}{
queryColumnSrcAddr: {},
queryColumnDstAddr: {},
queryColumnSrcPort: {},
queryColumnDstPort: {},
}
func requireMainTable(qcs []queryColumn, qf queryFilter) bool {
if qf.MainTableRequired {
return true
}
for _, qc := range qcs {
if _, ok := queryColumnsRequiringMainTable[qc]; ok {
return true
}
}
return false
}
type queryFilter struct {
Filter string
ReverseFilter string
MainTableRequired bool
}
func (gf queryFilter) String() string {
return gf.Filter
func (qf queryFilter) String() string {
return qf.Filter
}
func (gf queryFilter) MarshalText() ([]byte, error) {
return []byte(gf.Filter), nil
func (qf queryFilter) MarshalText() ([]byte, error) {
return []byte(qf.Filter), nil
}
func (gf *queryFilter) UnmarshalText(input []byte) error {
func (qf *queryFilter) UnmarshalText(input []byte) error {
if strings.TrimSpace(string(input)) == "" {
*gf = queryFilter{}
*qf = queryFilter{}
return nil
}
meta := &filter.Meta{}
@@ -58,7 +79,7 @@ func (gf *queryFilter) UnmarshalText(input []byte) error {
}
meta = &filter.Meta{ReverseDirection: true}
reverse, err := filter.Parse("", input, filter.GlobalStore("meta", meta))
*gf = queryFilter{
*qf = queryFilter{
Filter: direct.(string),
ReverseFilter: reverse.(string),
MainTableRequired: meta.MainTableRequired,
@@ -67,30 +88,30 @@ func (gf *queryFilter) UnmarshalText(input []byte) error {
}
// toSQLSelect transforms a column into an expression to use in SELECT
func (gc queryColumn) toSQLSelect() string {
func (qc queryColumn) toSQLSelect() string {
var strValue string
switch gc {
switch qc {
case queryColumnExporterAddress, queryColumnSrcAddr, queryColumnDstAddr:
strValue = fmt.Sprintf("replaceRegexpOne(IPv6NumToString(%s), '^::ffff:', '')", gc)
strValue = fmt.Sprintf("replaceRegexpOne(IPv6NumToString(%s), '^::ffff:', '')", qc)
case queryColumnSrcAS, queryColumnDstAS:
strValue = fmt.Sprintf(`concat(toString(%s), ': ', dictGetOrDefault('asns', 'name', %s, '???'))`,
gc, gc)
qc, qc)
case queryColumnEType:
strValue = fmt.Sprintf(`if(EType = %d, 'IPv4', if(EType = %d, 'IPv6', '???'))`,
helpers.ETypeIPv4, helpers.ETypeIPv6)
case queryColumnProto:
strValue = `dictGetOrDefault('protocols', 'name', Proto, '???')`
case queryColumnInIfSpeed, queryColumnOutIfSpeed, queryColumnSrcPort, queryColumnDstPort, queryColumnForwardingStatus, queryColumnInIfBoundary, queryColumnOutIfBoundary:
strValue = fmt.Sprintf("toString(%s)", gc)
strValue = fmt.Sprintf("toString(%s)", qc)
default:
strValue = gc.String()
strValue = qc.String()
}
return strValue
}
// reverseDirection reverse the direction of a column (src/dst, in/out)
func (gc queryColumn) reverseDirection() queryColumn {
value, ok := queryColumnMap.LoadKey(filter.ReverseColumnDirection(gc.String()))
func (qc queryColumn) reverseDirection() queryColumn {
value, ok := queryColumnMap.LoadKey(filter.ReverseColumnDirection(qc.String()))
if !ok {
panic("unknown reverse column")
}

View File

@@ -9,6 +9,31 @@ import (
"akvorado/common/helpers"
)
func TestRequireMainTable(t *testing.T) {
cases := []struct {
Columns []queryColumn
Filter queryFilter
Expected bool
}{
{[]queryColumn{}, queryFilter{}, false},
{[]queryColumn{queryColumnSrcAS}, queryFilter{}, false},
{[]queryColumn{queryColumnExporterAddress}, queryFilter{}, false},
{[]queryColumn{queryColumnSrcPort}, queryFilter{}, true},
{[]queryColumn{queryColumnSrcAddr}, queryFilter{}, true},
{[]queryColumn{queryColumnDstPort}, queryFilter{}, true},
{[]queryColumn{queryColumnDstAddr}, queryFilter{}, true},
{[]queryColumn{queryColumnSrcAS, queryColumnDstAddr}, queryFilter{}, true},
{[]queryColumn{queryColumnDstAddr, queryColumnSrcAS}, queryFilter{}, true},
{[]queryColumn{}, queryFilter{MainTableRequired: true}, true},
}
for idx, tc := range cases {
got := requireMainTable(tc.Columns, tc.Filter)
if got != tc.Expected {
t.Errorf("requireMainTable(%d) == %v but expected %v", idx, got, tc.Expected)
}
}
}
func TestQueryColumnSQLSelect(t *testing.T) {
cases := []struct {
Input queryColumn

View File

@@ -115,7 +115,7 @@ func (c *Component) sankeyHandlerFunc(gc *gin.Context) {
}
// Prepare and execute query
sqlQuery = c.queryFlowsTable(sqlQuery, input.Filter.MainTableRequired,
sqlQuery = c.queryFlowsTable(sqlQuery, requireMainTable(input.Dimensions, input.Filter),
input.Start, input.End, resolution)
gc.Header("X-SQL-Query", strings.ReplaceAll(sqlQuery, "\n", " "))
results := []struct {