This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 28f7573f589 [fix](inverted index) fix rowset data size when enable index compaction (#45350)
28f7573f589 is described below

commit 28f7573f589ca945b356665337961977d529bb5f
Author: Sun Chenyang <suncheny...@selectdb.com>
AuthorDate: Tue Dec 17 10:57:45 2024 +0800

    [fix](inverted index) fix rowset data size when enable index compaction (#45350)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    - After performing index compaction, the index file size is not included
    in the rowset's data size.
    - A similar PR #37232 has already fixed this bug in the master branch.
---
 be/src/olap/compaction.cpp                         |  59 ++++++++-
 .../test_index_compaction_rowset_size.groovy       | 144 +++++++++++++++++++++
 2 files changed, 202 insertions(+), 1 deletion(-)
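
In short, the patch sums the sizes of the inverted index files produced by index compaction into compacted_index_file_size and folds that sum into the rowset meta's data/total/index disk sizes, which previously counted only the segment files. The snippet below is a minimal standalone sketch of that accounting, not Doris code: RowsetMetaSizes and the literal sizes are hypothetical stand-ins for rowset_meta() and fs->file_size().

    // Minimal sketch (not Doris code) of the size accounting added by this patch.
    // RowsetMetaSizes and the sizes below are hypothetical stand-ins for
    // rowset_meta() and fs->file_size(); the real patch reads actual file sizes.
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct RowsetMetaSizes {
        int64_t data_disk_size = 0;
        int64_t total_disk_size = 0;
        int64_t index_disk_size = 0;
    };

    int main() {
        RowsetMetaSizes meta;
        meta.data_disk_size = 4096;   // segment data only; index files were missing before the fix
        meta.total_disk_size = 4096;
        meta.index_disk_size = 0;

        // One compacted inverted index file per destination segment (made-up sizes).
        std::vector<int64_t> compacted_index_files = {512, 768};

        int64_t compacted_index_file_size = 0;
        for (int64_t size : compacted_index_files) {
            compacted_index_file_size += size;
        }

        // Mirror of the patch: add the compacted index size to all three counters.
        meta.data_disk_size += compacted_index_file_size;
        meta.total_disk_size += compacted_index_file_size;
        meta.index_disk_size += compacted_index_file_size;

        std::cout << "data=" << meta.data_disk_size << " total=" << meta.total_disk_size
                  << " index=" << meta.index_disk_size << std::endl;
        return 0;
    }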

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index d7f28436800..b4ace1f5170 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -524,6 +524,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
             auto& fs = _output_rowset->rowset_meta()->fs();
             auto& tablet_path = _tablet->tablet_path();
 
+            // After doing index compaction, need to add this size to rowset->total_size
+            int64_t compacted_index_file_size = 0;
+
             // we choose the first destination segment name as the temporary index writer path
             // Used to distinguish between different index compaction
             auto index_writer_path = tablet_path + "/" + dest_index_files[0];
@@ -536,7 +539,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
                     ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(),
                     [&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files,
                      &dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows,
-                     &status, this](int32_t column_uniq_id) {
+                     &status, &compacted_index_file_size, this](int32_t column_uniq_id) {
                         auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) {
                             LOG(WARNING) << "failed to do index compaction"
                                          << ". tablet=" << _tablet->tablet_id()
@@ -584,6 +587,25 @@ Status Compaction::do_compaction_impl(int64_t permits) {
                                 error_handler(index_id, column_uniq_id);
                                 status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
                                         st.msg());
+                            } else {
+                                for (int i = 0; i < dest_segment_num; ++i) {
+                                    // format: rowsetId_segmentId_columnId
+                                    auto seg_path =
+                                            std::static_pointer_cast<BetaRowset>(_output_rowset)
+                                                    ->segment_file_path(i);
+                                    std::string index_path =
+                                            InvertedIndexDescriptor::get_index_file_name(seg_path,
+                                                                                         index_id);
+                                    int64_t current_size = 0;
+                                    st = fs->file_size(index_path, &current_size);
+                                    if (!st.ok()) {
+                                        error_handler(index_id, column_uniq_id);
+                                        status = Status::Error<
+                                                ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
+                                                st.msg());
+                                    }
+                                    compacted_index_file_size += current_size;
+                                }
                             }
                         } catch (CLuceneError& e) {
                             error_handler(index_id, column_uniq_id);
@@ -597,6 +619,41 @@ Status Compaction::do_compaction_impl(int64_t permits) {
                 return status;
             }
 
+            // index compaction should update total disk size and index disk size
+            _output_rowset->rowset_meta()->set_data_disk_size(
+                    _output_rowset->rowset_meta()->data_disk_size() + compacted_index_file_size);
+            _output_rowset->rowset_meta()->set_total_disk_size(
+                    _output_rowset->rowset_meta()->total_disk_size() + compacted_index_file_size);
+            _output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
+                                                               compacted_index_file_size);
+
+            DBUG_EXECUTE_IF("check_after_compaction_file_size", {
+                int64_t total_file_size = 0;
+                for (int i = 0; i < dest_segment_num; ++i) {
+                    auto seg_path = std::static_pointer_cast<BetaRowset>(_output_rowset)
+                                            ->segment_file_path(i);
+                    int64_t current_size = 0;
+                    RETURN_IF_ERROR(fs->file_size(seg_path, &current_size));
+                    total_file_size += current_size;
+                    for (auto& column : _cur_tablet_schema->columns()) {
+                        const TabletIndex* index_meta =
+                                _cur_tablet_schema->get_inverted_index(column.unique_id());
+                        if (index_meta) {
+                            std::string index_path = InvertedIndexDescriptor::get_index_file_name(
+                                    seg_path, index_meta->index_id());
+                            RETURN_IF_ERROR(fs->file_size(index_path, &current_size));
+                            total_file_size += current_size;
+                        }
+                    }
+                }
+                if (total_file_size != _output_rowset->rowset_meta()->data_disk_size()) {
+                    Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
+                            "total file size {} is not equal rowset meta size {}", total_file_size,
+                            _output_rowset->rowset_meta()->data_disk_size());
+                }
+                LOG(INFO) << "succeed to check index compaction file size";
+            })
+
             LOG(INFO) << "succeed to do index compaction"
                       << ". tablet=" << _tablet->full_name()
                       << ", input row number=" << _input_row_num
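
The DBUG_EXECUTE_IF block above only runs when the check_after_compaction_file_size debug point is enabled (the regression test below turns it on). It re-walks the output rowset, sums the segment files plus every indexed column's inverted index file, and compares the total with the recorded data_disk_size. A rough standalone sketch of that check, with a map standing in for fs->file_size() and a made-up index file naming scheme in place of InvertedIndexDescriptor::get_index_file_name(), could look like this:

    // Standalone sketch of the debug-point check; the map stands in for fs->file_size()
    // and the "<segment>_<index_id>.idx" naming is a placeholder for the real
    // InvertedIndexDescriptor::get_index_file_name().
    #include <cstdint>
    #include <map>
    #include <string>
    #include <vector>

    bool check_rowset_file_size(const std::map<std::string, int64_t>& fs,
                                const std::vector<std::string>& segment_paths,
                                const std::vector<int64_t>& index_ids,
                                int64_t recorded_data_disk_size) {
        int64_t total_file_size = 0;
        for (const auto& seg_path : segment_paths) {
            auto seg = fs.find(seg_path);
            total_file_size += (seg != fs.end()) ? seg->second : 0;
            for (int64_t index_id : index_ids) {
                auto idx = fs.find(seg_path + "_" + std::to_string(index_id) + ".idx");
                total_file_size += (idx != fs.end()) ? idx->second : 0;
            }
        }
        // The real code reports INVERTED_INDEX_COMPACTION_ERROR when the sizes diverge.
        return total_file_size == recorded_data_disk_size;
    }

With the fix applied, the recomputed total should match, since data_disk_size now includes the compacted index files.
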
diff --git a/regression-test/suites/fault_injection_p0/test_index_compaction_rowset_size.groovy b/regression-test/suites/fault_injection_p0/test_index_compaction_rowset_size.groovy
new file mode 100644
index 00000000000..9dd3b1fa09a
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_index_compaction_rowset_size.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_index_compaction_rowset_size", "p0, nonConcurrent") {
+
+    def show_table_name = "test_index_compaction_rowset_size"
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+    def set_be_config = { key, value ->
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+    set_be_config.call("inverted_index_compaction_enable", "true")
+
+    sql "DROP TABLE IF EXISTS ${show_table_name}"
+    sql """ 
+        CREATE TABLE ${show_table_name} (
+            `@timestamp` int(11) NULL,
+            `clientip` varchar(20) NULL,
+            `request` varchar(500) NULL,
+            `status` int NULL,
+            `size` int NULL,
+            INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+            INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "unicode") COMMENT '',
+            INDEX size_idx (`size`) USING INVERTED COMMENT '',
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`@timestamp`, `clientip`)
+        DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "compaction_policy" = "time_series",
+            "time_series_compaction_file_count_threshold" = "20",
+            "disable_auto_compaction" = "true"
+        );
+    """
+
+    def compaction = {
+
+        def tablets = sql_return_maparray """ show tablets from ${show_table_name}; """
+
+        for (def tablet in tablets) {
+            int beforeSegmentCount = 0
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                beforeSegmentCount += Integer.parseInt(rowset.split(" ")[1])
+            }
+            assertEquals(beforeSegmentCount, 12)
+        }
+
+        // trigger compactions for all tablets in ${tableName}
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            assertEquals("success", compactJson.status.toLowerCase())
+        }
+
+        // wait for all compactions done
+        for (def tablet in tablets) {
+            Awaitility.await().atMost(10, TimeUnit.MINUTES).untilAsserted(() -> {
+                Thread.sleep(5000)
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("compaction task for this tablet is not running", compactionStatus.msg.toLowerCase())
+            });
+        }
+
+        
+        for (def tablet in tablets) {
+            int afterSegmentCount = 0
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                logger.info("rowset is: " + rowset)
+                afterSegmentCount += Integer.parseInt(rowset.split(" ")[1])
+            }
+            assertEquals(afterSegmentCount, 1)
+        }
+    }
+
+   
+
+    // 1. load data
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+    sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("check_after_compaction_file_size")
+        // 2. compaction
+        compaction.call()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("check_after_compaction_file_size")
+    }
+    
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
