This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 51cb15d032d [improve](move-memtable) cancel load immediately when back pressure in delta writer v2 (#29280) 51cb15d032d is described below commit 51cb15d032d929da9301be85dac61d7fd568cb1b Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Sat Dec 30 10:45:06 2023 +0800 [improve](move-memtable) cancel load immediately when back pressure in delta writer v2 (#29280) --- be/src/olap/delta_writer_v2.cpp | 19 ++-- be/src/olap/delta_writer_v2.h | 7 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +- be/test/vec/exec/delta_writer_v2_pool_test.cpp | 10 +- ..._writer_v2_back_pressure_fault_injection.groovy | 106 +++++++++++++++++++++ 5 files changed, 132 insertions(+), 12 deletions(-) diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 643f1837982..5987c63e658 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -65,16 +65,18 @@ namespace doris { using namespace ErrorCode; std::unique_ptr<DeltaWriterV2> DeltaWriterV2::open( - WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams) { + WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, + RuntimeState* state) { std::unique_ptr<DeltaWriterV2> writer( - new DeltaWriterV2(req, streams, StorageEngine::instance())); + new DeltaWriterV2(req, streams, StorageEngine::instance(), state)); return writer; } DeltaWriterV2::DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, - StorageEngine* storage_engine) - : _req(*req), + StorageEngine* storage_engine, RuntimeState* state) + : _state(state), + _req(*req), _tablet_schema(new TabletSchema), _memtable_writer(new MemTableWriter(*req)), _streams(streams) {} @@ -158,8 +160,13 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<ui } { SCOPED_RAW_TIMER(&_wait_flush_limit_time); - while (_memtable_writer->flush_running_count() >= - config::memtable_flush_running_count_limit) { + auto memtable_flush_running_count_limit = config::memtable_flush_running_count_limit; + DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure", + { memtable_flush_running_count_limit = 0; }); + while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) { + if (_state->is_cancelled()) { + return Status::Cancelled(_state->cancel_reason()); + } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index f896977d842..7db79680ee8 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -64,7 +64,8 @@ class Block; class DeltaWriterV2 { public: static std::unique_ptr<DeltaWriterV2> open( - WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams); + WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, + RuntimeState* state); ~DeltaWriterV2(); @@ -88,7 +89,7 @@ public: private: DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, - StorageEngine* storage_engine); + StorageEngine* storage_engine, RuntimeState* state); void _build_current_tablet_schema(int64_t index_id, const OlapTableSchemaParam* table_schema_param, @@ -96,6 +97,8 @@ private: void _update_profile(RuntimeProfile* profile); + RuntimeState* _state = nullptr; + bool _is_init = false; bool _is_cancelled = false; WriteRequest _req; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 4972a397f70..8ded964950b 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -438,7 +438,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block break; } } - return DeltaWriterV2::open(&req, streams); + return DeltaWriterV2::open(&req, streams, _state); }); { SCOPED_TIMER(_wait_mem_limit_timer); diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp index d44fd17a761..a40724cf9ff 100644 --- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp +++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp @@ -56,9 +56,13 @@ TEST_F(DeltaWriterV2PoolTest, test_map) { auto map = pool.get_or_create(load_id); EXPECT_EQ(1, pool.size()); WriteRequest req; - auto writer = map->get_or_create(100, [&req]() { return DeltaWriterV2::open(&req, {}); }); - auto writer2 = map->get_or_create(101, [&req]() { return DeltaWriterV2::open(&req, {}); }); - auto writer3 = map->get_or_create(100, [&req]() { return DeltaWriterV2::open(&req, {}); }); + RuntimeState state; + auto writer = map->get_or_create( + 100, [&req, &state]() { return DeltaWriterV2::open(&req, {}, &state); }); + auto writer2 = map->get_or_create( + 101, [&req, &state]() { return DeltaWriterV2::open(&req, {}, &state); }); + auto writer3 = map->get_or_create( + 100, [&req, &state]() { return DeltaWriterV2::open(&req, {}, &state); }); EXPECT_EQ(2, map->size()); EXPECT_EQ(writer, writer3); EXPECT_NE(writer, writer2); diff --git a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy new file mode 100644 index 00000000000..ea9e9ffb8bb --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods +import org.apache.doris.regression.util.Http + +suite("test_delta_writer_v2_back_pressure_fault_injection", "nonConcurrent") { + sql """ set enable_memtable_on_sink_node=true """ + sql """ DROP TABLE IF EXISTS `baseall` """ + sql """ DROP TABLE IF EXISTS `test` """ + sql """ + CREATE TABLE IF NOT EXISTS `baseall` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + sql """ + CREATE TABLE IF NOT EXISTS `test` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace_if_not_null null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + streamLoad { + table "baseall" + db "regression_test_fault_injection_p0" + set 'column_separator', ',' + file "baseall.txt" + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure") + def thread1 = new Thread({ + try { + def res = sql "insert into test select * from baseall where k1 <= 3" + logger.info(res.toString()) + } catch(Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("Communications link failure")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure") + } + }) + thread1.start() + + sleep(1000) + + def processList = sql "show processlist" + logger.info(processList.toString()) + processList.each { item -> + logger.info(item[1].toString()) + logger.info(item[11].toString()) + if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){ + def res = sql "kill ${item[1]}" + logger.info(res.toString()) + } + } + } catch(Exception e) { + logger.info(e.getMessage()) + } + + sql """ DROP TABLE IF EXISTS `baseall` """ + sql """ DROP TABLE IF EXISTS `test` """ + sql """ set enable_memtable_on_sink_node=false """ +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org