This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 41d907912a6 [improve](move-memtable) add fault injection in load stream stub (#29105) 41d907912a6 is described below commit 41d907912a668f3de934b52593ec41463e59da3b Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Sat Dec 30 01:29:28 2023 +0800 [improve](move-memtable) add fault injection in load stream stub (#29105) --- be/src/io/fs/stream_sink_file_writer.cpp | 20 ++++ be/src/vec/sink/load_stream_stub.cpp | 2 + be/src/vec/sink/load_stream_stub_pool.cpp | 6 ++ .../test_load_stream_stub_failure_injection.groovy | 102 +++++++++++++++++++++ 4 files changed, 130 insertions(+) diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index 2fce7c6baa9..74f34b44e6d 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -51,10 +51,30 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { << ", data_length: " << bytes_req; std::span<const Slice> slices {data, data_cnt}; + size_t stream_index = 0; bool ok = false; for (auto& stream : _streams) { auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended, slices); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { + if (stream_index >= 2) { + st = Status::InternalError("stream sink file writer append data failed"); + } else { + stream_index++; + } + }); + + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { + if (stream_index >= 1) { + st = Status::InternalError("stream sink file writer append data failed"); + } else { + stream_index++; + } + }); + + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", { + st = Status::InternalError("stream sink file writer append data failed"); + }); ok = ok || st.ok(); } if (!ok) { diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 78358765967..4b557cae93d 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -21,6 +21,7 @@ #include "olap/rowset/rowset_writer.h" #include "util/brpc_client_cache.h" +#include "util/debug_points.h" #include "util/network_util.h" #include "util/thrift_util.h" #include "util/uid_util.h" @@ -330,6 +331,7 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); ret = brpc::StreamWrite(_stream_id, buf); } + DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; }); switch (ret) { case 0: return Status::OK(); diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index bc19bad532c..f7eeb96bb04 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -17,6 +17,7 @@ #include "vec/sink/load_stream_stub_pool.h" +#include "util/debug_points.h" #include "vec/sink/load_stream_stub.h" namespace doris { @@ -29,16 +30,21 @@ LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStre void LoadStreams::release() { int num_use = --_use_cnt; + DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; }); if (num_use == 0) { LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id; for (auto& stream : _streams) { auto st = stream->close_stream(); + DBUG_EXECUTE_IF("LoadStreams.release.close_stream_failed", + { st = Status::InternalError("stream close failed"); }); if (!st.ok()) { LOG(WARNING) << "close stream failed " << st; } } for (auto& stream : _streams) { auto st = stream->close_wait(); + DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed", + { st = Status::InternalError("stream close wait timeout"); }); if (!st.ok()) { LOG(WARNING) << "close wait failed " << st; } diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy new file mode 100644 index 00000000000..ed02d3e7ac1 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods +import org.apache.doris.regression.util.Http + +suite("test_stream_stub_fault_injection", "nonConcurrent") { + sql """ set enable_memtable_on_sink_node=true """ + sql """ DROP TABLE IF EXISTS `baseall` """ + sql """ DROP TABLE IF EXISTS `test` """ + sql """ + CREATE TABLE IF NOT EXISTS `baseall` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + sql """ + CREATE TABLE IF NOT EXISTS `test` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace_if_not_null null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + streamLoad { + table "baseall" + db "regression_test_fault_injection_p0" + set 'column_separator', ',' + file "baseall.txt" + } + + def load_with_injection = { injection, error_msg-> + try { + GetDebugPoint().enableDebugPointForAllBEs(injection) + sql "insert into test select * from baseall where k1 <= 3" + } catch(Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains(error_msg)) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(injection) + } + } + + // StreamSinkFileWriter appendv write segment failed one replica + load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "") + // StreamSinkFileWriter appendv write segment failed two replica + load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "") + // StreamSinkFileWriter appendv write segment failed all replica + load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "stream sink file writer append data failed") + // LoadStreams stream wait failed + load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed", "StreamWrite failed, err=32") + // LoadStreams keeping stream when release + load_with_injection("LoadStreams.release.keeping_streams", "") + // LoadStreams close stream failed + load_with_injection("LoadStreams.release.close_stream_failed", "") + // LoadStreams close wait failed + load_with_injection("LoadStreams.release.close_wait_failed", "") + + sql """ DROP TABLE IF EXISTS `baseall` """ + sql """ DROP TABLE IF EXISTS `test` """ + sql """ set enable_memtable_on_sink_node=false """ +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org