This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 71a92070a15 [fix](move-memtable) check segment num when closing each 
tablet (#36753)
71a92070a15 is described below

commit 71a92070a1532ef4ce82644c5fb47cc824ca464b
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Mon Jul 8 17:58:28 2024 +0800

    [fix](move-memtable) check segment num when closing each tablet (#36753)
    
    ## Proposed changes
    
    Previously, there was a chance that the sender failed to send some data
    while the receiver was unaware of it.
    This could cause data loss if some segments were skipped.
    
    This PR fixes the problem by including checks in both sender and
    receiver.
    When the sender fails to send an RPC, LoadStreamStub will mark the
    involved tablets as failed.
    Each sender will send the segment num for each tablet in CLOSE_LOAD,
    and the receivers (LoadStream) will sum up and check the total segment nums.
---
 be/src/io/fs/stream_sink_file_writer.cpp           | 53 ++++++++++----------
 be/src/olap/delta_writer_v2.cpp                    |  3 +-
 be/src/olap/delta_writer_v2.h                      |  2 +-
 be/src/olap/rowset/beta_rowset_writer_v2.h         |  2 +
 be/src/runtime/load_stream.cpp                     | 13 ++++-
 be/src/runtime/load_stream.h                       |  2 +
 be/src/vec/sink/delta_writer_v2_pool.cpp           |  9 ++--
 be/src/vec/sink/delta_writer_v2_pool.h             |  3 +-
 be/src/vec/sink/load_stream_map_pool.cpp           | 18 ++++++-
 be/src/vec/sink/load_stream_map_pool.h             |  7 ++-
 be/src/vec/sink/load_stream_stub.cpp               | 58 +++++++++++++++++++++-
 be/src/vec/sink/load_stream_stub.h                 |  2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       | 10 +++-
 be/test/vec/exec/delta_writer_v2_pool_test.cpp     | 10 ++--
 gensrc/proto/internal_service.proto                |  1 +
 .../test_multi_replica_fault_injection.groovy      |  5 +-
 16 files changed, 151 insertions(+), 47 deletions(-)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index 1d7f823af10..cc55adc1cfb 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -52,42 +52,26 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
                << ", data_length: " << bytes_req << "file_type" << _file_type;
 
     std::span<const Slice> slices {data, data_cnt};
-    size_t stream_index = 0;
+    size_t fault_injection_skipped_streams = 0;
     bool ok = false;
-    bool skip_stream = false;
     Status st;
     for (auto& stream : _streams) {
         
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 {
-            if (stream_index >= 2) {
-                skip_stream = true;
+            if (fault_injection_skipped_streams < 1) {
+                fault_injection_skipped_streams++;
+                continue;
             }
         });
         
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 {
-            if (stream_index >= 1) {
-                skip_stream = true;
+            if (fault_injection_skipped_streams < 2) {
+                fault_injection_skipped_streams++;
+                continue;
             }
         });
-        if (!skip_stream) {
-            st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
-                                     _bytes_appended, slices, false, 
_file_type);
-        }
-        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 {
-            if (stream_index >= 2) {
-                st = Status::InternalError("stream sink file writer append 
data failed");
-            }
-            stream_index++;
-            skip_stream = false;
-        });
-        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 {
-            if (stream_index >= 1) {
-                st = Status::InternalError("stream sink file writer append 
data failed");
-            }
-            stream_index++;
-            skip_stream = false;
-        });
-        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
 {
-            st = Status::InternalError("stream sink file writer append data 
failed");
-        });
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
+                        { continue; });
+        st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id, _bytes_appended,
+                                 slices, false, _file_type);
         ok = ok || st.ok();
         if (!st.ok()) {
             LOG(WARNING) << "failed to send segment data to backend " << 
stream->dst_id()
@@ -139,8 +123,23 @@ Status StreamSinkFileWriter::_finalize() {
     VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", 
index_id: " << _index_id
                << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id;
     // TODO(zhengyu): update get_inverted_index_file_size into stat
+    size_t fault_injection_skipped_streams = 0;
     bool ok = false;
     for (auto& stream : _streams) {
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 {
+            if (fault_injection_skipped_streams < 1) {
+                fault_injection_skipped_streams++;
+                continue;
+            }
+        });
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 {
+            if (fault_injection_skipped_streams < 2) {
+                fault_injection_skipped_streams++;
+                continue;
+            }
+        });
+        
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
+                        { continue; });
         auto st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
                                       _bytes_appended, {}, true, _file_type);
         ok = ok || st.ok();
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 805f072f6e6..4b9f4231bf1 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -181,7 +181,7 @@ Status DeltaWriterV2::close() {
     return _memtable_writer->close();
 }
 
-Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
+Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* 
profile) {
     SCOPED_RAW_TIMER(&_close_wait_time);
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
@@ -191,6 +191,7 @@ Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
         _update_profile(profile);
     }
     RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
+    num_segments = _rowset_writer->next_segment_id();
 
     _delta_written_success = true;
     return Status::OK();
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 9aded8fb556..beeb3d3ecd3 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -78,7 +78,7 @@ public:
     Status close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    Status close_wait(RuntimeProfile* profile = nullptr);
+    Status close_wait(int32_t& num_segments, RuntimeProfile* profile = 
nullptr);
 
     // abandon current memtable and wait for all pending-flushing memtables to 
be destructed.
     // mem_consumption() should be 0 after this function returns.
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h 
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index 89bd3045089..d2267a3dbd1 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -130,6 +130,8 @@ public:
 
     int32_t allocate_segment_id() override { return 
_segment_creator.allocate_segment_id(); };
 
+    int32_t next_segment_id() { return _segment_creator.next_segment_id(); };
+
     int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
 
     int64_t segment_writer_ns() override { return _segment_writer_ns; }
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 0a35bf6008e..07d488e578f 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -252,6 +252,11 @@ Status TabletStream::close() {
     if (!_failed_st->ok()) {
         return *_failed_st;
     }
+    if (_next_segid.load() != _num_segments) {
+        return Status::Corruption(
+                "segment num mismatch in tablet {}, expected: {}, actual: {}, 
load_id: {}", _id,
+                _num_segments, _next_segid.load(), print_id(_load_id));
+    }
 
     Status st = Status::OK();
     auto close_func = [this, &mu, &cv, &st]() {
@@ -315,11 +320,17 @@ Status IndexStream::close(const std::vector<PTabletID>& 
tablets_to_commit,
     SCOPED_TIMER(_close_wait_timer);
     // open all need commit tablets
     for (const auto& tablet : tablets_to_commit) {
+        if (_id != tablet.index_id()) {
+            continue;
+        }
         TabletStreamSharedPtr tablet_stream;
         auto it = _tablet_streams_map.find(tablet.tablet_id());
-        if (it == _tablet_streams_map.end() && _id == tablet.index_id()) {
+        if (it == _tablet_streams_map.end()) {
             RETURN_IF_ERROR(
                     _init_tablet_stream(tablet_stream, tablet.tablet_id(), 
tablet.partition_id()));
+            tablet_stream->add_num_segments(tablet.num_segments());
+        } else {
+            it->second->add_num_segments(tablet.num_segments());
         }
     }
 
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 9e6e0e36a4b..80e69c784ad 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -52,6 +52,7 @@ public:
 
     Status append_data(const PStreamHeader& header, butil::IOBuf* data);
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
+    void add_num_segments(int64_t num_segments) { _num_segments += 
num_segments; }
     Status close();
     int64_t id() const { return _id; }
 
@@ -63,6 +64,7 @@ private:
     std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
     std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
     std::atomic<uint32_t> _next_segid;
+    int64_t _num_segments = 0;
     bthread::Mutex _lock;
     std::shared_ptr<Status> _failed_st;
     PUniqueId _load_id;
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp 
b/be/src/vec/sink/delta_writer_v2_pool.cpp
index 87c18194127..bc5233ac307 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -43,7 +43,8 @@ std::shared_ptr<DeltaWriterV2> 
DeltaWriterV2Map::get_or_create(
     return writer;
 }
 
-Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
+Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>& 
segments_for_tablet,
+                               RuntimeProfile* profile) {
     int num_use = --_use_cnt;
     if (num_use > 0) {
         LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , 
use_cnt=" << num_use;
@@ -58,8 +59,10 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
         RETURN_IF_ERROR(writer->close());
     }
     LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
-    for (auto& [_, writer] : _map) {
-        RETURN_IF_ERROR(writer->close_wait(profile));
+    for (auto& [tablet_id, writer] : _map) {
+        int32_t num_segments;
+        RETURN_IF_ERROR(writer->close_wait(num_segments, profile));
+        segments_for_tablet[tablet_id] = num_segments;
     }
     return Status::OK();
 }
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h 
b/be/src/vec/sink/delta_writer_v2_pool.h
index 912b9216e9f..7e58eea3149 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -70,7 +70,8 @@ public:
             int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> 
creator);
 
     // close all delta writers in this DeltaWriterV2Map if there is no other 
users
-    Status close(RuntimeProfile* profile = nullptr);
+    Status close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
+                 RuntimeProfile* profile = nullptr);
 
     // cancel all delta writers in this DeltaWriterV2Map
     void cancel(Status status);
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp 
b/be/src/vec/sink/load_stream_map_pool.cpp
index 7a3072ade6e..2fcb8deaeb2 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -87,7 +87,9 @@ void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
                                            const std::vector<PTabletID>& 
tablets_to_commit) {
     std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
     auto& tablets = _tablets_to_commit[dst_id];
-    tablets.insert(tablets.end(), tablets_to_commit.begin(), 
tablets_to_commit.end());
+    for (const auto& tablet : tablets_to_commit) {
+        tablets.emplace(tablet.tablet_id(), tablet);
+    }
 }
 
 bool LoadStreamMap::release() {
@@ -103,12 +105,24 @@ bool LoadStreamMap::release() {
 
 Status LoadStreamMap::close_load(bool incremental) {
     return for_each_st([this, incremental](int64_t dst_id, const Streams& 
streams) -> Status {
+        std::vector<PTabletID> tablets_to_commit;
         const auto& tablets = _tablets_to_commit[dst_id];
+        tablets_to_commit.reserve(tablets.size());
+        for (const auto& [tablet_id, tablet] : tablets) {
+            tablets_to_commit.push_back(tablet);
+            
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
+        }
+        bool first = true;
         for (auto& stream : streams) {
             if (stream->is_incremental() != incremental) {
                 continue;
             }
-            RETURN_IF_ERROR(stream->close_load(tablets));
+            if (first) {
+                RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
+                first = false;
+            } else {
+                RETURN_IF_ERROR(stream->close_load({}));
+            }
         }
         return Status::OK();
     });
diff --git a/be/src/vec/sink/load_stream_map_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
index 5d08b3d1aff..8728686ce9b 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -89,6 +89,10 @@ public:
 
     void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& 
tablets_to_commit);
 
+    void save_segments_for_tablet(const std::unordered_map<int64_t, int32_t>& 
segments_for_tablet) {
+        _segments_for_tablet.insert(segments_for_tablet.cbegin(), 
segments_for_tablet.cend());
+    }
+
     // Return true if the last instance is just released.
     bool release();
 
@@ -108,7 +112,8 @@ private:
     std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
 
     std::mutex _tablets_to_commit_mutex;
-    std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
+    std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> 
_tablets_to_commit;
+    std::unordered_map<int64_t, int32_t> _segments_for_tablet;
 };
 
 class LoadStreamMapPool {
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 63f91678989..f322d67ceaf 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -207,6 +207,11 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
 Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                    int64_t segment_id, uint64_t offset, 
std::span<const Slice> data,
                                    bool segment_eos, FileType file_type) {
+    DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
+        if (segment_id != 0) {
+            return Status::OK();
+        }
+    });
     PStreamHeader header;
     header.set_src_id(_src_id);
     *header.mutable_load_id() = _load_id;
@@ -225,6 +230,11 @@ Status LoadStreamStub::append_data(int64_t partition_id, 
int64_t index_id, int64
 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
                                    int64_t segment_id, const 
SegmentStatistics& segment_stat,
                                    TabletSchemaSPtr flush_schema) {
+    DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
+        if (segment_id != 0) {
+            return Status::OK();
+        }
+    });
     PStreamHeader header;
     header.set_src_id(_src_id);
     *header.mutable_load_id() = _load_id;
