This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 41d907912a6 [improve](move-memtable) add fault injection in load 
stream stub (#29105)
41d907912a6 is described below

commit 41d907912a668f3de934b52593ec41463e59da3b
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Sat Dec 30 01:29:28 2023 +0800

    [improve](move-memtable) add fault injection in load stream stub (#29105)
---
 be/src/io/fs/stream_sink_file_writer.cpp           |  20 ++++
 be/src/vec/sink/load_stream_stub.cpp               |   2 +
 be/src/vec/sink/load_stream_stub_pool.cpp          |   6 ++
 .../test_load_stream_stub_failure_injection.groovy | 102 +++++++++++++++++++++
 4 files changed, 130 insertions(+)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index 2fce7c6baa9..74f34b44e6d 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -51,10 +51,30 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
                << ", data_length: " << bytes_req;
 
     std::span<const Slice> slices {data, data_cnt};
+    size_t stream_index = 0;
     bool ok = false;
     for (auto& stream : _streams) {
         auto st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
                                       _bytes_appended, slices);
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 {
+            if (stream_index >= 2) {
+                st = Status::InternalError("stream sink file writer append 
data failed");
+            } else {
+                stream_index++;
+            }
+        });
+
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 {
+            if (stream_index >= 1) {
+                st = Status::InternalError("stream sink file writer append 
data failed");
+            } else {
+                stream_index++;
+            }
+        });
+
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
 {
+            st = Status::InternalError("stream sink file writer append data 
failed");
+        });
         ok = ok || st.ok();
     }
     if (!ok) {
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 78358765967..4b557cae93d 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -21,6 +21,7 @@
 
 #include "olap/rowset/rowset_writer.h"
 #include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
 #include "util/network_util.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
@@ -330,6 +331,7 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
             
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             ret = brpc::StreamWrite(_stream_id, buf);
         }
+        DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", 
{ ret = EPIPE; });
         switch (ret) {
         case 0:
             return Status::OK();
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp 
b/be/src/vec/sink/load_stream_stub_pool.cpp
index bc19bad532c..f7eeb96bb04 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/sink/load_stream_stub_pool.h"
 
+#include "util/debug_points.h"
 #include "vec/sink/load_stream_stub.h"
 
 namespace doris {
@@ -29,16 +30,21 @@ LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, 
int num_use, LoadStre
 
 void LoadStreams::release() {
     int num_use = --_use_cnt;
+    DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
     if (num_use == 0) {
         LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" 
<< _dst_id;
         for (auto& stream : _streams) {
             auto st = stream->close_stream();
+            DBUG_EXECUTE_IF("LoadStreams.release.close_stream_failed",
+                            { st = Status::InternalError("stream close 
failed"); });
             if (!st.ok()) {
                 LOG(WARNING) << "close stream failed " << st;
             }
         }
         for (auto& stream : _streams) {
             auto st = stream->close_wait();
+            DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed",
+                            { st = Status::InternalError("stream close wait 
timeout"); });
             if (!st.ok()) {
                 LOG(WARNING) << "close wait failed " << st;
             }
diff --git 
a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
new file mode 100644
index 00000000000..ed02d3e7ac1
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_stream_stub_fault_injection", "nonConcurrent") {
+    sql """ set enable_memtable_on_sink_node=true """
+    sql """ DROP TABLE IF EXISTS `baseall` """
+    sql """ DROP TABLE IF EXISTS `test` """
+    sql """
+        CREATE TABLE IF NOT EXISTS `baseall` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace_if_not_null null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    streamLoad {
+        table "baseall"
+        db "regression_test_fault_injection_p0"
+        set 'column_separator', ','
+        file "baseall.txt"
+    }
+
+    def load_with_injection = { injection, error_msg->
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(injection)
+            sql "insert into test select * from baseall where k1 <= 3"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains(error_msg))
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(injection)
+        }
+    }
+
+    // StreamSinkFileWriter appendv write segment failed one replica
+    
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 "")
+    // StreamSinkFileWriter appendv write segment failed two replica
+    
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 "")
+    // StreamSinkFileWriter appendv write segment failed all replica
+    
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
 "stream sink file writer append data failed")
+    // LoadStreamStub stream write failed
+    load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed", 
"StreamWrite failed, err=32")
+    // LoadStreams keeping stream when release
+    load_with_injection("LoadStreams.release.keeping_streams", "")
+    // LoadStreams close stream failed
+    load_with_injection("LoadStreams.release.close_stream_failed", "")
+    // LoadStreams close wait failed
+    load_with_injection("LoadStreams.release.close_wait_failed", "")
+
+    sql """ DROP TABLE IF EXISTS `baseall` """
+    sql """ DROP TABLE IF EXISTS `test` """
+    sql """ set enable_memtable_on_sink_node=false """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to