This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 51cb15d032d [improve](move-memtable) cancel load immediately when back 
pressure in delta writer v2 (#29280)
51cb15d032d is described below

commit 51cb15d032d929da9301be85dac61d7fd568cb1b
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Sat Dec 30 10:45:06 2023 +0800

    [improve](move-memtable) cancel load immediately when back pressure in 
delta writer v2 (#29280)
---
 be/src/olap/delta_writer_v2.cpp                    |  19 ++--
 be/src/olap/delta_writer_v2.h                      |   7 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   2 +-
 be/test/vec/exec/delta_writer_v2_pool_test.cpp     |  10 +-
 ..._writer_v2_back_pressure_fault_injection.groovy | 106 +++++++++++++++++++++
 5 files changed, 132 insertions(+), 12 deletions(-)

diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 643f1837982..5987c63e658 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -65,16 +65,18 @@ namespace doris {
 using namespace ErrorCode;
 
 std::unique_ptr<DeltaWriterV2> DeltaWriterV2::open(
-        WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& 
streams) {
+        WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& 
streams,
+        RuntimeState* state) {
     std::unique_ptr<DeltaWriterV2> writer(
-            new DeltaWriterV2(req, streams, StorageEngine::instance()));
+            new DeltaWriterV2(req, streams, StorageEngine::instance(), state));
     return writer;
 }
 
 DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
                              const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
-                             StorageEngine* storage_engine)
-        : _req(*req),
+                             StorageEngine* storage_engine, RuntimeState* 
state)
+        : _state(state),
+          _req(*req),
           _tablet_schema(new TabletSchema),
           _memtable_writer(new MemTableWriter(*req)),
           _streams(streams) {}
@@ -158,8 +160,13 @@ Status DeltaWriterV2::write(const vectorized::Block* 
block, const std::vector<ui
     }
     {
         SCOPED_RAW_TIMER(&_wait_flush_limit_time);
-        while (_memtable_writer->flush_running_count() >=
-               config::memtable_flush_running_count_limit) {
+        auto memtable_flush_running_count_limit = 
config::memtable_flush_running_count_limit;
+        DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
+                        { memtable_flush_running_count_limit = 0; });
+        while (_memtable_writer->flush_running_count() >= 
memtable_flush_running_count_limit) {
+            if (_state->is_cancelled()) {
+                return Status::Cancelled(_state->cancel_reason());
+            }
             std::this_thread::sleep_for(std::chrono::milliseconds(10));
         }
     }
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index f896977d842..7db79680ee8 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -64,7 +64,8 @@ class Block;
 class DeltaWriterV2 {
 public:
     static std::unique_ptr<DeltaWriterV2> open(
-            WriteRequest* req, const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams);
+            WriteRequest* req, const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
+            RuntimeState* state);
 
     ~DeltaWriterV2();
 
@@ -88,7 +89,7 @@ public:
 
 private:
     DeltaWriterV2(WriteRequest* req, const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
-                  StorageEngine* storage_engine);
+                  StorageEngine* storage_engine, RuntimeState* state);
 
     void _build_current_tablet_schema(int64_t index_id,
                                       const OlapTableSchemaParam* 
table_schema_param,
@@ -96,6 +97,8 @@ private:
 
     void _update_profile(RuntimeProfile* profile);
 
+    RuntimeState* _state = nullptr;
+
     bool _is_init = false;
     bool _is_cancelled = false;
     WriteRequest _req;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 4972a397f70..8ded964950b 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -438,7 +438,7 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
                 break;
             }
         }
-        return DeltaWriterV2::open(&req, streams);
+        return DeltaWriterV2::open(&req, streams, _state);
     });
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp 
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index d44fd17a761..a40724cf9ff 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -56,9 +56,13 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
     auto map = pool.get_or_create(load_id);
     EXPECT_EQ(1, pool.size());
     WriteRequest req;
-    auto writer = map->get_or_create(100, [&req]() { return 
DeltaWriterV2::open(&req, {}); });
-    auto writer2 = map->get_or_create(101, [&req]() { return 
DeltaWriterV2::open(&req, {}); });
-    auto writer3 = map->get_or_create(100, [&req]() { return 
DeltaWriterV2::open(&req, {}); });
+    RuntimeState state;
+    auto writer = map->get_or_create(
+            100, [&req, &state]() { return DeltaWriterV2::open(&req, {}, 
&state); });
+    auto writer2 = map->get_or_create(
+            101, [&req, &state]() { return DeltaWriterV2::open(&req, {}, 
&state); });
+    auto writer3 = map->get_or_create(
+            100, [&req, &state]() { return DeltaWriterV2::open(&req, {}, 
&state); });
     EXPECT_EQ(2, map->size());
     EXPECT_EQ(writer, writer3);
     EXPECT_NE(writer, writer2);
diff --git 
a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
new file mode 100644
index 00000000000..ea9e9ffb8bb
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_delta_writer_v2_back_pressure_fault_injection", "nonConcurrent") {
+    sql """ set enable_memtable_on_sink_node=true """
+    sql """ DROP TABLE IF EXISTS `baseall` """
+    sql """ DROP TABLE IF EXISTS `test` """
+    sql """
+        CREATE TABLE IF NOT EXISTS `baseall` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace_if_not_null null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    streamLoad {
+        table "baseall"
+        db "regression_test_fault_injection_p0"
+        set 'column_separator', ','
+        file "baseall.txt"
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure")
+        def thread1 = new Thread({
+            try {
+                def res = sql "insert into test select * from baseall where k1 
<= 3"
+                logger.info(res.toString())
+            } catch(Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains("Communications link 
failure"))
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure")
+            }
+        })
+        thread1.start()
+
+        sleep(1000)
+
+        def processList = sql "show processlist"
+        logger.info(processList.toString())
+        processList.each { item ->
+            logger.info(item[1].toString())
+            logger.info(item[11].toString())
+            if (item[11].toString() == "insert into test select * from baseall 
where k1 <= 3".toString()){
+                def res = sql "kill ${item[1]}"
+                logger.info(res.toString())
+            }
+        }
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+    }
+
+    sql """ DROP TABLE IF EXISTS `baseall` """
+    sql """ DROP TABLE IF EXISTS `test` """
+    sql """ set enable_memtable_on_sink_node=false """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to