This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a277a6f11d [fix](move-memtable) don't abort in replica write layer 
unless all replica fails (#29257)
9a277a6f11d is described below

commit 9a277a6f11daeb612012babd95fbc0aa73ee5720
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Dec 29 00:03:28 2023 +0800

    [fix](move-memtable) don't abort in replica write layer unless all replica 
fails (#29257)
---
 be/src/io/fs/stream_sink_file_writer.cpp | 33 ++++++++++++++++++++++++++++----
 1 file changed, 29 insertions(+), 4 deletions(-)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index 25a4f5b27d5..2fce7c6baa9 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -51,9 +51,22 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
                << ", data_length: " << bytes_req;
 
     std::span<const Slice> slices {data, data_cnt};
+    bool ok = false;
     for (auto& stream : _streams) {
-        RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, 
_tablet_id, _segment_id,
-                                            _bytes_appended, slices));
+        auto st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
+                                      _bytes_appended, slices);
+        ok = ok || st.ok();
+    }
+    if (!ok) {
+        std::stringstream ss;
+        for (auto& stream : _streams) {
+            ss << " " << stream->dst_id();
+        }
+        LOG(WARNING) << "failed to write any replicas, load_id: " << 
print_id(_load_id)
+                     << ", index_id: " << _index_id << ", tablet_id: " << 
_tablet_id
+                     << ", segment_id: " << _segment_id << ", data_length: " 
<< bytes_req
+                     << ", backends:" << ss.str();
+        return Status::InternalError("failed to write any replicas");
     }
     _bytes_appended += bytes_req;
     return Status::OK();
@@ -63,9 +76,21 @@ Status StreamSinkFileWriter::finalize() {
     VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", 
index_id: " << _index_id
                << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id;
     // TODO(zhengyu): update get_inverted_index_file_size into stat
+    bool ok = false;
     for (auto& stream : _streams) {
-        RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, 
_tablet_id, _segment_id,
-                                            _bytes_appended, {}, true));
+        auto st = stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id,
+                                      _bytes_appended, {}, true);
+        ok = ok || st.ok();
+    }
+    if (!ok) {
+        std::stringstream ss;
+        for (auto& stream : _streams) {
+            ss << " " << stream->dst_id();
+        }
+        LOG(WARNING) << "failed to finalize any replicas, load_id: " << 
print_id(_load_id)
+                     << ", index_id: " << _index_id << ", tablet_id: " << 
_tablet_id
+                     << ", segment_id: " << _segment_id << ", backends:" << 
ss.str();
+        return Status::InternalError("failed to finalize any replicas");
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to