This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new fed632bf4a2 [fix](move-memtable) check segment num when closing each 
tablet (#36753) (#37536)
fed632bf4a2 is described below

commit fed632bf4a2dc5e2e41d9c3c972d626f617e8daf
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Jul 11 20:33:44 2024 +0800

    [fix](move-memtable) check segment num when closing each tablet (#36753) 
(#37536)
    
    cherry-pick #36753 and #37660
---
 be/src/io/fs/stream_sink_file_writer.cpp           |  53 ++++---
 be/src/olap/delta_writer_v2.cpp                    |   3 +-
 be/src/olap/delta_writer_v2.h                      |   2 +-
 be/src/olap/rowset/beta_rowset_writer_v2.h         |   2 +
 be/src/runtime/load_stream.cpp                     |  13 +-
 be/src/runtime/load_stream.h                       |   2 +
 be/src/vec/sink/delta_writer_v2_pool.cpp           |   9 +-
 be/src/vec/sink/delta_writer_v2_pool.h             |   3 +-
 be/src/vec/sink/load_stream_map_pool.cpp           |  18 ++-
 be/src/vec/sink/load_stream_map_pool.h             |   7 +-
 be/src/vec/sink/load_stream_stub.cpp               |  58 +++++++-
 be/src/vec/sink/load_stream_stub.h                 |   2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  10 +-
 be/test/runtime/load_stream_test.cpp               | 152 +++++++++++++++------
 be/test/vec/exec/delta_writer_v2_pool_test.cpp     |  10 +-
 gensrc/proto/internal_service.proto                |   1 +
 .../test_multi_replica_fault_injection.groovy      |   5 +-
 17 files changed, 262 insertions(+), 88 deletions(-)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index cfc924fad0a..e6007550396 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -51,42 +51,26 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
                << ", data_length: " << bytes_req;
 
     std::span<const Slice> slices {data, data_cnt};
-    size_t stream_index = 0;
+    size_t fault_injection_skipped_streams = 0;
     bool ok = false;
-    bool skip_stream = false;
     Status st;
     for (auto& stream : _streams) {
         
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 {
-            if (stream_index >= 2) {
-                skip_stream = true;
+            if (fault_injection_skipped_streams < 1) {
+                fault_injection_skipped_streams++;
+                continue;
             }
         });
         
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 {
-            if (stream_index >= 1) {
-                skip_stream = true;
+            if (fault_injection_skipped_streams < 2) {
+                fault_injection_skipped_streams++;
+                continue;
             }
         });
-        if (!skip_stream) {
-            st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
-                                     _bytes_appended, slices);
-        }
-        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 {
-            if (stream_index >= 2) {
-                st = Status::InternalError("stream sink file writer append 
data failed");
-            }
-            stream_index++;
-            skip_stream = false;
-        });
-        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 {
-            if (stream_index >= 1) {
-                st = Status::InternalError("stream sink file writer append 
data failed");
-            }
-            stream_index++;
-            skip_stream = false;
-        });
-        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
 {
-            st = Status::InternalError("stream sink file writer append data 
failed");
-        });
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
+                        { continue; });
+        st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id, _bytes_appended,
+                                 slices);
         ok = ok || st.ok();
         if (!st.ok()) {
             LOG(WARNING) << "failed to send segment data to backend " << 
stream->dst_id()
@@ -116,8 +100,23 @@ Status StreamSinkFileWriter::finalize() {
     VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", 
index_id: " << _index_id
                << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id;
     // TODO(zhengyu): update get_inverted_index_file_size into stat
+    size_t fault_injection_skipped_streams = 0;
     bool ok = false;
     for (auto& stream : _streams) {
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 {
+            if (fault_injection_skipped_streams < 1) {
+                fault_injection_skipped_streams++;
+                continue;
+            }
+        });
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 {
+            if (fault_injection_skipped_streams < 2) {
+                fault_injection_skipped_streams++;
+                continue;
+            }
+        });
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
+                        { continue; });
         auto st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
                                       _bytes_appended, {}, true);
         ok = ok || st.ok();
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 378728f025c..e02c8eea70c 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -180,7 +180,7 @@ Status DeltaWriterV2::close() {
     return _memtable_writer->close();
 }
 
-Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
+Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* 
profile) {
     SCOPED_RAW_TIMER(&_close_wait_time);
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
@@ -190,6 +190,7 @@ Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
         _update_profile(profile);
     }
     RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
+    num_segments = _rowset_writer->next_segment_id();
 
     _delta_written_success = true;
     return Status::OK();
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 31b364e1038..0ef564be393 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -77,7 +77,7 @@ public:
     Status close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    Status close_wait(RuntimeProfile* profile = nullptr);
+    Status close_wait(int32_t& num_segments, RuntimeProfile* profile = 
nullptr);
 
     // abandon current memtable and wait for all pending-flushing memtables to 
be destructed.
     // mem_consumption() should be 0 after this function returns.
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h 
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index 4b0ab950de4..6d1321bd144 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -131,6 +131,8 @@ public:
 
     int32_t allocate_segment_id() override { return 
_segment_creator.allocate_segment_id(); };
 
+    int32_t next_segment_id() { return _segment_creator.next_segment_id(); };
+
     int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
 
     int64_t segment_writer_ns() override { return _segment_writer_ns; }
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 8de15091ec5..1f8c33995b3 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -244,6 +244,11 @@ Status TabletStream::close() {
     if (!_failed_st->ok()) {
         return *_failed_st;
     }
+    if (_next_segid.load() != _num_segments) {
+        return Status::Corruption(
+                "segment num mismatch in tablet {}, expected: {}, actual: {}, 
load_id: {}", _id,
+                _num_segments, _next_segid.load(), print_id(_load_id));
+    }
 
     Status st = Status::OK();
     auto close_func = [this, &mu, &cv, &st]() {
@@ -307,11 +312,17 @@ Status IndexStream::close(const std::vector<PTabletID>& 
tablets_to_commit,
     SCOPED_TIMER(_close_wait_timer);
     // open all need commit tablets
     for (const auto& tablet : tablets_to_commit) {
+        if (_id != tablet.index_id()) {
+            continue;
+        }
         TabletStreamSharedPtr tablet_stream;
         auto it = _tablet_streams_map.find(tablet.tablet_id());
-        if (it == _tablet_streams_map.end() && _id == tablet.index_id()) {
+        if (it == _tablet_streams_map.end()) {
             RETURN_IF_ERROR(
                     _init_tablet_stream(tablet_stream, tablet.tablet_id(), 
tablet.partition_id()));
+            tablet_stream->add_num_segments(tablet.num_segments());
+        } else {
+            it->second->add_num_segments(tablet.num_segments());
         }
     }
 
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index b2635698379..f690882a878 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -52,6 +52,7 @@ public:
 
     Status append_data(const PStreamHeader& header, butil::IOBuf* data);
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
+    void add_num_segments(int64_t num_segments) { _num_segments += 
num_segments; }
     Status close();
     int64_t id() const { return _id; }
 
@@ -63,6 +64,7 @@ private:
     std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
     std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
     std::atomic<uint32_t> _next_segid;
+    int64_t _num_segments = 0;
     bthread::Mutex _lock;
     std::shared_ptr<Status> _failed_st;
     PUniqueId _load_id;
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp 
b/be/src/vec/sink/delta_writer_v2_pool.cpp
index 87c18194127..bc5233ac307 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -43,7 +43,8 @@ std::shared_ptr<DeltaWriterV2> 
DeltaWriterV2Map::get_or_create(
     return writer;
 }
 
-Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
+Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>& 
segments_for_tablet,
+                               RuntimeProfile* profile) {
     int num_use = --_use_cnt;
     if (num_use > 0) {
         LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , 
use_cnt=" << num_use;
@@ -58,8 +59,10 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
         RETURN_IF_ERROR(writer->close());
     }
     LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
-    for (auto& [_, writer] : _map) {
-        RETURN_IF_ERROR(writer->close_wait(profile));
+    for (auto& [tablet_id, writer] : _map) {
+        int32_t num_segments;
+        RETURN_IF_ERROR(writer->close_wait(num_segments, profile));
+        segments_for_tablet[tablet_id] = num_segments;
     }
     return Status::OK();
 }
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h 
b/be/src/vec/sink/delta_writer_v2_pool.h
index 912b9216e9f..7e58eea3149 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -70,7 +70,8 @@ public:
             int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> 
creator);
 
     // close all delta writers in this DeltaWriterV2Map if there is no other 
users
-    Status close(RuntimeProfile* profile = nullptr);
+    Status close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
+                 RuntimeProfile* profile = nullptr);
 
     // cancel all delta writers in this DeltaWriterV2Map
     void cancel(Status status);
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp 
b/be/src/vec/sink/load_stream_map_pool.cpp
index 7a3072ade6e..2fcb8deaeb2 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -87,7 +87,9 @@ void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
                                            const std::vector<PTabletID>& 
tablets_to_commit) {
     std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
     auto& tablets = _tablets_to_commit[dst_id];
-    tablets.insert(tablets.end(), tablets_to_commit.begin(), 
tablets_to_commit.end());
+    for (const auto& tablet : tablets_to_commit) {
+        tablets.emplace(tablet.tablet_id(), tablet);
+    }
 }
 
 bool LoadStreamMap::release() {
@@ -103,12 +105,24 @@ bool LoadStreamMap::release() {
 
 Status LoadStreamMap::close_load(bool incremental) {
     return for_each_st([this, incremental](int64_t dst_id, const Streams& 
streams) -> Status {
+        std::vector<PTabletID> tablets_to_commit;
         const auto& tablets = _tablets_to_commit[dst_id];
+        tablets_to_commit.reserve(tablets.size());
+        for (const auto& [tablet_id, tablet] : tablets) {
+            tablets_to_commit.push_back(tablet);
+            
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
+        }
+        bool first = true;
         for (auto& stream : streams) {
             if (stream->is_incremental() != incremental) {
                 continue;
             }
-            RETURN_IF_ERROR(stream->close_load(tablets));
+            if (first) {
+                RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
+                first = false;
+            } else {
+                RETURN_IF_ERROR(stream->close_load({}));
+            }
         }
         return Status::OK();
     });
diff --git a/be/src/vec/sink/load_stream_map_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
index d0f72ab7e00..dcddcdaf8d8 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -90,6 +90,10 @@ public:
 
     void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& 
tablets_to_commit);
 
+    void save_segments_for_tablet(const std::unordered_map<int64_t, int32_t>& 
segments_for_tablet) {
+        _segments_for_tablet.insert(segments_for_tablet.cbegin(), 
segments_for_tablet.cend());
+    }
+
     // Return true if the last instance is just released.
     bool release();
 
@@ -109,7 +113,8 @@ private:
     std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
 
     std::mutex _tablets_to_commit_mutex;
-    std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
+    std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> 
_tablets_to_commit;
+    std::unordered_map<int64_t, int32_t> _segments_for_tablet;
 };
 
 class LoadStreamMapPool {
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index caebb381db6..93f3fd87a85 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -207,6 +207,11 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
 Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                    int64_t segment_id, uint64_t offset, 
std::span<const Slice> data,
                                    bool segment_eos) {
+    DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
+        if (segment_id != 0) {
+            return Status::OK();
+        }
+    });
     PStreamHeader header;
     header.set_src_id(_src_id);
     *header.mutable_load_id() = _load_id;
@@ -224,6 +229,11 @@ Status LoadStreamStub::append_data(int64_t partition_id, 
int64_t index_id, int64
 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                    int64_t segment_id, const 
SegmentStatistics& segment_stat,
                                    TabletSchemaSPtr flush_schema) {
+    DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
+        if (segment_id != 0) {
+            return Status::OK();
+        }
+    });
     PStreamHeader header;
     header.set_src_id(_src_id);
     *header.mutable_load_id() = _load_id;
