This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 14310ad30b [improvement](move-memtable) wait StreamClose from remote (#23605)
14310ad30b is described below

commit 14310ad30b569bc65c6eadce862e1599ba32b448
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Wed Aug 30 18:03:36 2023 +0800

    [improvement](move-memtable) wait StreamClose from remote (#23605)
    
    * [fix](move-memtable) wait StreamClose from remote
---
 be/src/runtime/load_stream.cpp       |  1 +
 be/src/vec/sink/vtablet_sink_v2.cpp  | 12 +++++------
 be/src/vec/sink/vtablet_sink_v2.h    |  2 +-
 be/test/runtime/load_stream_test.cpp | 41 +++++++++++++++++-------------------
 4 files changed, 27 insertions(+), 29 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 961f73540a..5abf75476c 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -430,6 +430,7 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[]
             auto st =
                     close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablet_ids);
             _report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
+            brpc::StreamClose(id);
         } break;
         default:
             LOG(WARNING) << "unexpected stream message " << hdr.opcode();
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index 196e5ff7e2..243507db71 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -125,13 +125,13 @@ int StreamSinkHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* con
                              << status;
             }
         }
-
-        _sink->_pending_reports.fetch_add(-1);
     }
     return 0;
 }
 
-void StreamSinkHandler::on_closed(brpc::StreamId id) {}
+void StreamSinkHandler::on_closed(brpc::StreamId id) {
+    _sink->_pending_streams.fetch_add(-1);
+}
 
 VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
                                    const std::vector<TExpr>& texprs, Status* status)
@@ -298,6 +298,7 @@ Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool
                                          cntl.ErrorText());
         }
         stream_pool.push_back(stream);
+        _pending_streams.fetch_add(1);
     }
     return Status::OK();
 }
@@ -522,10 +523,10 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
 
         {
             SCOPED_TIMER(_close_load_timer);
-            while (_pending_reports.load() > 0) {
+            while (_pending_streams.load() > 0) {
                 // TODO: use a better wait
                 std::this_thread::sleep_for(std::chrono::milliseconds(1));
-                LOG(INFO) << "sinkv2 close_wait, pending reports: " << _pending_reports.load();
+                LOG(INFO) << "sinkv2 close_wait, pending streams: " << _pending_streams.load();
             }
         }
 
@@ -587,7 +588,6 @@ Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) {
     size_t header_len = header.ByteSizeLong();
     buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
     buf.append(header.SerializeAsString());
-    _pending_reports.fetch_add(1);
     io::StreamSinkFileWriter::send_with_retry(stream, buf);
     return Status::OK();
 }
diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h
index 8f4ce8f42f..c2c24a26fb 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -226,7 +226,7 @@ private:
     size_t _stream_index = 0;
     std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet;
 
-    std::atomic<int> _pending_reports {0};
+    std::atomic<int> _pending_streams {0};
 
     std::unordered_map<int64_t, std::vector<int64_t>> _tablet_success_map;
     std::unordered_map<int64_t, std::vector<int64_t>> _tablet_failure_map;
diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp
index e3d3868547..00f53b5c5d 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -679,9 +679,8 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
-    EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
@@ -708,12 +707,11 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) {
 
     close_load(client, 0);
     wait_for_ack(3);
-    EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
@@ -743,7 +741,7 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) {
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
@@ -773,12 +771,12 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) {
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
 
-TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0_zero_bytes) {
+TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_bytes) {
     MockSinkClient client;
     auto st = client.connect_stream();
     EXPECT_TRUE(st.ok());
@@ -814,12 +812,12 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0_zero_b
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
 
-TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0) {
+TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) {
     MockSinkClient client;
     auto st = client.connect_stream();
     EXPECT_TRUE(st.ok());
@@ -860,12 +858,12 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0) {
     auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
     EXPECT_EQ(written_data, data + data);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
 
-TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment_without_eos) {
+TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without_eos) {
     MockSinkClient client;
     auto st = client.connect_stream();
     EXPECT_TRUE(st.ok());
@@ -901,12 +899,12 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment_without
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
 
-TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment1) {
+TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) {
     MockSinkClient client;
     auto st = client.connect_stream();
     EXPECT_TRUE(st.ok());
@@ -944,7 +942,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment1) {
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
@@ -997,7 +995,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) {
     written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 1);
     EXPECT_EQ(written_data, data2);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
@@ -1058,7 +1056,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) {
     written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID + 2, 0);
     EXPECT_EQ(written_data, data2);
 
-    client.disconnect();
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
@@ -1100,13 +1098,14 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
     // duplicated close
     close_load(clients[1], 1);
     wait_for_ack(2);
-    EXPECT_EQ(g_response_stat.num, 2);
+    // stream closed, no response will be sent
+    EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
     close_load(clients[0], 0);
-    wait_for_ack(3);
-    EXPECT_EQ(g_response_stat.num, 3);
+    wait_for_ack(2);
+    EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
@@ -1130,9 +1129,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
         EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
     }
 
-    for (int i = 0; i < 2; i++) {
-        clients[i].disconnect();
-    }
+    // server will close stream on CLOSE_LOAD
     wait_for_close();
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to