This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 08eaff322c6728d8c328ae0bc95ab2a1a2709607
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Fri Aug 23 14:05:49 2024 +0800

    [bugfix](iceberg) Fix random core dump when writing to an Iceberg partitioned table (#39808)
    
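    Use a local `transformed_block` in `VIcebergTableWriter::write()` and pass it
    explicitly to `_create_partition_writer()` and `_get_partition_data()`,
    instead of the `_transformed_block` member variable.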
---
 .../sink/writer/iceberg/viceberg_table_writer.cpp  | 51 ++++++++++------------
 .../sink/writer/iceberg/viceberg_table_writer.h    |  8 ++--
 2 files changed, 25 insertions(+), 34 deletions(-)

diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index a0b9aa6d3f5..280cf8b8107 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -127,7 +127,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
             auto writer_iter = _partitions_to_writers.find("");
             if (writer_iter == _partitions_to_writers.end()) {
                 try {
-                    writer = _create_partition_writer(output_block, -1);
+                    writer = _create_partition_writer(nullptr, -1);
                 } catch (doris::Exception& e) {
                     return e.to_status();
                 }
@@ -143,7 +143,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
                     }
                     _partitions_to_writers.erase(writer_iter);
                     try {
-                        writer = _create_partition_writer(output_block, -1, 
&file_name,
+                        writer = _create_partition_writer(nullptr, -1, 
&file_name,
                                                           file_name_index + 1);
                     } catch (doris::Exception& e) {
                         return e.to_status();
@@ -162,22 +162,21 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
     }
 
     {
+        Block transformed_block;
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
-        _transformed_block.clear();
-        _transformed_block.reserve(_iceberg_partition_columns.size());
+        transformed_block.reserve(_iceberg_partition_columns.size());
         for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-            
_transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
+            
transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
                     output_block, iceberg_partition_columns.source_idx()));
         }
         for (int i = 0; i < output_block.rows(); ++i) {
             std::optional<PartitionData> partition_data;
             try {
-                partition_data = _get_partition_data(_transformed_block, i);
+                partition_data = _get_partition_data(&transformed_block, i);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
             std::string partition_name;
-            DCHECK(partition_data.has_value());
             try {
                 partition_name = _partition_to_path(partition_data.value());
             } catch (doris::Exception& e) {
@@ -188,7 +187,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
                         const std::string* file_name, int file_name_index,
                         std::shared_ptr<VIcebergPartitionWriter>& writer_ptr) 
-> Status {
                 try {
-                    auto writer = _create_partition_writer(output_block, 
position, file_name,
+                    auto writer = _create_partition_writer(&transformed_block, 
position, file_name,
                                                            file_name_index);
                     RETURN_IF_ERROR(writer->open(_state, _profile));
                     IColumn::Filter filter(output_block.rows(), 0);
@@ -344,30 +343,27 @@ std::vector<std::string> 
VIcebergTableWriter::_partition_values(
 }
 
 std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_writer(
-        vectorized::Block& block, int position, const std::string* file_name, 
int file_name_index) {
+        vectorized::Block* transformed_block, int position, const std::string* 
file_name,
+        int file_name_index) {
     auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
-    std::optional<PartitionData> partition_data;
-    partition_data = _get_partition_data(_transformed_block, position);
-    std::string partition_path;
     std::vector<std::string> partition_values;
-    if (partition_data.has_value()) {
-        partition_path = _partition_to_path(partition_data.value());
-        partition_values = _partition_values(partition_data.value());
-    }
     const std::string& output_path = iceberg_table_sink.output_path;
-
     std::string write_path;
     std::string original_write_path;
     std::string target_path;
-    if (partition_path.empty()) {
-        original_write_path = iceberg_table_sink.original_output_path;
-        target_path = output_path;
-        write_path = output_path;
-    } else {
+
+    if (transformed_block != nullptr) {
+        PartitionData partition_data = _get_partition_data(transformed_block, 
position);
+        std::string partition_path = _partition_to_path(partition_data);
+        partition_values = _partition_values(partition_data);
         original_write_path =
                 fmt::format("{}/{}", iceberg_table_sink.original_output_path, 
partition_path);
         target_path = fmt::format("{}/{}", output_path, partition_path);
         write_path = fmt::format("{}/{}", output_path, partition_path);
+    } else {
+        original_write_path = iceberg_table_sink.original_output_path;
+        target_path = output_path;
+        write_path = output_path;
     }
 
     VIcebergPartitionWriter::WriteInfo write_info = {
@@ -390,18 +386,15 @@ std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_
             iceberg_table_sink.hadoop_config);
 }
 
-std::optional<PartitionData> VIcebergTableWriter::_get_partition_data(
-        vectorized::Block& transformed_block, int position) {
-    if (_iceberg_partition_columns.empty()) {
-        return std::nullopt;
-    }
-
+PartitionData VIcebergTableWriter::_get_partition_data(vectorized::Block* 
transformed_block,
+                                                       int position) {
+    DCHECK(!_iceberg_partition_columns.empty());
     std::vector<std::any> values;
     values.reserve(_iceberg_partition_columns.size());
     int column_idx = 0;
     for (auto& iceberg_partition_column : _iceberg_partition_columns) {
         const vectorized::ColumnWithTypeAndName& partition_column =
-                transformed_block.get_by_position(column_idx);
+                transformed_block->get_by_position(column_idx);
         const TypeDescriptor& result_type =
                 
iceberg_partition_column.partition_column_transform().get_result_type();
         auto value = _get_iceberg_partition_value(result_type, 
partition_column, position);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index ae53d3af98e..e53c7020a68 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -99,10 +99,10 @@ private:
     std::vector<std::string> _partition_values(const 
doris::iceberg::StructLike& data);
 
     std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
-            vectorized::Block& block, int position, const std::string* 
file_name = nullptr,
-            int file_name_index = 0);
+            vectorized::Block* transformed_block, int position,
+            const std::string* file_name = nullptr, int file_name_index = 0);
 
-    std::optional<PartitionData> _get_partition_data(vectorized::Block& block, 
int position);
+    PartitionData _get_partition_data(vectorized::Block* block, int position);
 
     std::any _get_iceberg_partition_value(const TypeDescriptor& type_desc,
                                           const ColumnWithTypeAndName& 
partition_column,
@@ -129,8 +129,6 @@ private:
 
     VExprContextSPtrs _write_output_vexpr_ctxs;
 
-    Block _transformed_block;
-
     size_t _row_count = 0;
 
     // profile counters


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to