@@ -368,7 +378,53 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& 
buf, bool sync) {
     std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
     buffer_lock.unlock();
     VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync;
-    return _send_with_retry(output);
+    auto st = _send_with_retry(output);
+    if (!st.ok()) {
+        _handle_failure(output, st);
+    }
+    return st;
+}
+
+void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
+    while (buf.size() > 0) {
+        // step 1: parse header
+        size_t hdr_len = 0;
+        buf.cutn((void*)&hdr_len, sizeof(size_t));
+        butil::IOBuf hdr_buf;
+        PStreamHeader hdr;
+        buf.cutn(&hdr_buf, hdr_len);
+        butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf);
+        hdr.ParseFromZeroCopyStream(&wrapper);
+
+        // step 2: cut data
+        size_t data_len = 0;
+        buf.cutn((void*)&data_len, sizeof(size_t));
+        butil::IOBuf data_buf;
+        buf.cutn(&data_buf, data_len);
+
+        // step 3: handle failure
+        switch (hdr.opcode()) {
+        case PStreamHeader::ADD_SEGMENT:
+        case PStreamHeader::APPEND_DATA: {
+            add_failed_tablet(hdr.tablet_id(), st);
+        } break;
+        case PStreamHeader::CLOSE_LOAD: {
+            brpc::StreamClose(_stream_id);
+        } break;
+        case PStreamHeader::GET_SCHEMA: {
+            // Just log and let wait_for_schema timeout
+            std::ostringstream oss;
+            for (const auto& tablet : hdr.tablets()) {
+                oss << " " << tablet.tablet_id();
+            }
+            LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << 
oss.str() << ", "
+                         << *this;
+        } break;
+        default:
+            LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", 
" << *this;
+            DCHECK(false);
+        }
+    }
 }
 
 Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 1bf0fac4e38..4e6aad8d1ae 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -207,7 +207,6 @@ public:
         _success_tablets.push_back(tablet_id);
     }
 