@@ -369,7 +379,53 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& 
buf, bool sync) {
     std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
     buffer_lock.unlock();
     VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync;
-    return _send_with_retry(output);
+    auto st = _send_with_retry(output);
+    if (!st.ok()) {
+        _handle_failure(output, st);
+    }
+    return st;
+}
+
+void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
+    while (buf.size() > 0) {
+        // step 1: parse header
+        size_t hdr_len = 0;
+        buf.cutn((void*)&hdr_len, sizeof(size_t));
+        butil::IOBuf hdr_buf;
+        PStreamHeader hdr;
+        buf.cutn(&hdr_buf, hdr_len);
+        butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf);
+        hdr.ParseFromZeroCopyStream(&wrapper);
+
+        // step 2: cut data
+        size_t data_len = 0;
+        buf.cutn((void*)&data_len, sizeof(size_t));
+        butil::IOBuf data_buf;
+        buf.cutn(&data_buf, data_len);
+
+        // step 3: handle failure
+        switch (hdr.opcode()) {
+        case PStreamHeader::ADD_SEGMENT:
+        case PStreamHeader::APPEND_DATA: {
+            add_failed_tablet(hdr.tablet_id(), st);
+        } break;
+        case PStreamHeader::CLOSE_LOAD: {
+            brpc::StreamClose(_stream_id);
+        } break;
+        case PStreamHeader::GET_SCHEMA: {
+            // Just log and let wait_for_schema timeout
+            std::ostringstream oss;
+            for (const auto& tablet : hdr.tablets()) {
+                oss << " " << tablet.tablet_id();
+            }
+            LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << 
oss.str() << ", "
+                         << *this;
+        } break;
+        default:
+            LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", 
" << *this;
+            DCHECK(false);
+        }
+    }
 }
 
 Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index dd15eb7bf4c..b6436a4b81a 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -206,7 +206,6 @@ public:
         _success_tablets.push_back(tablet_id);
     }
 
