This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3084c9e9226e48672fed7919802e9884e97ea07f Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Sun Aug 11 20:26:29 2024 +0800 [Fix](multi-catalog) Fix not throw error when call close() in hive/iceberg writer. (#38987) ## Proposed changes [Fix] (multi-catalog) Fix not throw error when call close() in hive/iceberg writer. When a file writer's close() is called, it syncs its buffer to commit the data. As a result, data is sometimes written only when close() is called, which can expose errors at that point; hdfs_file_writer is one example. Therefore, such errors need to be captured throughout the entire close process. --- .../vec/sink/writer/iceberg/viceberg_partition_writer.cpp | 14 ++++++++------ be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp | 10 +++++++--- be/src/vec/sink/writer/vhive_partition_writer.cpp | 14 ++++++++------ be/src/vec/sink/writer/vhive_table_writer.cpp | 10 +++++++--- 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp index 9cf7af32204..924adf68145 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp @@ -101,24 +101,26 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil } Status VIcebergPartitionWriter::close(const Status& status) { + Status result_status; if (_file_format_transformer != nullptr) { - Status st = _file_format_transformer->close(); - if (!st.ok()) { + result_status = _file_format_transformer->close(); + if (!result_status.ok()) { LOG(WARNING) << fmt::format("_file_format_transformer close failed, reason: {}", - st.to_string()); + result_status.to_string()); } } - if (!status.ok() && _fs != nullptr) { + bool status_ok = result_status.ok() && status.ok(); + if (!status_ok && _fs != nullptr) { auto path = fmt::format("{}/{}", _write_info.write_path, _file_name); Status st = _fs->delete_file(path); if
(!st.ok()) { LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string()); } } - if (status.ok()) { + if (status_ok) { _state->iceberg_commit_datas().emplace_back(_build_iceberg_commit_data()); } - return Status::OK(); + return result_status; } Status VIcebergPartitionWriter::write(vectorized::Block& block) { diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index 898b71d1d9a..a116cfb7f39 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -273,15 +273,19 @@ Status VIcebergTableWriter::_filter_block(doris::vectorized::Block& block, } Status VIcebergTableWriter::close(Status status) { + Status result_status; int64_t partitions_to_writers_size = _partitions_to_writers.size(); { SCOPED_RAW_TIMER(&_close_ns); for (const auto& pair : _partitions_to_writers) { Status st = pair.second->close(status); - if (st != Status::OK()) { + if (!st.ok()) { LOG(WARNING) << fmt::format("partition writer close failed for partition {}", st.to_string()); - continue; + if (result_status.ok()) { + result_status = st; + continue; + } } } _partitions_to_writers.clear(); @@ -297,7 +301,7 @@ Status VIcebergTableWriter::close(Status status) { COUNTER_SET(_close_timer, _close_ns); COUNTER_SET(_write_file_counter, _write_file_count); } - return Status::OK(); + return result_status; } std::string VIcebergTableWriter::_partition_to_path(const doris::iceberg::StructLike& data) { diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp index 5322bde57f1..0d6767b6196 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.cpp +++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp @@ -117,24 +117,26 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) } Status VHivePartitionWriter::close(const Status& status) { + Status 
result_status; if (_file_format_transformer != nullptr) { - Status st = _file_format_transformer->close(); - if (!st.ok()) { + result_status = _file_format_transformer->close(); + if (!result_status.ok()) { LOG(WARNING) << fmt::format("_file_format_transformer close failed, reason: {}", - st.to_string()); + result_status.to_string()); } } - if (!status.ok() && _fs != nullptr) { + bool status_ok = result_status.ok() && status.ok(); + if (!status_ok && _fs != nullptr) { auto path = fmt::format("{}/{}", _write_info.write_path, _file_name); Status st = _fs->delete_file(path); if (!st.ok()) { LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string()); } } - if (status.ok()) { + if (status_ok) { _state->hive_partition_updates().emplace_back(_build_partition_update()); } - return Status::OK(); + return result_status; } Status VHivePartitionWriter::write(vectorized::Block& block) { diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp index 53f70b6b31a..091560ff8ce 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.cpp +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -242,15 +242,19 @@ Status VHiveTableWriter::_filter_block(doris::vectorized::Block& block, } Status VHiveTableWriter::close(Status status) { + Status result_status; int64_t partitions_to_writers_size = _partitions_to_writers.size(); { SCOPED_RAW_TIMER(&_close_ns); for (const auto& pair : _partitions_to_writers) { Status st = pair.second->close(status); - if (st != Status::OK()) { + if (!st.ok()) { LOG(WARNING) << fmt::format("partition writer close failed for partition {}", st.to_string()); - continue; + if (result_status.ok()) { + result_status = st; + continue; + } } } _partitions_to_writers.clear(); @@ -266,7 +270,7 @@ Status VHiveTableWriter::close(Status status) { COUNTER_SET(_close_timer, _close_ns); COUNTER_SET(_write_file_counter, _write_file_count); } - return Status::OK(); + return result_status; } 
std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org