This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 89845dc916d [fix] (move-memtable) fix errors caused by checks in the inverted index file writer (#37621)
89845dc916d is described below

commit 89845dc916d6d8c29f8a06e6af226e0d3e6dc40b
Author: Sun Chenyang <csun5...@gmail.com>
AuthorDate: Thu Jul 11 19:43:57 2024 +0800

    [fix] (move-memtable) fix errors caused by checks in the inverted index file writer (#37621)
    
    ## Proposed changes
    
    
![image](https://github.com/apache/doris/assets/76934516/5fee1263-e4b1-4f09-b38c-7c49cba3aa07)
    
    1. Thread A flushes segment 0 and its inverted index 0.
    2. Thread B flushes segment 1.
    3. Thread A calls add_segment for segment 0; the destination BE checks that
    the number of inverted index file writers equals the number of segment file
    writers, and the check fails at this point because thread B's inverted
    index has not been flushed yet.

    Fix: perform the writer-count check (inverted index file writers ==
    segment file writers) during `close()` instead, after all segments and
    inverted indexes have been flushed.
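
    For illustration, a minimal sketch of the race (hypothetical, simplified
    stand-ins for the real `_segment_file_writers` / `_inverted_file_writers`
    members, not the actual Doris code):

    ```cpp
    #include <cassert>
    #include <vector>

    // One writer slot is appended per flushed segment and per flushed
    // inverted index; the segment writer lands before its index writer,
    // so the two vectors are transiently unequal in size.
    struct Writer {};
    std::vector<Writer> segment_file_writers;
    std::vector<Writer> inverted_file_writers;

    void flush_segment()        { segment_file_writers.emplace_back(); }
    void flush_inverted_index() { inverted_file_writers.emplace_back(); }

    // The old per-add_segment() check: fails spuriously while another
    // thread's flush is still in flight.
    bool counts_match() {
        return inverted_file_writers.empty() ||
               inverted_file_writers.size() == segment_file_writers.size();
    }

    int main() {
        flush_segment();          // thread A: segment 0
        flush_inverted_index();   // thread A: inverted index 0
        flush_segment();          // thread B: segment 1
        assert(!counts_match());  // add_segment-time check would fail here
        flush_inverted_index();   // thread B: inverted index 1
        assert(counts_match());   // close()-time check, after all flushes, holds
    }
    ```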
---
 be/src/runtime/load_stream_writer.cpp              |  15 ++--
 .../test_move_memtable_multi_segment_index.out     |   4 +
 .../test_move_memtable_multi_segment_index.groovy  | 100 +++++++++++++++++++++
 .../suites/variant_github_events_p0/load.groovy    |   1 +
 .../variant_github_events_p0_new/load.groovy       |   1 +
 regression-test/suites/variant_p0/load.groovy      |   1 +
 6 files changed, 114 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp
index 925229a43ce..3e66787a9bd 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -187,13 +187,6 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
         if (!_is_init) {
             return Status::Corruption("add_segment failed, LoadStreamWriter is 
not inited");
         }
-        if (_inverted_file_writers.size() > 0 &&
-            _inverted_file_writers.size() != _segment_file_writers.size()) {
-            return Status::Corruption(
-                    "add_segment failed, inverted file writer size is {},"
-                    "segment file writer size is {}",
-                    _inverted_file_writers.size(), _segment_file_writers.size());
-        }
         DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
                         { segid = _segment_file_writers.size(); });
         RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &segment_file_size));
@@ -255,7 +248,13 @@ Status LoadStreamWriter::close() {
     if (_is_canceled) {
         return Status::InternalError("flush segment failed");
     }
-
+    if (_inverted_file_writers.size() > 0 &&
+        _inverted_file_writers.size() != _segment_file_writers.size()) {
+        return Status::Corruption(
+                "LoadStreamWriter close failed, inverted file writer size is 
{},"
+                "segment file writer size is {}",
+                _inverted_file_writers.size(), _segment_file_writers.size());
+    }
     for (const auto& writer : _segment_file_writers) {
         if (writer->state() != io::FileWriter::State::CLOSED) {
             return Status::Corruption("LoadStreamWriter close failed, segment 
{} is not closed",
diff --git a/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out b/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out
new file mode 100644
index 00000000000..c569d3acd09
--- /dev/null
+++ b/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_select_count --
+67843
+
diff --git a/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy b/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy
new file mode 100644
index 00000000000..ac225812484
--- /dev/null
+++ b/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_move_memtable_multi_segment_index", "nonConcurrent"){
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+    def set_be_config = { key, value ->
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+    def load_json_data = {table_name, file_name ->
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'read_json_by_line', 'true' 
+            set 'format', 'json' 
+            set 'max_filter_ratio', '0.1'
+            set 'memtable_on_sink_node', 'true'
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+
+            // If a check callback is declared, the default check condition is ignored,
+            // so you must verify all conditions yourself.
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                logger.info("Stream load ${file_name} result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+    try {
+        set_be_config("write_buffer_size", "2097152")
+        def table_name = "github_events"
+        sql """DROP TABLE IF EXISTS ${table_name}"""
+        table_name = "github_events"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant,
+                INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 1 
+            properties("replication_num" = "1", "disable_auto_compaction" = 
"true");
+        """
+
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-16.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-10.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-22.json'}""")
+        load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-23.json'}""")
+
+        sql """DROP TABLE IF EXISTS github_events_2"""
+        sql """
+            CREATE TABLE IF NOT EXISTS `github_events_2` (
+            `k` BIGINT NULL,
+            `v` text NULL,
+            INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
+            ) ENGINE = OLAP DUPLICATE KEY(`k`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`k`) BUCKETS 1 PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+            );
+        """
+
+        sql """
+            insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events;
+        """
+        qt_sql_select_count """ select count(*) from github_events_2; """
+    } finally {
+        set_be_config("write_buffer_size", "209715200")
+    }
+    
+}
diff --git a/regression-test/suites/variant_github_events_p0/load.groovy b/regression-test/suites/variant_github_events_p0/load.groovy
index cec4fb69fd4..8159a5752e5 100644
--- a/regression-test/suites/variant_github_events_p0/load.groovy
+++ b/regression-test/suites/variant_github_events_p0/load.groovy
@@ -127,6 +127,7 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
             set 'read_json_by_line', 'true' 
             set 'format', 'json' 
             set 'max_filter_ratio', '0.1'
+            set 'memtable_on_sink_node', 'true'
             file file_name // import json file
             time 10000 // limit inflight 10s
 
diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy b/regression-test/suites/variant_github_events_p0_new/load.groovy
index c063ebecf26..4d8d304aeb1 100644
--- a/regression-test/suites/variant_github_events_p0_new/load.groovy
+++ b/regression-test/suites/variant_github_events_p0_new/load.groovy
@@ -34,6 +34,7 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
             set 'read_json_by_line', 'true' 
             set 'format', 'json' 
             set 'max_filter_ratio', '0.1'
+            set 'memtable_on_sink_node', 'true'
             file file_name // import json file
             time 10000 // limit inflight 10s
 
diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy
index cbd6bc1178c..c84620a3d91 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -26,6 +26,7 @@ suite("regression_test_variant", "nonConcurrent"){
             set 'read_json_by_line', 'true' 
             set 'format', 'json' 
             set 'max_filter_ratio', '0.1'
+            set 'memtable_on_sink_node', 'true'
             file file_name // import json file
             time 10000 // limit inflight 10s
 

