This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new b1fdbb3ab51 branch-4.1: [fix](delta writer) Fix shared delta writer
state lifetime (#64504)
b1fdbb3ab51 is described below
commit b1fdbb3ab5146764045fcfddf33d2059b3d2cbbc
Author: bobhan1 <[email protected]>
AuthorDate: Tue Jun 16 13:27:21 2026 +0800
branch-4.1: [fix](delta writer) Fix shared delta writer state lifetime
(#64504)
### What problem does this PR solve?
Backport #64349 to branch-4.1.
Shared `DeltaWriterV2` instances can be reused by multiple local sinks
from the same load. Before this change, the shared writer stored the
`RuntimeState*` from the sink that first created it. If that creator
sink finished and its `RuntimeState` was destroyed while another local
sink continued to reuse the shared writer, `DeltaWriterV2::write()`
could dereference the destroyed `RuntimeState` while checking for
cancellation in the memtable flush-limit wait loop, causing a BE crash
or an ASAN heap-use-after-free report.
This PR removes the stored `RuntimeState*` from `DeltaWriterV2`. The
shared writer now holds only a `std::shared_ptr<WorkloadGroup>`, captured
when the writer is created and needed for `MemTableWriter` initialization,
so its lifetime no longer depends on the creator sink. `VTabletWriterV2`
passes a per-call cancel checker into `DeltaWriterV2::write()`, so
cancellation is evaluated against the sink that is currently writing.
Branch-4.1 adaptation:
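- evaluate the detailed delta-writer profile gate (`profile_level() >= 2`)
at the `VTabletWriterV2::close()` call site and pass a null profile into
the shared writer when it is disabled, since `DeltaWriterV2` can no longer
read the profile level from a `RuntimeState`;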
- construct the new BE test schema with branch-local test helpers and
protobuf fields.
### Check List
- [x] `git diff HEAD^ HEAD --check`
- [x] `./run-be-ut.sh --run --filter=TestVTabletWriterV2.shared_delta_writer_should_not_access_destroyed_creator_runtime_state:DeltaWriterV2PoolTest.* -j100`
### Release note
Fix a possible BE crash when shared delta writers are reused by multiple
local sinks.
---
be/src/exec/sink/writer/vtablet_writer_v2.cpp | 17 +-
be/src/load/delta_writer/delta_writer_v2.cpp | 29 +--
be/src/load/delta_writer/delta_writer_v2.h | 10 +-
be/test/exec/sink/vtablet_writer_v2_test.cpp | 231 +++++++++++++++++++++
.../delta_writer/delta_writer_v2_pool_test.cpp | 13 +-
5 files changed, 268 insertions(+), 32 deletions(-)
diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp
b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
index 2494f238a4f..f85ba027fe4 100644
--- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
@@ -578,7 +578,11 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t ta
<< " not found in schema, load_id=" <<
print_id(_load_id);
return std::unique_ptr<DeltaWriterV2>(nullptr);
}
- return DeltaWriterV2::create_unique(&req, streams, _state);
+ std::shared_ptr<WorkloadGroup> workload_group = nullptr;
+ if (_state->get_query_ctx()) {
+ workload_group = _state->workload_group();
+ }
+ return DeltaWriterV2::create_unique(&req, streams, workload_group);
});
if (delta_writer == nullptr) {
LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
@@ -594,7 +598,12 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t ta
}
}
SCOPED_TIMER(_write_memtable_timer);
- st = delta_writer->write(block.get(), rows.row_idxes);
+ st = delta_writer->write(block.get(), rows.row_idxes, [state = _state]() {
+ if (state->is_cancelled()) {
+ return state->cancel_reason();
+ }
+ return Status::OK();
+ });
return st;
}
@@ -677,7 +686,9 @@ Status VTabletWriterV2::close(Status exec_status) {
std::unordered_map<int64_t, int32_t> segments_for_tablet;
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
- auto st = _delta_writer_for_tablet->close(segments_for_tablet,
_operator_profile);
+ RuntimeProfile* delta_writer_profile =
+ _state->profile_level() >= 2 ? _operator_profile : nullptr;
+ auto st = _delta_writer_for_tablet->close(segments_for_tablet,
delta_writer_profile);
_delta_writer_for_tablet.reset();
if (!st.ok()) {
_cancel(st);
diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp
b/be/src/load/delta_writer/delta_writer_v2.cpp
index 7ffef693c58..5e7f91db103 100644
--- a/be/src/load/delta_writer/delta_writer_v2.cpp
+++ b/be/src/load/delta_writer/delta_writer_v2.cpp
@@ -66,9 +66,9 @@ using namespace ErrorCode;
DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
const
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
- RuntimeState* state)
- : _state(state),
- _req(*req),
+ std::shared_ptr<WorkloadGroup> workload_group)
+ : _req(*req),
+ _workload_group(std::move(workload_group)),
_tablet_schema(new TabletSchema),
_memtable_writer(new MemTableWriter(*req)),
_streams(streams) {}
@@ -127,19 +127,17 @@ Status DeltaWriterV2::init() {
_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
- std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
- if (_state->get_query_ctx()) {
- wg_sptr = _state->get_query_ctx()->workload_group();
- }
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema,
_partial_update_info,
- wg_sptr,
_streams[0]->enable_unique_mow(_req.index_id)));
+ _workload_group,
+
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
_streams.clear();
return Status::OK();
}
-Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>&
row_idxs) {
+Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>&
row_idxs,
+ const std::function<Status()>& cancel_check) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
@@ -155,9 +153,8 @@ Status DeltaWriterV2::write(const Block* block, const
DorisVector<uint32_t>& row
DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
{
std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
while (_memtable_writer->flush_running_count() >=
memtable_flush_running_count_limit) {
- if (_state->is_cancelled()) {
- return _state->cancel_reason();
- }
+ DBUG_EXECUTE_IF("DeltaWriterV2.write.flush_limit_wait",
DBUG_RUN_CALLBACK());
+ RETURN_IF_ERROR(cancel_check());
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
@@ -186,14 +183,10 @@ Status DeltaWriterV2::close_wait(int32_t& num_segments,
RuntimeProfile* profile)
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait()
being called";
- if (_state->profile_level() >= 2 && profile != nullptr) {
+ if (profile != nullptr) {
_update_profile(profile);
}
- if (_state->profile_level() >= 2) {
- RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
- } else {
- RETURN_IF_ERROR(_memtable_writer->close_wait());
- }
+ RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
num_segments = _rowset_writer->next_segment_id();
_delta_written_success = true;
diff --git a/be/src/load/delta_writer/delta_writer_v2.h
b/be/src/load/delta_writer/delta_writer_v2.h
index d637c6afcbc..bfdf0fd3a25 100644
--- a/be/src/load/delta_writer/delta_writer_v2.h
+++ b/be/src/load/delta_writer/delta_writer_v2.h
@@ -24,6 +24,7 @@
#include <stdint.h>
#include <atomic>
+#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
@@ -52,6 +53,7 @@ class SlotDescriptor;
class OlapTableSchemaParam;
class BetaRowsetWriterV2;
class LoadStreamStub;
+class WorkloadGroup;
class Block;
@@ -62,13 +64,14 @@ class DeltaWriterV2 {
public:
DeltaWriterV2(WriteRequest* req, const
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
- RuntimeState* state);
+ std::shared_ptr<WorkloadGroup> workload_group);
~DeltaWriterV2();
Status init();
- Status write(const Block* block, const DorisVector<uint32_t>& row_idxs);
+ Status write(const Block* block, const DorisVector<uint32_t>& row_idxs,
+ const std::function<Status()>& cancel_check);
// flush the last memtable to flush queue, must call it before close_wait()
Status close();
@@ -88,11 +91,10 @@ private:
void _update_profile(RuntimeProfile* profile);
- RuntimeState* _state = nullptr;
-
bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
+ std::shared_ptr<WorkloadGroup> _workload_group;
std::shared_ptr<BetaRowsetWriterV2> _rowset_writer;
TabletSchemaSPtr _tablet_schema;
bool _delta_written_success = false;
diff --git a/be/test/exec/sink/vtablet_writer_v2_test.cpp
b/be/test/exec/sink/vtablet_writer_v2_test.cpp
index d62885d1395..2ddde5bc4b3 100644
--- a/be/test/exec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/exec/sink/vtablet_writer_v2_test.cpp
@@ -18,9 +18,28 @@
#include "exec/sink/writer/vtablet_writer_v2.h"
#include <gtest/gtest.h>
+#include <unistd.h>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "core/data_type/data_type_number.h"
+#include "exec/operator/operator_helper.h"
+#include "exec/sink/delta_writer_v2_pool.h"
#include "exec/sink/load_stream_map_pool.h"
#include "exec/sink/load_stream_stub.h"
+#include "exec/sink/sink_test_utils.h"
+#include "io/fs/local_file_system.h"
+#include "load/memtable/memtable_memory_limiter.h"
+#include "runtime/exec_env.h"
+#include "storage/storage_engine.h"
+#include "storage/tablet/tablet_schema.h"
+#include "testutil/column_helper.h"
+#include "testutil/creators.h"
+#include "util/debug_points.h"
+#include "util/defer_op.h"
namespace doris {
@@ -63,6 +82,218 @@ static std::unique_ptr<VTabletWriterV2>
create_vtablet_writer(int num_replicas =
return writer;
}
+static TColumn create_int_column_desc(bool is_nullable) {
+ TColumn column;
+ column.__set_column_name("c1");
+ column.__set_column_type(TColumnType());
+ column.column_type.__set_type(TPrimitiveType::INT);
+ column.__set_is_key(true);
+ column.__set_aggregation_type(TAggregationType::NONE);
+ column.__set_is_allow_null(is_nullable);
+ column.__set_col_unique_id(1);
+ return column;
+}
+
+static void prepare_load_runtime_state(MockRuntimeState& state, int sender_id)
{
+ state.set_backend_id(1);
+ state.set_per_fragment_instance_idx(sender_id);
+ state.set_num_per_fragment_instances(2);
+ state.set_load_stream_per_node(1);
+ state.set_total_load_streams(2);
+ state.set_num_local_sink(2);
+}
+
+static void prepare_open_streams(std::shared_ptr<LoadStreamMap>
load_stream_map, int64_t node_id,
+ int64_t index_id, const TabletSchemaSPtr&
tablet_schema) {
+ auto streams = load_stream_map->get_or_create(node_id);
+ streams->mark_open();
+ for (auto& stream : streams->streams()) {
+ stream->_is_open.store(true);
+ stream->_status = Status::OK();
+ stream->_tablet_schema_for_index->emplace(index_id, tablet_schema);
+ stream->_enable_unique_mow_for_index->emplace(index_id, false);
+ }
+}
+
+static TabletSchemaSPtr create_int_tablet_schema() {
+ TabletSchemaPB tablet_schema_pb;
+ tablet_schema_pb.set_keys_type(DUP_KEYS);
+ tablet_schema_pb.set_num_short_key_columns(1);
+ tablet_schema_pb.set_num_rows_per_row_block(1024);
+ tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+ tablet_schema_pb.set_next_column_unique_id(2);
+ auto* column = tablet_schema_pb.add_column();
+ column->set_unique_id(1);
+ column->set_name("c1");
+ column->set_type("INT");
+ column->set_is_key(true);
+ column->set_length(4);
+ column->set_index_length(4);
+ column->set_is_nullable(true);
+ column->set_is_bf_column(false);
+
+ auto tablet_schema = std::make_shared<TabletSchema>();
+ tablet_schema->init_from_pb(tablet_schema_pb);
+ return tablet_schema;
+}
+
+static TDataSink create_vtablet_writer_sink(const TOlapTableSchemaParam&
schema,
+ const TOlapTablePartitionParam&
partition,
+ const TOlapTableLocationParam&
location,
+ TTupleId tuple_id, const
TUniqueId& load_id) {
+ TDataSink t_sink;
+ t_sink.__isset.olap_table_sink = true;
+ auto& olap_sink = t_sink.olap_table_sink;
+ olap_sink.__set_load_id(load_id);
+ olap_sink.__set_txn_id(1);
+ olap_sink.__set_db_id(schema.db_id);
+ olap_sink.__set_table_id(schema.table_id);
+ olap_sink.__set_tuple_id(tuple_id);
+ olap_sink.__set_num_replicas(1);
+ olap_sink.__set_need_gen_rollup(false);
+ olap_sink.__set_schema(schema);
+ olap_sink.__set_partition(partition);
+ olap_sink.__set_location(location);
+
+ TNodeInfo node;
+ node.__set_id(1);
+ node.__set_option(0);
+ node.__set_host("127.0.0.1");
+ node.__set_async_internal_port(8060);
+ TPaloNodesInfo nodes;
+ nodes.nodes.push_back(node);
+ olap_sink.__set_nodes_info(nodes);
+ return t_sink;
+}
+
+TEST_F(TestVTabletWriterV2,
shared_delta_writer_should_not_access_destroyed_creator_runtime_state) {
+ const bool old_share_delta_writers = config::share_delta_writers;
+ const int32_t old_flush_running_count_limit =
config::memtable_flush_running_count_limit;
+ const bool old_enable_debug_points = config::enable_debug_points;
+ config::share_delta_writers = true;
+ Defer restore_configs([&] {
+ config::share_delta_writers = old_share_delta_writers;
+ config::memtable_flush_running_count_limit =
old_flush_running_count_limit;
+ config::enable_debug_points = old_enable_debug_points;
+
DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait");
+ });
+
+ ExecEnv* exec_env = ExecEnv::GetInstance();
+ auto old_load_stream_map_pool = std::move(exec_env->_load_stream_map_pool);
+ auto old_delta_writer_v2_pool = std::move(exec_env->_delta_writer_v2_pool);
+ auto old_memtable_memory_limiter =
std::move(exec_env->_memtable_memory_limiter);
+ auto old_storage_engine = std::move(exec_env->_storage_engine);
+ const std::string old_storage_root_path = config::storage_root_path;
+ char cwd_buffer[1024];
+ ASSERT_NE(nullptr, getcwd(cwd_buffer, sizeof(cwd_buffer)));
+ const std::string test_data_dir =
+ std::string(cwd_buffer) +
"/vtablet_writer_v2_shared_delta_writer_test";
+ Defer restore_exec_env([&]() mutable {
+ exec_env->_delta_writer_v2_pool.reset();
+ exec_env->_load_stream_map_pool.reset();
+ exec_env->_storage_engine.reset();
+ exec_env->_memtable_memory_limiter.reset();
+ exec_env->_storage_engine = std::move(old_storage_engine);
+ exec_env->_memtable_memory_limiter =
std::move(old_memtable_memory_limiter);
+ exec_env->_delta_writer_v2_pool = std::move(old_delta_writer_v2_pool);
+ exec_env->_load_stream_map_pool = std::move(old_load_stream_map_pool);
+ config::storage_root_path = old_storage_root_path;
+
static_cast<void>(io::global_local_filesystem()->delete_directory(test_data_dir));
+ });
+
+ config::storage_root_path = test_data_dir;
+
ASSERT_TRUE(io::global_local_filesystem()->delete_directory(test_data_dir).ok());
+
ASSERT_TRUE(io::global_local_filesystem()->create_directory(test_data_dir).ok());
+ EngineOptions options;
+ options.store_paths.emplace_back(test_data_dir, -1);
+ auto engine = std::make_unique<StorageEngine>(options);
+ ASSERT_TRUE(engine->open().ok());
+ exec_env->_storage_engine = std::move(engine);
+ auto memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
+ ASSERT_TRUE(memtable_memory_limiter->init(1024 * 1024 * 1024).ok());
+ exec_env->_memtable_memory_limiter = std::move(memtable_memory_limiter);
+ exec_env->_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
+ exec_env->_delta_writer_v2_pool = std::make_unique<DeltaWriterV2Pool>();
+
+ auto creator_ctx = std::make_unique<OperatorContext>();
+ OperatorContext current_ctx;
+ prepare_load_runtime_state(creator_ctx->state, 0);
+ prepare_load_runtime_state(current_ctx.state, 1);
+
+ TOlapTableSchemaParam schema;
+ TTupleId tuple_id = 0;
+ int64_t index_id = 0;
+ sink_test_utils::build_desc_tbl_and_schema(*creator_ctx, schema, tuple_id,
index_id, false);
+ sink_test_utils::build_desc_tbl_and_schema(current_ctx, schema, tuple_id,
index_id, false);
+ schema.indexes[0].__set_columns_desc({create_int_column_desc(false)});
+
+ TUniqueId load_id;
+ load_id.hi = 380;
+ load_id.lo = 1;
+ const auto partition = sink_test_utils::build_partition_param(index_id);
+ const auto location = sink_test_utils::build_location_param();
+ const auto t_sink = create_vtablet_writer_sink(schema, partition,
location, tuple_id, load_id);
+
+ VExprContextSPtrs output_exprs;
+ auto creator_writer = std::make_unique<VTabletWriterV2>(t_sink,
output_exprs, nullptr, nullptr);
+ auto current_writer = std::make_unique<VTabletWriterV2>(t_sink,
output_exprs, nullptr, nullptr);
+ ASSERT_TRUE(creator_writer->_init(&creator_ctx->state,
&creator_ctx->profile).ok());
+ ASSERT_TRUE(current_writer->_init(&current_ctx.state,
&current_ctx.profile).ok());
+ ASSERT_EQ(creator_writer->_delta_writer_for_tablet,
current_writer->_delta_writer_for_tablet);
+
+ const auto tablet_schema = create_int_tablet_schema();
+ prepare_open_streams(creator_writer->_load_stream_map, 1, index_id,
tablet_schema);
+
+ auto block =
std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt32>({1}));
+ Rows rows;
+ rows.partition_id = 1;
+ rows.index_id = index_id;
+ rows.row_idxes.push_back(0);
+
+ const auto first_write_status = creator_writer->_write_memtable(block,
100, rows);
+ ASSERT_TRUE(first_write_status.ok()) << first_write_status;
+ ASSERT_EQ(1, creator_writer->_delta_writer_for_tablet->size());
+
+ // The first write above creates the shared DeltaWriterV2 from the creator
+ // sink (before this fix, the writer kept creator_ctx->state in
+ // DeltaWriterV2::_state). Destroy the creator sink and its RuntimeState to
+ // reproduce the original lifetime boundary: another local sink can still
+ // reuse the shared writer after the creator state is gone. Do not call
+ // creator_writer->_cancel() here because it cancels the shared writer and
+ // would hide the dangling RuntimeState path.
+ creator_writer.reset();
+ creator_ctx.reset();
+
+ // Force the current sink into DeltaWriterV2::write()'s flush-limit wait
+ // path, then cancel the current RuntimeState. Fixed code observes the
+ // current sink's cancel state and returns its cancel reason, so the child
+ // process exits with code 0; pre-fix code read the destroyed creator
+ // RuntimeState from the shared DeltaWriterV2 and ASAN reported
+ // heap-use-after-free in the child process.
+ auto debug_point = std::make_shared<DebugPoint>();
+ debug_point->execute_limit = 1;
+ debug_point->callback = std::function<void()>(
+ [&] { current_ctx.state.cancel(Status::Cancelled("current state
cancelled")); });
+ config::enable_debug_points = true;
+ DebugPoints::instance()->add("DeltaWriterV2.write.flush_limit_wait",
debug_point);
+ config::memtable_flush_running_count_limit = 0;
+
+ EXPECT_EXIT(
+ {
+ alarm(10);
+ auto status = current_writer->_write_memtable(block, 100,
rows);
+ if (!status.ok() &&
+ status.msg().find("current state cancelled") !=
std::string::npos) {
+ _exit(0);
+ }
+ _exit(1);
+ },
+ ::testing::ExitedWithCode(0), "");
+
+ config::memtable_flush_running_count_limit = old_flush_running_count_limit;
+ DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait");
+
+ current_writer->_cancel(Status::Cancelled("test cleanup"));
+}
+
TEST_F(TestVTabletWriterV2, one_replica) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
diff --git a/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp
b/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp
index 2b6ce8091bf..159bc8b58c9 100644
--- a/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp
+++ b/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp
@@ -57,18 +57,17 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
auto map = pool.get_or_create(load_id);
EXPECT_EQ(1, pool.size());
WriteRequest req;
- RuntimeState state;
- auto writer = map->get_or_create(100, [&req, &state]() {
+ auto writer = map->get_or_create(100, [&req]() {
return std::make_unique<DeltaWriterV2>(
- &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
+ &req, std::vector<std::shared_ptr<LoadStreamStub>> {},
nullptr);
});
- auto writer2 = map->get_or_create(101, [&req, &state]() {
+ auto writer2 = map->get_or_create(101, [&req]() {
return std::make_unique<DeltaWriterV2>(
- &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
+ &req, std::vector<std::shared_ptr<LoadStreamStub>> {},
nullptr);
});
- auto writer3 = map->get_or_create(100, [&req, &state]() {
+ auto writer3 = map->get_or_create(100, [&req]() {
return std::make_unique<DeltaWriterV2>(
- &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
+ &req, std::vector<std::shared_ptr<LoadStreamStub>> {},
nullptr);
});
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]