diff --git a/common/schema/insert_test.go b/common/schema/insert_test.go index 583f1a37..2d5147dd 100644 --- a/common/schema/insert_test.go +++ b/common/schema/insert_test.go @@ -67,6 +67,7 @@ func TestInsertMemory(t *testing.T) { conn, err := ch.Dial(ctx, ch.Options{ Address: server, + Database: "test", DialTimeout: 100 * time.Millisecond, Settings: []ch.Setting{ {Key: "allow_suspicious_low_cardinality_types", Value: "1"}, @@ -78,7 +79,7 @@ func TestInsertMemory(t *testing.T) { // Create the table q := fmt.Sprintf( - `CREATE OR REPLACE TABLE test_table_insert (%s) ENGINE = Memory`, + `CREATE OR REPLACE TABLE test_schema_insert (%s) ENGINE = Memory`, c.ClickHouseCreateTable(schema.ClickHouseSkipAliasedColumns, schema.ClickHouseSkipGeneratedColumns), ) t.Logf("Query: %s", q) @@ -91,7 +92,7 @@ func TestInsertMemory(t *testing.T) { // Insert input := bf.ClickHouseProtoInput() if err := conn.Do(ctx, ch.Query{ - Body: input.Into("test_table_insert"), + Body: input.Into("test_schema_insert"), Input: input, OnInput: func(ctx context.Context) error { bf.Clear() @@ -105,14 +106,17 @@ func TestInsertMemory(t *testing.T) { // Check the result (with the full-featured client) { conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{server}, + Addr: []string{server}, + Auth: clickhouse.Auth{ + Database: "test", + }, DialTimeout: 100 * time.Millisecond, }) if err != nil { t.Fatalf("clickhouse.Open() error:\n%+v", err) } // Use formatRow to get JSON representation - rows, err := conn.Query(ctx, "SELECT formatRow('JSONEachRow', *) FROM test_table_insert ORDER BY TimeReceived") + rows, err := conn.Query(ctx, "SELECT formatRow('JSONEachRow', *) FROM test_schema_insert ORDER BY TimeReceived") if err != nil { t.Fatalf("clickhouse.Query() error:\n%+v", err) } diff --git a/docker/clickhouse/test-db.sql b/docker/clickhouse/test-db.sql new file mode 100644 index 00000000..e68c2efe --- /dev/null +++ b/docker/clickhouse/test-db.sql @@ -0,0 +1 @@ +CREATE DATABASE IF NOT EXISTS test; diff --git a/docker/docker-compose-dev.yml b/docker/docker-compose-dev.yml index 1055bce8..a3005f1d 100644 --- a/docker/docker-compose-dev.yml +++ b/docker/docker-compose-dev.yml @@ -124,11 +124,13 @@ services: - zookeeper environment: - CLICKHOUSE_SKIP_USER_SETUP=1 + - CLICKHOUSE_ALWAYS_RUN_INITDB_SCRIPTS=1 cap_add: - SYS_NICE volumes: - ./clickhouse/cluster.xml:/etc/clickhouse-server/config.d/cluster.xml - ./clickhouse/cluster-1.xml:/etc/clickhouse-server/config.d/cluster-1.xml + - ./clickhouse/test-db.sql:/docker-entrypoint-initdb.d/test-db.sql ports: - 127.0.0.1:8123:8123/tcp - 127.0.0.1:9000:9000/tcp diff --git a/orchestrator/clickhouse/migrations_test.go b/orchestrator/clickhouse/migrations_test.go index 3ac238cd..572f53ec 100644 --- a/orchestrator/clickhouse/migrations_test.go +++ b/orchestrator/clickhouse/migrations_test.go @@ -118,9 +118,6 @@ func isOldTable(schema *schema.Component, table string) bool { if strings.Contains(table, schema.ClickHouseHash()) { return false } - if strings.HasPrefix(table, "test_") { - return true - } oldSuffixes := []string{ "_raw", "_raw_consumer", diff --git a/outlet/clickhouse/example_test.go b/outlet/clickhouse/example_test.go index 437034af..1b83d6ec 100644 --- a/outlet/clickhouse/example_test.go +++ b/outlet/clickhouse/example_test.go @@ -22,6 +22,7 @@ func TestInsertMemory(t *testing.T) { conn, err := ch.Dial(ctx, ch.Options{ Address: server, + Database: "test", DialTimeout: 100 * time.Millisecond, }) if err != nil { @@ -29,7 +30,7 @@ func TestInsertMemory(t *testing.T) { } if err := conn.Do(ctx, ch.Query{ - Body: `CREATE OR REPLACE TABLE test_table_insert + Body: `CREATE OR REPLACE TABLE test_example_insert ( ts DateTime64(9), severity_text Enum8('INFO'=1, 'DEBUG'=2), @@ -81,7 +82,7 @@ func TestInsertMemory(t *testing.T) { // Insert single data block. if err := conn.Do(ctx, ch.Query{ - Body: "INSERT INTO test_table_insert VALUES", + Body: "INSERT INTO test_example_insert VALUES", Input: input, }); err != nil { t.Fatalf("Do() error:\n%+v", err) @@ -92,7 +93,7 @@ func TestInsertMemory(t *testing.T) { // Stream data to ClickHouse server in multiple data blocks. var blocks int if err := conn.Do(ctx, ch.Query{ - Body: input.Into("test_table_insert"), // helper that generates INSERT INTO query with all columns + Body: input.Into("test_example_insert"), // helper that generates INSERT INTO query with all columns Input: input, // OnInput is called to prepare Input data before encoding and sending diff --git a/outlet/clickhouse/functional_test.go b/outlet/clickhouse/functional_test.go index 8cd722e4..4fbb3c11 100644 --- a/outlet/clickhouse/functional_test.go +++ b/outlet/clickhouse/functional_test.go @@ -33,6 +33,7 @@ func TestInsert(t *testing.T) { // Create components dbConf := clickhousedb.DefaultConfiguration() dbConf.Servers = []string{server} + dbConf.Database = "test" dbConf.DialTimeout = 100 * time.Millisecond chdb, err := clickhousedb.New(r, dbConf, clickhousedb.Dependencies{ Daemon: daemon.NewMock(t),