dataroaring commented on code in PR #18874:
URL: https://github.com/apache/doris/pull/18874#discussion_r1184751344


##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1062,6 +1135,41 @@ Status VOlapTableSink::open(RuntimeState* state) {
     return Status::OK();
 }
 
+void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
+    const auto& id = partition->id;
+    auto it = _partition_opened.find(id);
+    if (it == _partition_opened.end()) {
+        {
+            std::unique_lock<std::mutex> l(_partition_opened_mutex);
+            auto it = _partition_opened.find(id);
+            if (it != _partition_opened.end()) {
+                return;
+            }
+            _partition_opened.insert(std::pair(id, false));
+        }
+        for (int j = 0; j < partition->indexes.size(); ++j) {
+            for (const auto& tid : partition->indexes[j].tablets) {
+                auto it = _channels[j]->_channels_by_tablet.find(tid);
+                for (const auto& channel : it->second) {
+                    auto open_partition_closure = channel->open_partition(partition->id);
+                    auto st = channel->open_partition_wait(open_partition_closure);
+                    if (!st.ok()) {
+                        _channels[j]->mark_as_failed(
+                                channel->node_id(), channel->host(),
+                                fmt::format("{}, open failed, err: {}", channel->channel_info(),
+                                            st.to_string()),
+                                -1);
+                    }
+                }
+            }
+        }
+        {
+            std::unique_lock<std::mutex> l(_partition_opened_mutex);

Review Comment:
   The sink is called from only one thread, so this lock should not be needed.



##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1062,6 +1135,41 @@ Status VOlapTableSink::open(RuntimeState* state) {
     return Status::OK();
 }
 
+void VOlapTableSink::_open_partition(const VOlapTablePartition* partition) {
+    const auto& id = partition->id;
+    auto it = _partition_opened.find(id);
+    if (it == _partition_opened.end()) {
+        {
+            std::unique_lock<std::mutex> l(_partition_opened_mutex);
+            auto it = _partition_opened.find(id);
+            if (it != _partition_opened.end()) {
+                return;
+            }
+            _partition_opened.insert(std::pair(id, false));
+        }
+        for (int j = 0; j < partition->indexes.size(); ++j) {
+            for (const auto& tid : partition->indexes[j].tablets) {
+                auto it = _channels[j]->_channels_by_tablet.find(tid);
+                for (const auto& channel : it->second) {
+                    auto open_partition_closure = channel->open_partition(partition->id);
+                    auto st = channel->open_partition_wait(open_partition_closure);
+                    if (!st.ok()) {
+                        _channels[j]->mark_as_failed(
+                                channel->node_id(), channel->host(),
+                                fmt::format("{}, open failed, err: {}", channel->channel_info(),
+                                            st.to_string()),
+                                -1);

Review Comment:
   We should issue all the open requests first and then wait for them together, to reduce the total wait time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to