dataroaring commented on code in PR #18874: URL: https://github.com/apache/doris/pull/18874#discussion_r1184751344
########## be/src/vec/sink/vtablet_sink.cpp: ########## @@ -1062,6 +1135,41 @@ Status VOlapTableSink::open(RuntimeState* state) { return Status::OK(); } +void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) { + const auto& id = partition->id; + auto it = _partition_opened.find(id); + if (it == _partition_opened.end()) { + { + std::unique_lock<std::mutex> l(_partition_opened_mutex); + auto it = _partition_opened.find(id); + if (it != _partition_opened.end()) { + return; + } + _partition_opened.insert(std::pair(id, false)); + } + for (int j = 0; j < partition->indexes.size(); ++j) { + for (const auto& tid : partition->indexes[j].tablets) { + auto it = _channels[j]->_channels_by_tablet.find(tid); + for (const auto& channel : it->second) { + auto open_partition_closure = channel->open_partition(partition->id); + auto st = channel->open_partition_wait(open_partition_closure); + if (!st.ok()) { + _channels[j]->mark_as_failed( + channel->node_id(), channel->host(), + fmt::format("{}, open failed, err: {}", channel->channel_info(), + st.to_string()), + -1); + } + } + } + } + { + std::unique_lock<std::mutex> l(_partition_opened_mutex); Review Comment: The sink is called from only one thread, so this mutex appears to be unnecessary. 
########## be/src/vec/sink/vtablet_sink.cpp: ########## @@ -1062,6 +1135,41 @@ Status VOlapTableSink::open(RuntimeState* state) { return Status::OK(); } +void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) { + const auto& id = partition->id; + auto it = _partition_opened.find(id); + if (it == _partition_opened.end()) { + { + std::unique_lock<std::mutex> l(_partition_opened_mutex); + auto it = _partition_opened.find(id); + if (it != _partition_opened.end()) { + return; + } + _partition_opened.insert(std::pair(id, false)); + } + for (int j = 0; j < partition->indexes.size(); ++j) { + for (const auto& tid : partition->indexes[j].tablets) { + auto it = _channels[j]->_channels_by_tablet.find(tid); + for (const auto& channel : it->second) { + auto open_partition_closure = channel->open_partition(partition->id); + auto st = channel->open_partition_wait(open_partition_closure); + if (!st.ok()) { + _channels[j]->mark_as_failed( + channel->node_id(), channel->host(), + fmt::format("{}, open failed, err: {}", channel->channel_info(), + st.to_string()), + -1); Review Comment: We should issue all of the open_partition requests first and then wait for them all together, rather than waiting on each channel in turn, to reduce the total waiting time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org