This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 08eaff322c6728d8c328ae0bc95ab2a1a2709607 Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Fri Aug 23 14:05:49 2024 +0800 [bugfix](iceberg)Fixed random core with writing iceberg partitioned table (#39808) Use local variables instead of member variables. --- .../sink/writer/iceberg/viceberg_table_writer.cpp | 51 ++++++++++------------ .../sink/writer/iceberg/viceberg_table_writer.h | 8 ++-- 2 files changed, 25 insertions(+), 34 deletions(-) diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index a0b9aa6d3f5..280cf8b8107 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -127,7 +127,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) auto writer_iter = _partitions_to_writers.find(""); if (writer_iter == _partitions_to_writers.end()) { try { - writer = _create_partition_writer(output_block, -1); + writer = _create_partition_writer(nullptr, -1); } catch (doris::Exception& e) { return e.to_status(); } @@ -143,7 +143,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) } _partitions_to_writers.erase(writer_iter); try { - writer = _create_partition_writer(output_block, -1, &file_name, + writer = _create_partition_writer(nullptr, -1, &file_name, file_name_index + 1); } catch (doris::Exception& e) { return e.to_status(); @@ -162,22 +162,21 @@ Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) } { + Block transformed_block; SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns); - _transformed_block.clear(); - _transformed_block.reserve(_iceberg_partition_columns.size()); + transformed_block.reserve(_iceberg_partition_columns.size()); for (auto& iceberg_partition_columns : _iceberg_partition_columns) { - _transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply( + 
transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply( output_block, iceberg_partition_columns.source_idx())); } for (int i = 0; i < output_block.rows(); ++i) { std::optional<PartitionData> partition_data; try { - partition_data = _get_partition_data(_transformed_block, i); + partition_data = _get_partition_data(&transformed_block, i); } catch (doris::Exception& e) { return e.to_status(); } std::string partition_name; - DCHECK(partition_data.has_value()); try { partition_name = _partition_to_path(partition_data.value()); } catch (doris::Exception& e) { @@ -188,7 +187,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) const std::string* file_name, int file_name_index, std::shared_ptr<VIcebergPartitionWriter>& writer_ptr) -> Status { try { - auto writer = _create_partition_writer(output_block, position, file_name, + auto writer = _create_partition_writer(&transformed_block, position, file_name, file_name_index); RETURN_IF_ERROR(writer->open(_state, _profile)); IColumn::Filter filter(output_block.rows(), 0); @@ -344,30 +343,27 @@ std::vector<std::string> VIcebergTableWriter::_partition_values( } std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_writer( - vectorized::Block& block, int position, const std::string* file_name, int file_name_index) { + vectorized::Block* transformed_block, int position, const std::string* file_name, + int file_name_index) { auto& iceberg_table_sink = _t_sink.iceberg_table_sink; - std::optional<PartitionData> partition_data; - partition_data = _get_partition_data(_transformed_block, position); - std::string partition_path; std::vector<std::string> partition_values; - if (partition_data.has_value()) { - partition_path = _partition_to_path(partition_data.value()); - partition_values = _partition_values(partition_data.value()); - } const std::string& output_path = iceberg_table_sink.output_path; - std::string write_path; std::string 
original_write_path; std::string target_path; - if (partition_path.empty()) { - original_write_path = iceberg_table_sink.original_output_path; - target_path = output_path; - write_path = output_path; - } else { + + if (transformed_block != nullptr) { + PartitionData partition_data = _get_partition_data(transformed_block, position); + std::string partition_path = _partition_to_path(partition_data); + partition_values = _partition_values(partition_data); original_write_path = fmt::format("{}/{}", iceberg_table_sink.original_output_path, partition_path); target_path = fmt::format("{}/{}", output_path, partition_path); write_path = fmt::format("{}/{}", output_path, partition_path); + } else { + original_write_path = iceberg_table_sink.original_output_path; + target_path = output_path; + write_path = output_path; } VIcebergPartitionWriter::WriteInfo write_info = { @@ -390,18 +386,15 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_ iceberg_table_sink.hadoop_config); } -std::optional<PartitionData> VIcebergTableWriter::_get_partition_data( - vectorized::Block& transformed_block, int position) { - if (_iceberg_partition_columns.empty()) { - return std::nullopt; - } - +PartitionData VIcebergTableWriter::_get_partition_data(vectorized::Block* transformed_block, + int position) { + DCHECK(!_iceberg_partition_columns.empty()); std::vector<std::any> values; values.reserve(_iceberg_partition_columns.size()); int column_idx = 0; for (auto& iceberg_partition_column : _iceberg_partition_columns) { const vectorized::ColumnWithTypeAndName& partition_column = - transformed_block.get_by_position(column_idx); + transformed_block->get_by_position(column_idx); const TypeDescriptor& result_type = iceberg_partition_column.partition_column_transform().get_result_type(); auto value = _get_iceberg_partition_value(result_type, partition_column, position); diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h index ae53d3af98e..e53c7020a68 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h @@ -99,10 +99,10 @@ private: std::vector<std::string> _partition_values(const doris::iceberg::StructLike& data); std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer( - vectorized::Block& block, int position, const std::string* file_name = nullptr, - int file_name_index = 0); + vectorized::Block* transformed_block, int position, + const std::string* file_name = nullptr, int file_name_index = 0); - std::optional<PartitionData> _get_partition_data(vectorized::Block& block, int position); + PartitionData _get_partition_data(vectorized::Block* block, int position); std::any _get_iceberg_partition_value(const TypeDescriptor& type_desc, const ColumnWithTypeAndName& partition_column, @@ -129,8 +129,6 @@ private: VExprContextSPtrs _write_output_vexpr_ctxs; - Block _transformed_block; - size_t _row_count = 0; // profile counters --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org