-    // for tests only
     void add_failed_tablet(int64_t tablet_id, Status reason) {
         std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
         _failed_tablets[tablet_id] = reason;
@@ -216,6 +215,7 @@ private:
     Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data 
= {});
     Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
     Status _send_with_retry(butil::IOBuf& buf);
+    void _handle_failure(butil::IOBuf& buf, Status st);
 
     Status _check_cancel() {
         if (!_is_cancelled.load()) {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index fbefd7a6d83..b293151ef94 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -546,13 +546,18 @@ Status VTabletWriterV2::close(Status exec_status) {
 
         // close DeltaWriters
         {
+            std::unordered_map<int64_t, int32_t> segments_for_tablet;
             SCOPED_TIMER(_close_writer_timer);
             // close all delta writers if this is the last user
-            auto st = _delta_writer_for_tablet->close(_profile);
+            auto st = _delta_writer_for_tablet->close(segments_for_tablet, 
_profile);
             _delta_writer_for_tablet.reset();
             if (!st.ok()) {
                 RETURN_IF_ERROR(_cancel(st));
             }
+            // only the last sink closing delta writers will have segment num
+            if (!segments_for_tablet.empty()) {
+                
_load_stream_map->save_segments_for_tablet(segments_for_tablet);
+            }
         }
 
         _calc_tablets_to_commit();
@@ -662,7 +667,8 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
                 if (VLOG_DEBUG_IS_ON) {
                     partition_ids.push_back(tablet.partition_id());
                 }
-                tablets_to_commit.push_back(tablet);
+                PTabletID t(tablet);
+                tablets_to_commit.push_back(t);
             }
         }
         if (VLOG_DEBUG_IS_ON) {
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp 
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index a67a701c409..dc86ce8c3a2 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -42,9 +42,10 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) {
     EXPECT_EQ(2, pool.size());
     EXPECT_EQ(map, map3);
     EXPECT_NE(map, map2);
-    EXPECT_TRUE(map->close().ok());
-    EXPECT_TRUE(map2->close().ok());
-    EXPECT_TRUE(map3->close().ok());
+    std::unordered_map<int64_t, int32_t> sft;
+    EXPECT_TRUE(map->close(sft).ok());
+    EXPECT_TRUE(map2->close(sft).ok());
+    EXPECT_TRUE(map3->close(sft).ok());
     EXPECT_EQ(0, pool.size());
 }
 
@@ -72,7 +73,8 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
     EXPECT_EQ(2, map->size());
     EXPECT_EQ(writer, writer3);
     EXPECT_NE(writer, writer2);
-    static_cast<void>(map->close());
+    std::unordered_map<int64_t, int32_t> sft;
+    static_cast<void>(map->close(sft));
     EXPECT_EQ(0, pool.size());
 }
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 4457c50917b..8857b65278b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -72,6 +72,7 @@ message PTabletID {
     optional int64 partition_id = 1;
     optional int64 index_id = 2;
     optional int64 tablet_id = 3;
+    optional int64 num_segments = 4;
 }
 
 message PTabletInfo {
diff --git 
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
index 37d8b4f2610..8080b52ff48 100644
--- 
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -91,10 +91,11 @@ suite("test_multi_replica_fault_injection", 
"nonConcurrent") {
         // success
         
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
 "sucess")
         // StreamSinkFileWriter appendv write segment failed two replica
-        
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 "replica num 1 < load required replica num 2")
+        
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
 "add segment failed")
         // StreamSinkFileWriter appendv write segment failed all replica
         
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
 "failed to send segment data to any replicas")
-
+        // test segment num check when LoadStreamStub missed tail segments
+        load_with_injection("LoadStreamStub.only_send_segment_0", "segment num 
mismatch")
         sql """ set enable_memtable_on_sink_node=false """
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to