This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8f6f4cf0eb1 [Pick](Variant) pick #33734 #33766 #33707 to branch-2.1 (#33848) 8f6f4cf0eb1 is described below commit 8f6f4cf0eb137621e04a22cea9b69bda9db18022 Author: lihangyu <15605149...@163.com> AuthorDate: Thu Apr 18 19:42:44 2024 +0800 [Pick](Variant) pick #33734 #33766 #33707 to branch-2.1 (#33848) * [Fix](Variant Type) forbit distribution info contains variant columns (#33707) * [Fix](Variant) VariantRootColumnIterator::read_by_rowids with wrong null map size (#33734) insert_range_from should start from `size` with `count` elements for null map * [Fix](Variant) check column index validation for extracted columns (#33766) --- .../olap/rowset/segment_v2/inverted_index_writer.h | 22 ++++ be/src/olap/rowset/segment_v2/segment_writer.cpp | 6 +- .../rowset/segment_v2/vertical_segment_writer.cpp | 5 +- be/src/olap/task/index_builder.cpp | 3 + .../data/variant_p0/with_index/var_index.out | 12 ++ .../suites/variant_github_events_p0/load.groovy | 137 ++++++++++++++++++++- .../suites/variant_p0/with_index/var_index.groovy | 8 +- 7 files changed, 184 insertions(+), 9 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_writer.h index 5ed34852c94..77873905af1 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.h @@ -24,13 +24,16 @@ #include <atomic> #include <memory> #include <string> +#include <vector> #include "common/config.h" #include "common/status.h" #include "gutil/strings/split.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" +#include "olap/olap_common.h" #include "olap/options.h" +#include "olap/tablet_schema.h" namespace doris { class CollectionValue; @@ -70,6 +73,25 @@ public: virtual void close_on_error() = 0; + // check if the column is valid for inverted index, some columns + // are generated from variant, but not all of them are supported + static bool check_column_valid(const TabletColumn& column) { + // bellow types are not supported in inverted index for extracted columns + static std::set<FieldType> invalid_types = { + FieldType::OLAP_FIELD_TYPE_DOUBLE, + FieldType::OLAP_FIELD_TYPE_JSONB, + FieldType::OLAP_FIELD_TYPE_ARRAY, + FieldType::OLAP_FIELD_TYPE_FLOAT, + }; + if (column.is_extracted_column() && (invalid_types.contains(column.type()))) { + return false; + } + if (column.is_variant_type()) { + return false; + } + return true; + } + private: DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter); }; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index dc85408accd..284e0e0eaaa 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -42,6 +42,7 @@ #include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext #include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter #include "olap/rowset/segment_v2/inverted_index_file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_writer.h" #include "olap/rowset/segment_v2/page_io.h" #include "olap/rowset/segment_v2/page_pointer.h" #include "olap/segment_loader.h" @@ -224,9 +225,8 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) { } // indexes for this column opts.indexes = std::move(_tablet_schema->get_indexes_for_column(column)); - if (column.is_variant_type() || (column.is_extracted_column() && column.is_jsonb_type()) || - (column.is_extracted_column() && column.is_array_type())) { - // variant and jsonb type skip write index + if (!InvertedIndexColumnWriter::check_column_valid(column)) { + // skip inverted index if invalid opts.indexes.clear(); opts.need_zone_map = false; opts.need_bloom_filter = false; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index d94bb0ce3fe..2a4f924b98a 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -176,9 +176,8 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo } // indexes for this column opts.indexes = _tablet_schema->get_indexes_for_column(column); - if (column.is_variant_type() || (column.is_extracted_column() && column.is_jsonb_type()) || - (column.is_extracted_column() && column.is_array_type())) { - // variant and jsonb type skip write index + if (!InvertedIndexColumnWriter::check_column_valid(column)) { + // skip inverted index if invalid opts.indexes.clear(); opts.need_zone_map = false; opts.need_bloom_filter = false; diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index 0e6abc2c1b1..09f745833d1 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -346,6 +346,9 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta continue; } auto column = output_rowset_schema->column(column_idx); + if (!InvertedIndexColumnWriter::check_column_valid(column)) { + continue; + } DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id, "")); _olap_data_convertor->add_column_data_convertor(column); return_columns.emplace_back(column_idx); diff --git a/regression-test/data/variant_p0/with_index/var_index.out b/regression-test/data/variant_p0/with_index/var_index.out index d8b7417852b..634b5125e84 100644 --- a/regression-test/data/variant_p0/with_index/var_index.out +++ b/regression-test/data/variant_p0/with_index/var_index.out @@ -8,3 +8,15 @@ 2 {"a":18811,"b":"hello world","c":1181111} 4 {"a":1234,"b":"hello xxx world","c":8181111} +-- !sql -- +1 {"a":123,"b":"xxxyyy","c":111999111} +2 {"a":18811,"b":"hello world","c":1181111} +3 {"a":18811,"b":"hello wworld","c":11111} +4 {"a":1234,"b":"hello xxx world","c":8181111} +5 {"a":123456789,"b":123456,"c":8181111} +6 {"timestamp":1713283200.060359} +7 {"timestamp":17.0} +8 {"timestamp":[123]} +9 {"timestamp":17.0} +10 {"timestamp":"17.0"} + diff --git a/regression-test/suites/variant_github_events_p0/load.groovy b/regression-test/suites/variant_github_events_p0/load.groovy index e05131d9153..befd1aa6103 100644 --- a/regression-test/suites/variant_github_events_p0/load.groovy +++ b/regression-test/suites/variant_github_events_p0/load.groovy @@ -16,6 +16,97 @@ // under the License. suite("regression_test_variant_github_events_p0", "nonConcurrent"){ + // prepare test table + def timeout = 300000 + def delta_time = 1000 + def alter_res = "null" + def useTime = 0 + + def wait_for_latest_op_on_table_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;""" + alter_res = alter_res.toString() + if(alter_res.contains("FINISHED")) { + sleep(10000) // wait change table state to normal + logger.info(table_name + " latest alter job finished, detail: " + alter_res) + break + } + useTime = t + sleep(delta_time) + } + assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout") + } + + def wait_for_build_index_on_partition_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}";""" + def expected_finished_num = alter_res.size(); + def finished_num = 0; + for (int i = 0; i < expected_finished_num; i++) { + logger.info(table_name + " build index job state: " + alter_res[i][7] + i) + if (alter_res[i][7] == "FINISHED") { + ++finished_num; + } + } + if (finished_num == expected_finished_num) { + sleep(10000) // wait change table state to normal + logger.info(table_name + " all build index jobs finished, detail: " + alter_res) + break + } + useTime = t + sleep(delta_time) + } + assertTrue(useTime <= OpTimeout, "wait_for_latest_build_index_on_partition_finish timeout") + } + + def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """ + + if (alter_res.size() == 0) { + logger.info(table_name + " last index job finished") + return "SKIPPED" + } + if (alter_res.size() > 0) { + def last_job_state = alter_res[alter_res.size()-1][7]; + if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") { + sleep(10000) // wait change table state to normal + logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res) + return last_job_state; + } + } + useTime = t + sleep(delta_time) + } + logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res) + assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout") + return "wait_timeout" + } + + def wait_for_last_build_index_on_table_running = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """ + + if (alter_res.size() == 0) { + logger.info(table_name + " last index job finished") + return "SKIPPED" + } + if (alter_res.size() > 0) { + def last_job_state = alter_res[alter_res.size()-1][7]; + if (last_job_state == "RUNNING") { + logger.info(table_name + " last index job running, state: " + last_job_state + ", detail: " + alter_res) + return last_job_state; + } + } + useTime = t + sleep(delta_time) + } + logger.info("wait_for_last_build_index_on_table_running debug: " + alter_res) + assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_running timeout") + return "wait_timeout" + } + + def backendId_to_backendIP = [:] def backendId_to_backendHttpPort = [:] getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); @@ -60,8 +151,8 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){ sql """ CREATE TABLE IF NOT EXISTS ${table_name} ( k bigint, - v variant not null, - INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + v variant not null + -- INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' ) DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(k) BUCKETS 4 @@ -73,11 +164,53 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){ load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""") + + // build inverted index at middle of loading the data + // ADD INDEX + sql """ ALTER TABLE github_events ADD INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" = "chinese", "parser_mode" = "fine_grained", "support_phrase" = "true") """ + wait_for_latest_op_on_table_finish("github_events", timeout) + // 2022 load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-16.json'}""") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-10.json'}""") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-22.json'}""") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-23.json'}""") + + // BUILD INDEX and expect state is FINISHED + sql """ BUILD INDEX idx_var ON github_events""" + state = wait_for_last_build_index_on_table_finish("github_events", timeout) + assertEquals("FINISHED", state) + + // add bloom filter at the end of loading data + + def tablets = sql_return_maparray """ show tablets from github_events; """ + // trigger compactions for all tablets in github_events + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + } + + // wait for all compactions done + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + // TODO fix compaction issue, this case could be stable qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int) from github_events where cast(v["repo"]["name"] as string) = 'xpressengine/xe-core' order by 1;""" qt_sql """select * from github_events where cast(v["repo"]["name"] as string) = 'xpressengine/xe-core' order by 1 limit 10""" diff --git a/regression-test/suites/variant_p0/with_index/var_index.groovy b/regression-test/suites/variant_p0/with_index/var_index.groovy index bea90f7403b..8c7afaa4a26 100644 --- a/regression-test/suites/variant_p0/with_index/var_index.groovy +++ b/regression-test/suites/variant_p0/with_index/var_index.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("regression_test_variant_var_index", "variant_type"){ +suite("regression_test_variant_var_index", "p0"){ def table_name = "var_index" sql "DROP TABLE IF EXISTS var_index" sql """ @@ -36,4 +36,10 @@ suite("regression_test_variant_var_index", "variant_type"){ qt_sql """select * from var_index where cast(v["a"] as smallint) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024 order by k""" sql """insert into var_index values(5, '{"a" : 123456789, "b" : 123456, "c" : 8181111}')""" qt_sql """select * from var_index where cast(v["a"] as int) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order by k""" + // insert double/float/array/json + sql """insert into var_index values(6, '{"timestamp": 1713283200.060359}')""" + sql """insert into var_index values(7, '{"timestamp": 17.0}')""" + sql """insert into var_index values(8, '{"timestamp": [123]}')""" + sql """insert into var_index values(9, '{"timestamp": 17.0}'),(10, '{"timestamp": "17.0"}')""" + qt_sql "select * from var_index order by k limit 10" } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org