This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3084c9e9226e48672fed7919802e9884e97ea07f
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Sun Aug 11 20:26:29 2024 +0800

    [Fix](multi-catalog) Fix not throw error when call close() in hive/iceberg 
writer. (#38987)
    
    ## Proposed changes
    
    [Fix] (multi-catalog) Fix errors not being returned when close() is
    called in the hive/iceberg writers.
    
    When a file writer's close() is called, it syncs its buffer to commit
    the data. As a result, data is sometimes written only when close() is
    called, which can surface errors at that point (for example, in
    hdfs_file_writer). Such errors therefore need to be captured and
    propagated throughout the entire close process.
---
 .../vec/sink/writer/iceberg/viceberg_partition_writer.cpp  | 14 ++++++++------
 be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp   | 10 +++++++---
 be/src/vec/sink/writer/vhive_partition_writer.cpp          | 14 ++++++++------
 be/src/vec/sink/writer/vhive_table_writer.cpp              | 10 +++++++---
 4 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 9cf7af32204..924adf68145 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -101,24 +101,26 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profil
 }
 
 Status VIcebergPartitionWriter::close(const Status& status) {
+    Status result_status;
     if (_file_format_transformer != nullptr) {
-        Status st = _file_format_transformer->close();
-        if (!st.ok()) {
+        result_status = _file_format_transformer->close();
+        if (!result_status.ok()) {
             LOG(WARNING) << fmt::format("_file_format_transformer close 
failed, reason: {}",
-                                        st.to_string());
+                                        result_status.to_string());
         }
     }
-    if (!status.ok() && _fs != nullptr) {
+    bool status_ok = result_status.ok() && status.ok();
+    if (!status_ok && _fs != nullptr) {
         auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
         Status st = _fs->delete_file(path);
         if (!st.ok()) {
             LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", 
path, st.to_string());
         }
     }
-    if (status.ok()) {
+    if (status_ok) {
         
_state->iceberg_commit_datas().emplace_back(_build_iceberg_commit_data());
     }
-    return Status::OK();
+    return result_status;
 }
 
 Status VIcebergPartitionWriter::write(vectorized::Block& block) {
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 898b71d1d9a..a116cfb7f39 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -273,15 +273,19 @@ Status 
VIcebergTableWriter::_filter_block(doris::vectorized::Block& block,
 }
 
 Status VIcebergTableWriter::close(Status status) {
+    Status result_status;
     int64_t partitions_to_writers_size = _partitions_to_writers.size();
     {
         SCOPED_RAW_TIMER(&_close_ns);
         for (const auto& pair : _partitions_to_writers) {
             Status st = pair.second->close(status);
-            if (st != Status::OK()) {
+            if (!st.ok()) {
                 LOG(WARNING) << fmt::format("partition writer close failed for 
partition {}",
                                             st.to_string());
-                continue;
+                if (result_status.ok()) {
+                    result_status = st;
+                    continue;
+                }
             }
         }
         _partitions_to_writers.clear();
@@ -297,7 +301,7 @@ Status VIcebergTableWriter::close(Status status) {
         COUNTER_SET(_close_timer, _close_ns);
         COUNTER_SET(_write_file_counter, _write_file_count);
     }
-    return Status::OK();
+    return result_status;
 }
 
 std::string VIcebergTableWriter::_partition_to_path(const 
doris::iceberg::StructLike& data) {
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp 
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 5322bde57f1..0d6767b6196 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -117,24 +117,26 @@ Status VHivePartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profile)
 }
 
 Status VHivePartitionWriter::close(const Status& status) {
+    Status result_status;
     if (_file_format_transformer != nullptr) {
-        Status st = _file_format_transformer->close();
-        if (!st.ok()) {
+        result_status = _file_format_transformer->close();
+        if (!result_status.ok()) {
             LOG(WARNING) << fmt::format("_file_format_transformer close 
failed, reason: {}",
-                                        st.to_string());
+                                        result_status.to_string());
         }
     }
-    if (!status.ok() && _fs != nullptr) {
+    bool status_ok = result_status.ok() && status.ok();
+    if (!status_ok && _fs != nullptr) {
         auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
         Status st = _fs->delete_file(path);
         if (!st.ok()) {
             LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", 
path, st.to_string());
         }
     }
-    if (status.ok()) {
+    if (status_ok) {
         
_state->hive_partition_updates().emplace_back(_build_partition_update());
     }
-    return Status::OK();
+    return result_status;
 }
 
 Status VHivePartitionWriter::write(vectorized::Block& block) {
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp 
b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 53f70b6b31a..091560ff8ce 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -242,15 +242,19 @@ Status 
VHiveTableWriter::_filter_block(doris::vectorized::Block& block,
 }
 
 Status VHiveTableWriter::close(Status status) {
+    Status result_status;
     int64_t partitions_to_writers_size = _partitions_to_writers.size();
     {
         SCOPED_RAW_TIMER(&_close_ns);
         for (const auto& pair : _partitions_to_writers) {
             Status st = pair.second->close(status);
-            if (st != Status::OK()) {
+            if (!st.ok()) {
                 LOG(WARNING) << fmt::format("partition writer close failed for 
partition {}",
                                             st.to_string());
-                continue;
+                if (result_status.ok()) {
+                    result_status = st;
+                    continue;
+                }
             }
         }
         _partitions_to_writers.clear();
@@ -266,7 +270,7 @@ Status VHiveTableWriter::close(Status status) {
         COUNTER_SET(_close_timer, _close_ns);
         COUNTER_SET(_write_file_counter, _write_file_count);
     }
-    return Status::OK();
+    return result_status;
 }
 
 std::shared_ptr<VHivePartitionWriter> 
VHiveTableWriter::_create_partition_writer(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to