This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new eaf718711b0 branch-3.0-pick: [Fix](merge-on-write) should re-calculate delete bitmaps between segments if BE restart before publish (#48775) (#48917)
eaf718711b0 is described below

commit eaf718711b090a8ae146e2eebb13a160aabdb004
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Wed Mar 12 10:09:22 2025 +0800

    branch-3.0-pick: [Fix](merge-on-write) should re-calculate delete bitmaps between segments if BE restart before publish (#48775) (#48917)
    
    pick https://github.com/apache/doris/pull/48775
---
 be/src/olap/base_tablet.cpp                        |   3 +
 be/src/olap/tablet_meta.cpp                        |   4 +
 be/src/olap/tablet_meta.h                          |   2 +
 be/src/olap/txn_manager.cpp                        |  11 ++
 ...est_local_multi_segments_re_calc_in_publish.out | Bin 0 -> 202 bytes
 ..._local_multi_segments_re_calc_in_publish.groovy | 174 +++++++++++++++++++++
 6 files changed, 194 insertions(+)
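
How the fix works, in brief: when a multi-segment rowset is written, the cross-segment delete bitmaps are calculated before publish, and calc_delete_bitmap_between_segments now also records a sentinel entry keyed by {rowset_id, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON}. At publish time, publish_txn checks for that sentinel; if a rowset has more than one segment and the sentinel is missing (for example because the BE restarted and the previously calculated bitmap was lost), the cross-segment bitmaps are re-calculated before the delete bitmap update proceeds. The following is a minimal, self-contained sketch of that sentinel pattern; SimpleDeleteBitmap and needs_recalc_before_publish are simplified stand-ins invented for the example, not the Doris DeleteBitmap API in be/src/olap/tablet_meta.h.

    // Minimal sketch of the sentinel-mark pattern used by this fix (illustration only).
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <tuple>

    // Key layout mirrors the real code: (rowset id, segment id, version).
    using BitmapKey = std::tuple<std::string, uint32_t, int64_t>;

    struct SimpleDeleteBitmap {
        static constexpr uint32_t INVALID_SEGMENT_ID = UINT32_MAX;
        static constexpr int64_t TEMP_VERSION_COMMON = 0;
        static constexpr uint64_t ROWSET_SENTINEL_MARK = 1;

        std::map<BitmapKey, uint64_t> bitmaps;

        // Recorded once the cross-segment duplicates have been resolved.
        void mark_multi_segments_calculated(const std::string& rowset_id) {
            bitmaps[{rowset_id, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON}] = ROWSET_SENTINEL_MARK;
        }

        // Checked again at publish time; false means the earlier result is gone
        // (e.g. the BE restarted) and the calculation has to be redone.
        bool has_calculated_for_multi_segments(const std::string& rowset_id) const {
            auto it = bitmaps.find({rowset_id, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON});
            return it != bitmaps.end() && it->second == ROWSET_SENTINEL_MARK;
        }
    };

    // Publish-time decision, mirroring the new check in txn_manager.cpp: only a
    // rowset with more than one segment and no sentinel needs the re-calculation.
    bool needs_recalc_before_publish(const SimpleDeleteBitmap& bm,
                                     const std::string& rowset_id, int num_segments) {
        return num_segments > 1 && !bm.has_calculated_for_multi_segments(rowset_id);
    }

    int main() {
        SimpleDeleteBitmap bm;
        std::cout << needs_recalc_before_publish(bm, "rs_1", 2) << "\n"; // 1: recalc needed
        bm.mark_multi_segments_calculated("rs_1");
        std::cout << needs_recalc_before_publish(bm, "rs_1", 2) << "\n"; // 0: sentinel found
        return 0;
    }

Keeping the sentinel inside the same delete bitmap means the "already calculated" flag travels with the bitmap itself, so no extra transaction-meta field is needed to detect a lost result after a restart.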

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 5ccac388bf3..dcbe400734c 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -383,6 +383,9 @@ Status BaseTablet::calc_delete_bitmap_between_segments(
 
     RETURN_IF_ERROR(calculator.calculate_all(delete_bitmap));
 
+    delete_bitmap->add(
+            {rowset_id, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON},
+            DeleteBitmap::ROWSET_SENTINEL_MARK);
     LOG(INFO) << fmt::format(
             "construct delete bitmap between segments, "
             "tablet: {}, rowset: {}, number of segments: {}, bitmap size: {}, cost {} (us)",
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 031c07e4b4e..50a62899a9d 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -1262,6 +1262,10 @@ uint64_t DeleteBitmap::get_delete_bitmap_count() {
     return delete_bitmap.size();
 }
 
+bool DeleteBitmap::has_calculated_for_multi_segments(const RowsetId& rowset_id) const {
+    return contains({rowset_id, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON}, ROWSET_SENTINEL_MARK);
+}
+
 // We cannot just copy the underlying memory to construct a string
 // due to equivalent objects may have different padding bytes.
 // Reading padding bytes is undefined behavior, neither copy nor
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 93eec9325aa..cc7daf8a67a 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -553,6 +553,8 @@ public:
 
     uint64_t get_delete_bitmap_count();
 
+    bool has_calculated_for_multi_segments(const RowsetId& rowset_id) const;
+
     class AggCachePolicy : public LRUCachePolicy {
     public:
         AggCachePolicy(size_t capacity)
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index be4ad0c166e..ad412d2d2a5 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -38,6 +38,7 @@
 #include "olap/delta_writer.h"
 #include "olap/olap_common.h"
 #include "olap/partial_update_info.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/pending_rowset_helper.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_meta_manager.h"
@@ -531,6 +532,16 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
     // update delete_bitmap
     if (tablet_txn_info->unique_key_merge_on_write) {
         int64_t t2 = MonotonicMicros();
+        if (rowset->num_segments() > 1 &&
+            !tablet_txn_info->delete_bitmap->has_calculated_for_multi_segments(
+                    rowset->rowset_id())) {
+            // delete bitmap is empty, should re-calculate delete bitmaps between segments
+            std::vector<segment_v2::SegmentSharedPtr> segments;
+            RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
+            RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
+                    rowset, segments, tablet_txn_info->delete_bitmap));
+        }
+
         RETURN_IF_ERROR(
                 Tablet::update_delete_bitmap(tablet, tablet_txn_info.get(), transaction_id));
         int64_t t3 = MonotonicMicros();
diff --git a/regression-test/data/fault_injection_p0/test_local_multi_segments_re_calc_in_publish.out b/regression-test/data/fault_injection_p0/test_local_multi_segments_re_calc_in_publish.out
new file mode 100644
index 00000000000..f9e767c3d20
Binary files /dev/null and b/regression-test/data/fault_injection_p0/test_local_multi_segments_re_calc_in_publish.out differ
diff --git a/regression-test/suites/fault_injection_p0/test_local_multi_segments_re_calc_in_publish.groovy b/regression-test/suites/fault_injection_p0/test_local_multi_segments_re_calc_in_publish.groovy
new file mode 100644
index 00000000000..ff1d9531fb5
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_local_multi_segments_re_calc_in_publish.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_local_multi_segments_re_calc_in_publish", "docker") {
+
+    def dbName = context.config.getDbNameByFile(context.file)
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = false
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    options.beConfigs += [
+        'doris_scanner_row_bytes=1' // to cause multi segments
+    ]
+    options.enableDebugPoints()
+
+    docker(options) {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        def fe = cluster.getFeByIndex(1)
+
+        def table1 = "test_local_multi_segments_re_calc_in_publish"
+        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int
+                )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1"); """
+
+        sql "insert into ${table1} values(99999,99999,99999);"
+        sql "insert into ${table1} values(88888,88888,88888);"
+        sql "insert into ${table1} values(77777,77777,77777);"
+        sql "sync;"
+        qt_sql "select * from ${table1} order by k1;"
+
+        def do_streamload_2pc_commit = { txnId ->
+            def command = "curl -X PUT --location-trusted -u root:" +
+                    " -H txn_id:${txnId}" +
+                    " -H txn_operation:commit" +
+                    " http://${fe.host}:${fe.httpPort}/api/${dbName}/${table1}/_stream_load_2pc";
+            log.info("http_stream execute 2pc: ${command}")
+
+            def process = command.execute()
+            def code = process.waitFor()
+            def out = process.text
+            def json2pc = parseJson(out)
+            log.info("http_stream 2pc result: ${out}".toString())
+            assert code == 0
+            assert "success" == json2pc.status.toLowerCase()
+        }
+
+        def beNodes = sql_return_maparray("show backends;")
+        def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0)
+        def tabletBackendId = tabletStat.BackendId
+        def tabletId = tabletStat.TabletId
+        def be1
+        for (def be : beNodes) {
+            if (be.BackendId == tabletBackendId) {
+                be1 = be
+            }
+        }
+        logger.info("tablet ${tabletId} on backend ${be1.Host} with backendId=${be1.BackendId}");
+        logger.info("backends: ${cluster.getBackends()}")
+        int beIndex = 1
+        for (def backend : cluster.getBackends()) {
+            if (backend.host == be1.Host) {
+                beIndex = backend.index
+                break
+            }
+        }
+        assert cluster.getBeByIndex(beIndex).backendId as String == tabletBackendId
+
+        try {
+            // batch_size is 4164 in csv_reader.cpp
+            // _batch_size is 8192 in vtablet_writer.cpp
+            // to cause multi segments
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
+
+            String txnId
+            // load data that will have multi segments and there are duplicate keys between segments
+            String content = ""
+            (1..4096).each {
+                content += "${it},${it},${it}\n"
+            }
+            content += content
+            streamLoad {
+                table "${table1}"
+                set 'column_separator', ','
+                inputStream new ByteArrayInputStream(content.getBytes())
+                set 'two_phase_commit', 'true'
+                time 30000
+
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    def json = parseJson(result)
+                    logger.info(result)
+                    txnId = json.TxnId
+                    assert "success" == json.Status.toLowerCase()
+                }
+            }
+
+            // restart be, so that load will re-calculate delete bitmaps in publish phase
+            Thread.sleep(1000)
+            cluster.stopBackends(1)
+            Thread.sleep(1000)
+            cluster.startBackends(beIndex)
+
+            Thread.sleep(1000)
+            do_streamload_2pc_commit(txnId)
+            dockerAwaitUntil(30) {
+                def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+                result[0].TransactionStatus as String == "VISIBLE"
+            }
+
+            qt_sql "select count() from ${table1};"
+            qt_dup_key_count "select count() from (select k1,count() as cnt from ${table1} group by k1 having cnt > 1) A;"
+
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