-    // for tests only
     void add_failed_tablet(int64_t tablet_id, Status reason) {
         std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
         _failed_tablets[tablet_id] = reason;
@@ -217,6 +216,7 @@ private:
     Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data 
= {});
     Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
     Status _send_with_retry(butil::IOBuf& buf);
+    void _handle_failure(butil::IOBuf& buf, Status st);
 
     Status _check_cancel() {
         if (!_is_cancelled.load()) {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index a6abe14f4f1..fecbd324c57 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -545,13 +545,18 @@ Status VTabletWriterV2::close(Status exec_status) {
 
         // close DeltaWriters
         {
+            std::unordered_map<int64_t, int32_t> segments_for_tablet;
             SCOPED_TIMER(_close_writer_timer);
             // close all delta writers if this is the last user
-            auto st = _delta_writer_for_tablet->close(_profile);
+            auto st = _delta_writer_for_tablet->close(segments_for_tablet, 
_profile);
             _delta_writer_for_tablet.reset();
             if (!st.ok()) {
                 RETURN_IF_ERROR(_cancel(st));
             }
+            // only the last sink closing delta writers will have segment num
+            if (!segments_for_tablet.empty()) {
+                
_load_stream_map->save_segments_for_tablet(segments_for_tablet);
+            }
         }
 
         _calc_tablets_to_commit();
@@ -661,7 +666,8 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
                 if (VLOG_DEBUG_IS_ON) {
                     partition_ids.push_back(tablet.partition_id());
                 }
-                tablets_to_commit.push_back(tablet);
+                PTabletID t(tablet);
+                tablets_to_commit.push_back(t);
             }
         }
         if (VLOG_DEBUG_IS_ON) {
diff --git a/be/test/runtime/load_stream_test.cpp 
b/be/test/runtime/load_stream_test.cpp
index 2ee3f86c1a4..f0cc3354d8e 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -494,19 +494,17 @@ public:
             : _heavy_work_pool(4, 32, "load_stream_test_heavy"),
               _light_work_pool(4, 32, "load_stream_test_light") {}
 
-    void close_load(MockSinkClient& client, uint32_t sender_id = 
NORMAL_SENDER_ID) {
+    void close_load(MockSinkClient& client, const std::vector<PTabletID>& 
tablets_to_commit = {},
+                    uint32_t sender_id = NORMAL_SENDER_ID) {
         butil::IOBuf append_buf;
         PStreamHeader header;
         header.mutable_load_id()->set_hi(1);
         header.mutable_load_id()->set_lo(1);
         header.set_opcode(PStreamHeader::CLOSE_LOAD);
         header.set_src_id(sender_id);
-        /* TODO: fix test with tablets_to_commit 
-        PTabletID* tablets_to_commit = header.add_tablets();
-        tablets_to_commit->set_partition_id(NORMAL_PARTITION_ID);
-        tablets_to_commit->set_index_id(NORMAL_INDEX_ID);
-        tablets_to_commit->set_tablet_id(NORMAL_TABLET_ID);
-        */
+        for (const auto& tablet : tablets_to_commit) {
+            *header.add_tablets() = tablet;
+        }
         size_t hdr_len = header.ByteSizeLong();
         append_buf.append((char*)&hdr_len, sizeof(size_t));
         append_buf.append(header.SerializeAsString());
@@ -677,14 +675,19 @@ TEST_F(LoadStreamMgrTest, one_client_normal) {
     write_normal(client);
 
     reset_response_stat();
-    close_load(client, ABNORMAL_SENDER_ID);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
+    close_load(client, {tablet}, ABNORMAL_SENDER_ID);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
 
-    close_load(client);
+    close_load(client, {tablet});
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
@@ -735,14 +738,19 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) {
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
 
-    close_load(client, 1);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(ABNORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
 
-    close_load(client, 0);
+    close_load(client, {tablet}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -766,17 +774,23 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) {
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
 
-    close_load(client, 1);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
 
-    close_load(client, 0);
+    // on the final close_load, segment num check will fail
+    close_load(client, {tablet}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
-    EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2);
 
     // server will close stream on CLOSE_LOAD
     wait_for_close();
@@ -796,13 +810,18 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) {
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID);
 
-    close_load(client, 1);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(ABNORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
 
-    close_load(client, 0);
+    close_load(client, {tablet}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -829,21 +848,26 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_single_segment0_zero_b
                      0, data, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
     // CLOSE_LOAD
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
     // duplicated close
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
-    close_load(client, 0);
+    close_load(client, {tablet}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -870,8 +894,13 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
                      data.length(), data, false);
 
     EXPECT_EQ(g_response_stat.num, 0);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
     // CLOSE_LOAD before EOS
-    close_load(client);
+    close_load(client, {tablet});
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -885,7 +914,7 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
                      data.length(), data, true);
 
     // duplicated close, will not be handled
-    close_load(client);
+    close_load(client, {tablet});
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -912,21 +941,26 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_single_segment0) {
                      data.length(), data, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
     // CLOSE_LOAD
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
     // duplicated close
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
-    close_load(client, 0);
+    close_load(client, {tablet}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
@@ -956,21 +990,26 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_single_segment_without
                      0, data, false);
 
     EXPECT_EQ(g_response_stat.num, 0);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
     // CLOSE_LOAD
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
     // duplicated close
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
-    close_load(client, 0);
+    close_load(client, {tablet}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -999,21 +1038,26 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_single_segment1) {
                      data.length(), data, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(1);
     // CLOSE_LOAD
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
     // duplicated close
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
-    close_load(client, 0);
+    close_load(client, {tablet}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -1046,21 +1090,26 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_one_tablet_two_segment) {
                      0, data2, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(2);
     // CLOSE_LOAD
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
     // duplicated close
-    close_load(client, 1);
+    close_load(client, {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
-    close_load(client, 0);
+    close_load(client, {tablet}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
@@ -1098,21 +1147,32 @@ TEST_F(LoadStreamMgrTest, 
one_client_one_index_three_tablet) {
                      NORMAL_TABLET_ID + 2, 0, 0, data2, true);
 
     EXPECT_EQ(g_response_stat.num, 0);
+
+    PTabletID tablet1;
+    tablet1.set_partition_id(NORMAL_PARTITION_ID);
+    tablet1.set_index_id(NORMAL_INDEX_ID);
+    tablet1.set_tablet_id(NORMAL_TABLET_ID);
+    tablet1.set_num_segments(1);
+    PTabletID tablet2 {tablet1};
+    tablet2.set_tablet_id(NORMAL_TABLET_ID + 1);
+    PTabletID tablet3 {tablet1};
+    tablet3.set_tablet_id(NORMAL_TABLET_ID + 2);
+
     // CLOSE_LOAD
-    close_load(client, 1);
+    close_load(client, {tablet1, tablet2, tablet3}, 1);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
     // duplicated close
-    close_load(client, 1);
+    close_load(client, {tablet1, tablet2, tablet3}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
-    close_load(client, 0);
+    close_load(client, {tablet1, tablet2, tablet3}, 0);
     wait_for_ack(3);
     EXPECT_EQ(g_response_stat.num, 3);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3);
@@ -1166,22 +1226,27 @@ TEST_F(LoadStreamMgrTest, 
two_client_one_index_one_tablet_three_segment) {
     }
 
     EXPECT_EQ(g_response_stat.num, 0);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(3);
     // CLOSE_LOAD
-    close_load(clients[1], 1);
+    close_load(clients[1], {tablet}, 1);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
     // duplicated close
-    close_load(clients[1], 1);
+    close_load(clients[1], {tablet}, 1);
     wait_for_ack(2);
     // stream closed, no response will be sent
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
     EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
 
-    close_load(clients[0], 0);
+    close_load(clients[0], {tablet}, 0);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
@@ -1236,8 +1301,13 @@ TEST_F(LoadStreamMgrTest, 
two_client_one_close_before_the_other_open) {
     }
 
     EXPECT_EQ(g_response_stat.num, 0);
+    PTabletID tablet;
+    tablet.set_partition_id(NORMAL_PARTITION_ID);
+    tablet.set_index_id(NORMAL_INDEX_ID);
+    tablet.set_tablet_id(NORMAL_TABLET_ID);
+    tablet.set_num_segments(3);
     // CLOSE_LOAD
-    close_load(clients[0], 0);
+    close_load(clients[0], {tablet}, 0);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -1254,7 +1324,7 @@ TEST_F(LoadStreamMgrTest, 
two_client_one_close_before_the_other_open) {
                          NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + 
segid], true);
     }
 
-    close_load(clients[1], 1);
+    close_load(clients[1], {tablet}, 1);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp 
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index a67a701c409..dc86ce8c3a2 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -42,9 +42,10 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) {
     EXPECT_EQ(2, pool.size());
     EXPECT_EQ(map, map3);
     EXPECT_NE(map, map2);
-    EXPECT_TRUE(map->close().ok());
-    EXPECT_TRUE(map2->close().ok());
-    EXPECT_TRUE(map3->close().ok());
+    std::unordered_map<int64_t, int32_t> sft;
+    EXPECT_TRUE(map->close(sft).ok());
+    EXPECT_TRUE(map2->close(sft).ok());
+    EXPECT_TRUE(map3->close(sft).ok());
     EXPECT_EQ(0, pool.size());
 }
 
@@ -72,7 +73,8 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
     EXPECT_EQ(2, map->size());
     EXPECT_EQ(writer, writer3);
     EXPECT_NE(writer, writer2);
-    static_cast<void>(map->close());
+    std::unordered_map<int64_t, int32_t> sft;
+    static_cast<void>(map->close(sft));
     EXPECT_EQ(0, pool.size());
 }
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 0a975b81991..14a165a3b9d 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -66,6 +66,7 @@ message PTabletID {
     optional int64 partition_id = 1;
     optional int64 index_id = 2;
     optional int64 tablet_id = 3;
+    optional int64 num_segments = 4;
 }
 
 message PTabletInfo {
diff --git 
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
index 37d8b4f2610..8080b52ff48 100644
--- 
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -91,10 +91,11 @@ suite("test_multi_replica_fault_injection", 
"nonConcurrent") {
         // success
         
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 "sucess")
         // StreamSinkFileWriter appendv write segment failed two replica
-        
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 "replica num 1 < load required replica num 2")
+        
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 "add segment failed")
         // StreamSinkFileWriter appendv write segment failed all replica
         
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
 "failed to send segment data to any replicas")
-
+        // test segment num check when LoadStreamStub missed tail segments
+        load_with_injection("LoadStreamStub.only_send_segment_0", "segment num 
mismatch")
         sql """ set enable_memtable_on_sink_node=false """
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Reply via email to