This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8f6f4cf0eb1 [Pick](Variant) pick #33734 #33766 #33707 to branch-2.1 (#33848)
8f6f4cf0eb1 is described below

commit 8f6f4cf0eb137621e04a22cea9b69bda9db18022
Author: lihangyu <15605149...@163.com>
AuthorDate: Thu Apr 18 19:42:44 2024 +0800

    [Pick](Variant) pick #33734 #33766 #33707 to branch-2.1 (#33848)
    
    * [Fix](Variant Type) forbid distribution info containing variant columns (#33707)
    
    * [Fix](Variant) VariantRootColumnIterator::read_by_rowids with wrong null map size (#33734)
    
    insert_range_from should start from `size` with `count` elements for the null map
    
    * [Fix](Variant) check column index validation for extracted columns (#33766)
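    
    As context for the #33734 bullet, here is a minimal standalone sketch of
    the null-map sizing bug it describes, using a plain std::vector in place
    of the real vectorized column types (all names here are illustrative,
    not the actual Doris internals):
    
        #include <cassert>
        #include <cstdint>
        #include <vector>
    
        // Same contract as the columns' insert_range_from(src, start, count):
        // append `count` entries of `src` starting at offset `start`.
        void insert_range_from(std::vector<uint8_t>& dst,
                               const std::vector<uint8_t>& src,
                               size_t start, size_t count) {
            dst.insert(dst.end(), src.begin() + start, src.begin() + start + count);
        }
    
        int main() {
            std::vector<uint8_t> src_null_map = {0, 0, 1, 0, 1, 0};
            std::vector<uint8_t> dst_null_map;
            size_t size = 2;  // rows already consumed from the source
            size_t count = 3; // rows picked by this read_by_rowids call
    
            // Buggy pattern: copying from offset 0 describes the wrong rows.
            // insert_range_from(dst_null_map, src_null_map, 0, count);
    
            // Fixed pattern per the message above: start from `size`,
            // copy `count` elements.
            insert_range_from(dst_null_map, src_null_map, size, count);
            assert(dst_null_map.size() == count);
            assert(dst_null_map[0] == 1 && dst_null_map[1] == 0 && dst_null_map[2] == 1);
            return 0;
        }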
---
 .../olap/rowset/segment_v2/inverted_index_writer.h |  22 ++++
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   6 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |   5 +-
 be/src/olap/task/index_builder.cpp                 |   3 +
 .../data/variant_p0/with_index/var_index.out       |  12 ++
 .../suites/variant_github_events_p0/load.groovy    | 137 ++++++++++++++++++++-
 .../suites/variant_p0/with_index/var_index.groovy  |   8 +-
 7 files changed, 184 insertions(+), 9 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_writer.h
index 5ed34852c94..77873905af1 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.h
@@ -24,13 +24,16 @@
 #include <atomic>
 #include <memory>
 #include <string>
+#include <vector>
 
 #include "common/config.h"
 #include "common/status.h"
 #include "gutil/strings/split.h"
 #include "io/fs/file_system.h"
 #include "io/fs/local_file_system.h"
+#include "olap/olap_common.h"
 #include "olap/options.h"
+#include "olap/tablet_schema.h"
 
 namespace doris {
 class CollectionValue;
@@ -70,6 +73,25 @@ public:
 
     virtual void close_on_error() = 0;
 
+    // check if the column is valid for inverted index; some columns
+    // are generated from variant, but not all of them are supported
+    static bool check_column_valid(const TabletColumn& column) {
+        // below types are not supported in inverted index for extracted columns
+        static std::set<FieldType> invalid_types = {
+                FieldType::OLAP_FIELD_TYPE_DOUBLE,
+                FieldType::OLAP_FIELD_TYPE_JSONB,
+                FieldType::OLAP_FIELD_TYPE_ARRAY,
+                FieldType::OLAP_FIELD_TYPE_FLOAT,
+        };
+        if (column.is_extracted_column() && (invalid_types.contains(column.type()))) {
+            return false;
+        }
+        if (column.is_variant_type()) {
+            return false;
+        }
+        return true;
+    }
+
 private:
     DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter);
 };
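
To make the intent of the new check_column_valid helper concrete, the following
self-contained sketch mirrors its predicate with stand-in types (the enum and
struct below are illustrative; the real TabletColumn and FieldType live in
olap/tablet_schema.h and olap/olap_common.h):

    #include <cassert>
    #include <set>

    enum class FieldType { DOUBLE, JSONB, ARRAY, FLOAT, STRING, VARIANT };

    struct FakeColumn {
        FieldType type;
        bool extracted = false; // subcolumn extracted from a variant
        bool variant = false;   // the variant root column itself
    };

    // Mirrors the predicate added above: variant roots are never indexed,
    // and extracted double/jsonb/array/float subcolumns are skipped.
    bool check_column_valid(const FakeColumn& column) {
        static const std::set<FieldType> invalid_types = {
                FieldType::DOUBLE, FieldType::JSONB,
                FieldType::ARRAY, FieldType::FLOAT};
        if (column.extracted && invalid_types.contains(column.type)) {
            return false;
        }
        if (column.variant) {
            return false;
        }
        return true;
    }

    int main() {
        assert(!check_column_valid({FieldType::VARIANT, false, true})); // variant root
        assert(!check_column_valid({FieldType::DOUBLE, true}));  // extracted double
        assert(check_column_valid({FieldType::STRING, true}));   // extracted string
        assert(check_column_valid({FieldType::DOUBLE, false}));  // plain double column
        return 0;
    }

The three call sites in this commit (segment_writer.cpp, vertical_segment_writer.cpp,
index_builder.cpp) all use the helper the same way: when it returns false, they skip
building index structures for the column instead of failing the write.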
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index dc85408accd..284e0e0eaaa 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -42,6 +42,7 @@
 #include "olap/rowset/rowset_writer_context.h"    // RowsetWriterContext
 #include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
 #include "olap/rowset/segment_v2/inverted_index_file_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_writer.h"
 #include "olap/rowset/segment_v2/page_io.h"
 #include "olap/rowset/segment_v2/page_pointer.h"
 #include "olap/segment_loader.h"
@@ -224,9 +225,8 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
         }
         // indexes for this column
         opts.indexes = std::move(_tablet_schema->get_indexes_for_column(column));
-        if (column.is_variant_type() || (column.is_extracted_column() && column.is_jsonb_type()) ||
-            (column.is_extracted_column() && column.is_array_type())) {
-            // variant and jsonb type skip write index
+        if (!InvertedIndexColumnWriter::check_column_valid(column)) {
+            // skip inverted index if invalid
             opts.indexes.clear();
             opts.need_zone_map = false;
             opts.need_bloom_filter = false;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index d94bb0ce3fe..2a4f924b98a 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -176,9 +176,8 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
     }
     // indexes for this column
     opts.indexes = _tablet_schema->get_indexes_for_column(column);
-    if (column.is_variant_type() || (column.is_extracted_column() && column.is_jsonb_type()) ||
-        (column.is_extracted_column() && column.is_array_type())) {
-        // variant and jsonb type skip write index
+    if (!InvertedIndexColumnWriter::check_column_valid(column)) {
+        // skip inverted index if invalid
         opts.indexes.clear();
         opts.need_zone_map = false;
         opts.need_bloom_filter = false;
diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp
index 0e6abc2c1b1..09f745833d1 100644
--- a/be/src/olap/task/index_builder.cpp
+++ b/be/src/olap/task/index_builder.cpp
@@ -346,6 +346,9 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
                     continue;
                 }
                 auto column = output_rowset_schema->column(column_idx);
+                if (!InvertedIndexColumnWriter::check_column_valid(column)) {
+                    continue;
+                }
                 DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id, ""));
                 _olap_data_convertor->add_column_data_convertor(column);
                 return_columns.emplace_back(column_idx);
diff --git a/regression-test/data/variant_p0/with_index/var_index.out b/regression-test/data/variant_p0/with_index/var_index.out
index d8b7417852b..634b5125e84 100644
--- a/regression-test/data/variant_p0/with_index/var_index.out
+++ b/regression-test/data/variant_p0/with_index/var_index.out
@@ -8,3 +8,15 @@
 2      {"a":18811,"b":"hello world","c":1181111}
 4      {"a":1234,"b":"hello xxx world","c":8181111}
 
+-- !sql --
+1      {"a":123,"b":"xxxyyy","c":111999111}
+2      {"a":18811,"b":"hello world","c":1181111}
+3      {"a":18811,"b":"hello wworld","c":11111}
+4      {"a":1234,"b":"hello xxx world","c":8181111}
+5      {"a":123456789,"b":123456,"c":8181111}
+6      {"timestamp":1713283200.060359}
+7      {"timestamp":17.0}
+8      {"timestamp":[123]}
+9      {"timestamp":17.0}
+10     {"timestamp":"17.0"}
+
diff --git a/regression-test/suites/variant_github_events_p0/load.groovy b/regression-test/suites/variant_github_events_p0/load.groovy
index e05131d9153..befd1aa6103 100644
--- a/regression-test/suites/variant_github_events_p0/load.groovy
+++ b/regression-test/suites/variant_github_events_p0/load.groovy
@@ -16,6 +16,97 @@
 // under the License.
 
 suite("regression_test_variant_github_events_p0", "nonConcurrent"){
+    // prepare test table
+    def timeout = 300000
+    def delta_time = 1000
+    def alter_res = "null"
+    def useTime = 0
+    
+    def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = 
"${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
+            alter_res = alter_res.toString()
+            if(alter_res.contains("FINISHED")) {
+                sleep(10000) // wait change table state to normal
+                logger.info(table_name + " latest alter job finished, detail: 
" + alter_res)
+                break
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish 
timeout")
+    }
+
+    def wait_for_build_index_on_partition_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = 
"${table_name}";"""
+            def expected_finished_num = alter_res.size();
+            def finished_num = 0;
+            for (int i = 0; i < expected_finished_num; i++) {
+                logger.info(table_name + " build index job state: " + 
alter_res[i][7] + i)
+                if (alter_res[i][7] == "FINISHED") {
+                    ++finished_num;
+                }
+            }
+            if (finished_num == expected_finished_num) {
+                sleep(10000) // wait change table state to normal
+                logger.info(table_name + " all build index jobs finished, 
detail: " + alter_res)
+                break
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, "wait_for_latest_build_index_on_partition_finish timeout")
+    }
+
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = 
"${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() == 0) {
+                logger.info(table_name + " last index job finished")
+                return "SKIPPED"
+            }
+            if (alter_res.size() > 0) {
+                def last_job_state = alter_res[alter_res.size()-1][7];
+                if (last_job_state == "FINISHED" || last_job_state == 
"CANCELLED") {
+                    sleep(10000) // wait change table state to normal
+                    logger.info(table_name + " last index job finished, state: 
" + last_job_state + ", detail: " + alter_res)
+                    return last_job_state;
+                }
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_finish debug: " + 
alter_res)
+        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    def wait_for_last_build_index_on_table_running = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = 
"${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() == 0) {
+                logger.info(table_name + " last index job finished")
+                return "SKIPPED"
+            }
+            if (alter_res.size() > 0) {
+                def last_job_state = alter_res[alter_res.size()-1][7];
+                if (last_job_state == "RUNNING") {
+                    logger.info(table_name + " last index job running, state: 
" + last_job_state + ", detail: " + alter_res)
+                    return last_job_state;
+                }
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_running debug: " + 
alter_res)
+        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_running timeout")
+        return "wait_timeout"
+    }
+
+
     def backendId_to_backendIP = [:]
     def backendId_to_backendHttpPort = [:]
     getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
@@ -60,8 +151,8 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
     sql """
         CREATE TABLE IF NOT EXISTS ${table_name} (
             k bigint,
-            v variant not null,
-            INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+            v variant not null
+            -- INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
         )
         DUPLICATE KEY(`k`)
         DISTRIBUTED BY HASH(k) BUCKETS 4 
@@ -73,11 +164,53 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""")
+
+    // build inverted index in the middle of loading the data
+    // ADD INDEX
+    sql """ ALTER TABLE github_events ADD INDEX idx_var (`v`) USING INVERTED 
PROPERTIES("parser" = "chinese", "parser_mode" = "fine_grained", 
"support_phrase" = "true") """
+    wait_for_latest_op_on_table_finish("github_events", timeout)
+
     // 2022
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-16.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-10.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-22.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-23.json'}""")
+
+    // BUILD INDEX and expect the state to be FINISHED
+    sql """ BUILD INDEX idx_var ON  github_events"""
+    state = wait_for_last_build_index_on_table_finish("github_events", timeout)
+    assertEquals("FINISHED", state)
+
+    // add bloom filter at the end of loading data 
+
+    def tablets = sql_return_maparray """ show tablets from github_events; """
+    // trigger compactions for all tablets in github_events
+    for (def tablet in tablets) {
+        String tablet_id = tablet.TabletId
+        backend_id = tablet.BackendId
+        (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
+        assertEquals(code, 0)
+        def compactJson = parseJson(out.trim())
+    }
+
+    // wait for all compactions done
+    for (def tablet in tablets) {
+        boolean running = true
+        do {
+            Thread.sleep(1000)
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Get compaction status: code=" + code + ", out=" + out 
+ ", err=" + err)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    
     // TODO fix compaction issue, this case could be stable
     qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int)  
from github_events where cast(v["repo"]["name"] as string) = 
'xpressengine/xe-core' order by 1;"""
     qt_sql """select * from github_events where  cast(v["repo"]["name"] as 
string) = 'xpressengine/xe-core' order by 1 limit 10"""
diff --git a/regression-test/suites/variant_p0/with_index/var_index.groovy b/regression-test/suites/variant_p0/with_index/var_index.groovy
index bea90f7403b..8c7afaa4a26 100644
--- a/regression-test/suites/variant_p0/with_index/var_index.groovy
+++ b/regression-test/suites/variant_p0/with_index/var_index.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("regression_test_variant_var_index", "variant_type"){
+suite("regression_test_variant_var_index", "p0"){
     def table_name = "var_index"
     sql "DROP TABLE IF EXISTS var_index"
     sql """
@@ -36,4 +36,10 @@ suite("regression_test_variant_var_index", "variant_type"){
     qt_sql """select * from var_index where cast(v["a"] as smallint) > 123 and 
cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024 order by 
k"""
     sql """insert into var_index values(5, '{"a" : 123456789, "b" : 123456, 
"c" : 8181111}')"""
     qt_sql """select * from var_index where cast(v["a"] as int) > 123 and 
cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order by 
k"""
+    // insert double/float/array/json
+    sql """insert into var_index values(6, '{"timestamp": 
1713283200.060359}')"""
+    sql """insert into var_index values(7, '{"timestamp": 17.0}')"""
+    sql """insert into var_index values(8, '{"timestamp": [123]}')"""
+    sql """insert into var_index values(9, '{"timestamp": 17.0}'),(10, 
'{"timestamp": "17.0"}')"""
+    qt_sql "select * from var_index order by k limit 10"
 }
\ No newline at end of file

