This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 14310ad30b [improvement](move-memtable) wait StreamClose from remote (#23605) 14310ad30b is described below commit 14310ad30b569bc65c6eadce862e1599ba32b448 Author: Kaijie Chen <c...@apache.org> AuthorDate: Wed Aug 30 18:03:36 2023 +0800 [improvement](move-memtable) wait StreamClose from remote (#23605) * [fix](move-memtable) wait StreamClose from remote --- be/src/runtime/load_stream.cpp | 1 + be/src/vec/sink/vtablet_sink_v2.cpp | 12 +++++------ be/src/vec/sink/vtablet_sink_v2.h | 2 +- be/test/runtime/load_stream_test.cpp | 41 +++++++++++++++++------------------- 4 files changed, 27 insertions(+), 29 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 961f73540a..5abf75476c 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -430,6 +430,7 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[] auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablet_ids); _report_result(id, st, &success_tablet_ids, &failed_tablet_ids); + brpc::StreamClose(id); } break; default: LOG(WARNING) << "unexpected stream message " << hdr.opcode(); diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 196e5ff7e2..243507db71 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -125,13 +125,13 @@ int StreamSinkHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* con << status; } } - - _sink->_pending_reports.fetch_add(-1); } return 0; } -void StreamSinkHandler::on_closed(brpc::StreamId id) {} +void StreamSinkHandler::on_closed(brpc::StreamId id) { + _sink->_pending_streams.fetch_add(-1); +} VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs, Status* status) @@ -298,6 +298,7 @@ Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool cntl.ErrorText()); } stream_pool.push_back(stream); + _pending_streams.fetch_add(1); } return Status::OK(); } @@ -522,10 +523,10 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { { SCOPED_TIMER(_close_load_timer); - while (_pending_reports.load() > 0) { + while (_pending_streams.load() > 0) { // TODO: use a better wait std::this_thread::sleep_for(std::chrono::milliseconds(1)); - LOG(INFO) << "sinkv2 close_wait, pending reports: " << _pending_reports.load(); + LOG(INFO) << "sinkv2 close_wait, pending streams: " << _pending_streams.load(); } } @@ -587,7 +588,6 @@ Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) { size_t header_len = header.ByteSizeLong(); buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len)); buf.append(header.SerializeAsString()); - _pending_reports.fetch_add(1); io::StreamSinkFileWriter::send_with_retry(stream, buf); return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 8f4ce8f42f..c2c24a26fb 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -226,7 +226,7 @@ private: size_t _stream_index = 0; std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet; - std::atomic<int> _pending_reports {0}; + std::atomic<int> _pending_streams {0}; std::unordered_map<int64_t, std::vector<int64_t>> _tablet_success_map; std::unordered_map<int64_t, std::vector<int64_t>> _tablet_failure_map; diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index e3d3868547..00f53b5c5d 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -679,9 +679,8 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_load) { EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); - EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } @@ -708,12 +707,11 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) { close_load(client, 0); wait_for_ack(3); - EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } @@ -743,7 +741,7 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) { EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } @@ -773,12 +771,12 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) { EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } -TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0_zero_bytes) { +TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_bytes) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); @@ -814,12 +812,12 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0_zero_b EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } -TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0) { +TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); @@ -860,12 +858,12 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0) { auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); EXPECT_EQ(written_data, data + data); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } -TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment_without_eos) { +TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without_eos) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); @@ -901,12 +899,12 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment_without EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } -TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment1) { +TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) { MockSinkClient client; auto st = client.connect_stream(); EXPECT_TRUE(st.ok()); @@ -944,7 +942,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment1) { EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } @@ -997,7 +995,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) { written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 1); EXPECT_EQ(written_data, data2); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } @@ -1058,7 +1056,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) { written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID + 2, 0); EXPECT_EQ(written_data, data2); - client.disconnect(); + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } @@ -1100,13 +1098,14 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { // duplicated close close_load(clients[1], 1); wait_for_ack(2); - EXPECT_EQ(g_response_stat.num, 2); + // stream closed, no response will be sent + EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); close_load(clients[0], 0); - wait_for_ack(3); - EXPECT_EQ(g_response_stat.num, 3); + wait_for_ack(2); + EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); @@ -1130,9 +1129,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); } - for (int i = 0; i < 2; i++) { - clients[i].disconnect(); - } + // server will close stream on CLOSE_LOAD wait_for_close(); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org