This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9a277a6f11d [fix](move-memtable) don't abort in replica write layer unless all replica fails (#29257) 9a277a6f11d is described below commit 9a277a6f11daeb612012babd95fbc0aa73ee5720 Author: Kaijie Chen <c...@apache.org> AuthorDate: Fri Dec 29 00:03:28 2023 +0800 [fix](move-memtable) don't abort in replica write layer unless all replica fails (#29257) --- be/src/io/fs/stream_sink_file_writer.cpp | 33 ++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index 25a4f5b27d5..2fce7c6baa9 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -51,9 +51,22 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { << ", data_length: " << bytes_req; std::span<const Slice> slices {data, data_cnt}; + bool ok = false; for (auto& stream : _streams) { - RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, - _bytes_appended, slices)); + auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, + _bytes_appended, slices); + ok = ok || st.ok(); + } + if (!ok) { + std::stringstream ss; + for (auto& stream : _streams) { + ss << " " << stream->dst_id(); + } + LOG(WARNING) << "failed to write any replicas, load_id: " << print_id(_load_id) + << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id + << ", segment_id: " << _segment_id << ", data_length: " << bytes_req + << ", backends:" << ss.str(); + return Status::InternalError("failed to write any replicas"); } _bytes_appended += bytes_req; return Status::OK(); @@ -63,9 +76,21 @@ Status StreamSinkFileWriter::finalize() { VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; // TODO(zhengyu): update get_inverted_index_file_size into stat + bool ok = false; for (auto& stream : _streams) { - RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, - _bytes_appended, {}, true)); + auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, + _bytes_appended, {}, true); + ok = ok || st.ok(); + } + if (!ok) { + std::stringstream ss; + for (auto& stream : _streams) { + ss << " " << stream->dst_id(); + } + LOG(WARNING) << "failed to finalize any replicas, load_id: " << print_id(_load_id) + << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id + << ", segment_id: " << _segment_id << ", backends:" << ss.str(); + return Status::InternalError("failed to finalize any replicas"); } return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org