This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new fe111207a9 [Fix](lazy_open) Fix lazy open null point (#19829)
fe111207a9 is described below

commit fe111207a96a49d96ded1adff937c154e924fdd3
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Tue May 23 09:17:46 2023 +0800

    [Fix](lazy_open) Fix lazy open null point (#19829)
---
 be/src/runtime/load_channel.cpp  | 22 +++++++++++++++-------
 be/src/vec/sink/vtablet_sink.cpp |  6 ++++++
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index e26564962a..f0ed39d7e0 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -87,6 +87,15 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
 
 Status LoadChannel::open_partition(const OpenPartitionRequest& params) {
     int64_t index_id = params.index_id();
+
+    // check finish
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _finished_channel_ids.find(index_id);
+        if (it != _finished_channel_ids.end()) {
+            return Status::OK();
+        }
+    }
     std::shared_ptr<TabletsChannel> channel;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -94,14 +103,13 @@ Status LoadChannel::open_partition(const OpenPartitionRequest& params) {
         if (it != _tablets_channels.end()) {
             channel = it->second;
         } else {
-            // create a new tablets channel
-            TabletsChannelKey key(params.id(), index_id);
-            channel = std::make_shared<TabletsChannel>(key, _load_id, _is_high_priority,
-                                                       _self_profile);
-            {
-                std::lock_guard<SpinLock> l(_tablets_channels_lock);
-                _tablets_channels.insert({index_id, channel});
+            fmt::memory_buffer buf;
+            for (auto tablet : params.tablets()) {
+                fmt::format_to(buf, "tablet id:{}", tablet.tablet_id());
             }
+            LOG(WARNING) << "should be opened partition index id=" << params.index_id()
+                         << "tablet ids=" << fmt::to_string(buf);
+            return Status::InternalError("Partition should be opened");
         }
     }
     RETURN_IF_ERROR(channel->open_all_writers_for_partition(params));
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index b8c796c964..ecdefe54d0 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1107,10 +1107,13 @@ Status VOlapTableSink::open(RuntimeState* state) {
     SCOPED_TIMER(_open_timer);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
+    fmt::memory_buffer buf;
     for (auto index_channel : _channels) {
+        fmt::format_to(buf, "index id:{}", index_channel->_index_id);
         index_channel->for_each_node_channel(
                 [](const std::shared_ptr<VNodeChannel>& ch) { ch->open(); });
     }
+    LOG(INFO) << "list of open index id = " << fmt::to_string(buf);
 
     for (auto index_channel : _channels) {
         index_channel->for_each_node_channel([&index_channel](
@@ -1145,7 +1148,9 @@ void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
     auto it = _opened_partitions.find(id);
     if (it == _opened_partitions.end()) {
         _opened_partitions.insert(id);
+        fmt::memory_buffer buf;
         for (int j = 0; j < partition->indexes.size(); ++j) {
+            fmt::format_to(buf, "index id:{}", partition->indexes[j].index_id);
             for (const auto& tid : partition->indexes[j].tablets) {
                 auto it = _channels[j]->_channels_by_tablet.find(tid);
                 for (const auto& channel : it->second) {
@@ -1153,6 +1158,7 @@ void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
                 }
             }
         }
+        LOG(INFO) << "list of lazy open index id = " << fmt::to_string(buf);
     }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org