This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f0463a9034389eb2f20de121ca70bc271f65b34a
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Sun Apr 7 15:12:41 2024 +0800

    [Feature][Enhancement](hive-writer) Add hive-writer runtime profiles, change output file names (#33245)

    Issue Number: #31442

    - Add hive-writer runtime profiles.
    - Change output file names to `${query_id}${uuid}-${index}.${compression}.${format}`,
      e.g. `"d8735c6fa444a6d-acd392981e510c2b_34fbdcbb-b2e1-4f2c-b68c-a384238954a9-0.snappy.parquet"`.
      For the same partition writer, when the file size exceeds `hive_sink_max_file_size`,
      the file currently being written is closed and a new file is created; the `${index}`
      in the new file name is incremented while the rest of the name stays the same.
---
 be/src/common/config.h                            |   2 +-
 be/src/vec/sink/writer/vhive_partition_writer.cpp |  55 ++++-
 be/src/vec/sink/writer/vhive_partition_writer.h   |  11 +-
 be/src/vec/sink/writer/vhive_table_writer.cpp     | 245 ++++++++++++----------
 be/src/vec/sink/writer/vhive_table_writer.h       |  27 ++-
 5 files changed, 212 insertions(+), 128 deletions(-)
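For readers skimming the patch, a minimal sketch of the naming scheme described above. This is illustrative only, not code from this commit; the `${query_id}_${uuid}` prefix is assumed to be precomputed by the caller:

```cpp
#include <fmt/format.h>

#include <string>

// Illustrative helper: compose "${prefix}-${index}${compression}${format}",
// e.g. ("..._34fbdcbb-...", 0, ".snappy", ".parquet") yields
// "..._34fbdcbb-...-0.snappy.parquet", matching the example in the commit message.
std::string make_output_file_name(const std::string& prefix, int index,
                                  const std::string& compression,
                                  const std::string& format) {
    return fmt::format("{}-{}{}{}", prefix, index, compression, format);
}
```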
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6076ce79433..ae39a6e5eb4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1268,7 +1268,7 @@ DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_thresh
 DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer);
 
 /** Hive sink configurations **/
-DECLARE_mInt64(hive_sink_max_file_size); // 1GB
+DECLARE_mInt64(hive_sink_max_file_size);
 
 // Number of open tries, default 1 means only try to open once.
 // Retry the Open num_retries time waiting 100 milliseconds between retries.
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 7f9a0dd1b1e..f8fd13c5a27 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -32,8 +32,8 @@ VHivePartitionWriter::VHivePartitionWriter(
         const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode,
         const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs,
         const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns,
-        WriteInfo write_info, std::string file_name, TFileFormatType::type file_format_type,
-        TFileCompressType::type hive_compress_type,
+        WriteInfo write_info, std::string file_name, int file_name_index,
+        TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type,
         const std::map<std::string, std::string>& hadoop_conf)
         : _partition_name(std::move(partition_name)),
           _update_mode(update_mode),
@@ -43,6 +43,7 @@ VHivePartitionWriter::VHivePartitionWriter(
           _columns(columns),
           _write_info(std::move(write_info)),
           _file_name(std::move(file_name)),
+          _file_name_index(file_name_index),
           _file_format_type(file_format_type),
           _hive_compress_type(hive_compress_type),
           _hadoop_conf(hadoop_conf)
@@ -55,12 +56,14 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
     std::vector<TNetworkAddress> broker_addresses;
     RETURN_IF_ERROR(FileFactory::create_file_writer(
             _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf,
-            fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer));
+            fmt::format("{}/{}-{}{}", _write_info.write_path, _file_name, _file_name_index,
+                        _get_file_extension(_file_format_type, _hive_compress_type)),
+            0, _file_writer));
 
     std::vector<std::string> column_names;
     column_names.reserve(_columns.size());
     for (int i = 0; i < _columns.size(); i++) {
-        column_names.push_back(_columns[i].name);
+        column_names.emplace_back(_columns[i].name);
     }
 
     switch (_file_format_type) {
@@ -192,11 +195,53 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
     location.__set_write_path(_write_info.write_path);
     location.__set_target_path(_write_info.target_path);
     hive_partition_update.__set_location(location);
-    hive_partition_update.__set_file_names({_file_name});
+    hive_partition_update.__set_file_names(
+            {fmt::format("{}-{}{}", _file_name, _file_name_index,
+                         _get_file_extension(_file_format_type, _hive_compress_type))});
     hive_partition_update.__set_row_count(_row_count);
     hive_partition_update.__set_file_size(_input_size_in_bytes);
     return hive_partition_update;
 }
 
+std::string VHivePartitionWriter::_get_file_extension(TFileFormatType::type file_format_type,
+                                                      TFileCompressType::type write_compress_type) {
+    std::string compress_name;
+    switch (write_compress_type) {
+    case TFileCompressType::SNAPPYBLOCK: {
+        compress_name = ".snappy";
+        break;
+    }
+    case TFileCompressType::ZLIB: {
+        compress_name = ".zlib";
+        break;
+    }
+    case TFileCompressType::ZSTD: {
+        compress_name = ".zstd";
+        break;
+    }
+    default: {
+        compress_name = "";
+        break;
+    }
+    }
+
+    std::string file_format_name;
+    switch (file_format_type) {
+    case TFileFormatType::FORMAT_PARQUET: {
+        file_format_name = ".parquet";
+        break;
+    }
+    case TFileFormatType::FORMAT_ORC: {
+        file_format_name = ".orc";
+        break;
+    }
+    default: {
+        file_format_name = "";
+        break;
+    }
+    }
+    return fmt::format("{}{}", compress_name, file_format_name);
+}
+
 } // namespace vectorized
 } // namespace doris
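Note how the same (file name, index, extension) triple now feeds both `create_file_writer()` in `open()` and the file list reported in `_build_partition_update()`. A hedged sketch of that invariant follows; the helper and its names are illustrative, not the Doris API. If the two paths diverged, the frontend would be told about a file the backend never wrote:

```cpp
#include <fmt/format.h>

#include <string>
#include <utility>

// Illustrative helper: derive the physical path and the reported file name
// from one (write_path, prefix, index, ext) tuple so they cannot drift apart.
std::pair<std::string, std::string> make_names(const std::string& write_path,
                                               const std::string& prefix, int index,
                                               const std::string& ext) {
    std::string reported = fmt::format("{}-{}{}", prefix, index, ext);   // reported to the FE
    std::string full_path = fmt::format("{}/{}", write_path, reported);  // opened on storage
    return {full_path, reported};
}
```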
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h
index 88488a94673..b8735ddcf7d 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -48,7 +48,8 @@ public:
                          const VExprContextSPtrs& write_output_expr_ctxs,
                          const std::set<size_t>& non_write_columns_indices,
                          const std::vector<THiveColumn>& columns, WriteInfo write_info,
-                         const std::string file_name, TFileFormatType::type file_format_type,
+                         std::string file_name, int file_name_index,
+                         TFileFormatType::type file_format_type,
                          TFileCompressType::type hive_compress_type,
                          const std::map<std::string, std::string>& hadoop_conf);
 
@@ -60,6 +61,10 @@ public:
 
     Status close(const Status& status);
 
+    inline const std::string& file_name() const { return _file_name; }
+
+    inline int file_name_index() const { return _file_name_index; }
+
     inline size_t written_len() { return _file_format_transformer->written_len(); }
 
 private:
@@ -69,6 +74,9 @@ private:
 
     THivePartitionUpdate _build_partition_update();
 
+    std::string _get_file_extension(TFileFormatType::type file_format_type,
+                                    TFileCompressType::type write_compress_type);
+
     std::string _path;
 
     std::string _partition_name;
@@ -85,6 +93,7 @@ private:
     const std::vector<THiveColumn>& _columns;
     WriteInfo _write_info;
     std::string _file_name;
+    int _file_name_index;
     TFileFormatType::type _file_format_type;
     TFileCompressType::type _hive_compress_type;
     const std::map<std::string, std::string>& _hadoop_conf;
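The two new accessors, `file_name()` and `file_name_index()`, exist so the table writer can continue a partition's file sequence after a size-based rollover. A minimal, self-contained sketch of that handoff, using stand-in types rather than the real `VHivePartitionWriter`:

```cpp
#include <memory>
#include <string>

// Stand-in for the partition writer; not the Doris class.
struct FakeWriter {
    std::string name;   // "${query_id}_${uuid}" prefix
    int index = 0;      // ${index} component of the file name
    size_t written = 0; // bytes written so far
    void close() {}     // the real writer flushes and closes the file here
};

// Keep the name prefix, bump only the index, so output files form the
// ${prefix}-0, ${prefix}-1, ... sequence described in the commit message.
std::unique_ptr<FakeWriter> roll_if_needed(std::unique_ptr<FakeWriter> w, size_t max_size) {
    if (w->written <= max_size) return w; // still under the cap: keep writing
    std::string prefix = w->name;         // preserve the prefix
    int next = w->index + 1;              // increment only ${index}
    w->close();
    auto fresh = std::make_unique<FakeWriter>();
    fresh->name = prefix;
    fresh->index = next;
    return fresh;
}
```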
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index e56090773b5..d43fc34b4e5 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -42,6 +42,19 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     _state = state;
     _profile = profile;
 
+    // add all counters
+    _written_rows_counter = ADD_COUNTER(_profile, "WrittenRows", TUnit::UNIT);
+    _send_data_timer = ADD_TIMER(_profile, "SendDataTime");
+    _partition_writers_dispatch_timer =
+            ADD_CHILD_TIMER(_profile, "PartitionsDispatchTime", "SendDataTime");
+    _partition_writers_write_timer =
+            ADD_CHILD_TIMER(_profile, "PartitionsWriteTime", "SendDataTime");
+    _partition_writers_count = ADD_COUNTER(_profile, "PartitionsWriteCount", TUnit::UNIT);
+    _open_timer = ADD_TIMER(_profile, "OpenTime");
+    _close_timer = ADD_TIMER(_profile, "CloseTime");
+    _write_file_counter = ADD_COUNTER(_profile, "WriteFileCount", TUnit::UNIT);
+
+    SCOPED_TIMER(_open_timer);
     for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
         switch (_t_sink.hive_table_sink.columns[i].column_type) {
         case THiveColumnType::PARTITION_KEY: {
@@ -68,94 +81,120 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
 }
 
 Status VHiveTableWriter::write(vectorized::Block& block) {
+    SCOPED_RAW_TIMER(&_send_data_ns);
     std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
-
+    _row_count += block.rows();
     auto& hive_table_sink = _t_sink.hive_table_sink;
 
     if (_partition_columns_input_index.empty()) {
-        auto writer_iter = _partitions_to_writers.find("");
-        if (writer_iter == _partitions_to_writers.end()) {
-            try {
-                std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1);
-                _partitions_to_writers.insert({"", writer});
-                RETURN_IF_ERROR(writer->open(_state, _profile));
-                RETURN_IF_ERROR(writer->write(block));
-            } catch (doris::Exception& e) {
-                return e.to_status();
-            }
-            return Status::OK();
-        } else {
-            std::shared_ptr<VHivePartitionWriter> writer;
-            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
-                static_cast<void>(writer_iter->second->close(Status::OK()));
-                _partitions_to_writers.erase(writer_iter);
+        std::shared_ptr<VHivePartitionWriter> writer;
+        {
+            SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
+            auto writer_iter = _partitions_to_writers.find("");
+            if (writer_iter == _partitions_to_writers.end()) {
                 try {
                     writer = _create_partition_writer(block, -1);
-                    _partitions_to_writers.insert({"", writer});
-                    RETURN_IF_ERROR(writer->open(_state, _profile));
                 } catch (doris::Exception& e) {
                     return e.to_status();
                 }
+                _partitions_to_writers.insert({"", writer});
+                RETURN_IF_ERROR(writer->open(_state, _profile));
             } else {
-                writer = writer_iter->second;
+                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                    std::string file_name(writer_iter->second->file_name());
+                    int file_name_index = writer_iter->second->file_name_index();
+                    {
+                        SCOPED_RAW_TIMER(&_close_ns);
+                        static_cast<void>(writer_iter->second->close(Status::OK()));
+                    }
+                    _partitions_to_writers.erase(writer_iter);
+                    try {
+                        writer = _create_partition_writer(block, -1, &file_name,
+                                                          file_name_index + 1);
+                    } catch (doris::Exception& e) {
+                        return e.to_status();
+                    }
+                    _partitions_to_writers.insert({"", writer});
+                    RETURN_IF_ERROR(writer->open(_state, _profile));
+                } else {
+                    writer = writer_iter->second;
+                }
             }
-            RETURN_IF_ERROR(writer->write(block));
-            return Status::OK();
         }
+        SCOPED_RAW_TIMER(&_partition_writers_write_ns);
+        RETURN_IF_ERROR(writer->write(block));
+        return Status::OK();
     }
 
-    for (int i = 0; i < block.rows(); ++i) {
-        std::vector<std::string> partition_values;
-        try {
-            partition_values = _create_partition_values(block, i);
-        } catch (doris::Exception& e) {
-            return e.to_status();
-        }
-        std::string partition_name = VHiveUtils::make_partition_name(
-                hive_table_sink.columns, _partition_columns_input_index, partition_values);
-
-        auto create_and_open_writer =
-                [&](const std::string& partition_name, int position,
-                    std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
+    {
+        SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
+        for (int i = 0; i < block.rows(); ++i) {
+            std::vector<std::string> partition_values;
             try {
-                auto writer = _create_partition_writer(block, position);
-                RETURN_IF_ERROR(writer->open(_state, _profile));
-                IColumn::Filter filter(block.rows(), 0);
-                filter[position] = 1;
-                writer_positions.insert({writer, std::move(filter)});
-                _partitions_to_writers.insert({partition_name, writer});
-                writer_ptr = writer;
+                partition_values = _create_partition_values(block, i);
             } catch (doris::Exception& e) {
                 return e.to_status();
            }
-            return Status::OK();
-        };
+            std::string partition_name = VHiveUtils::make_partition_name(
+                    hive_table_sink.columns, _partition_columns_input_index, partition_values);
 
-        auto writer_iter = _partitions_to_writers.find(partition_name);
-        if (writer_iter == _partitions_to_writers.end()) {
-            std::shared_ptr<VHivePartitionWriter> writer;
-            RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
-        } else {
-            std::shared_ptr<VHivePartitionWriter> writer;
-            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
-                static_cast<void>(writer_iter->second->close(Status::OK()));
-                writer_positions.erase(writer_iter->second);
-                _partitions_to_writers.erase(writer_iter);
-                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
-            } else {
-                writer = writer_iter->second;
-            }
-            auto writer_pos_iter = writer_positions.find(writer);
-            if (writer_pos_iter == writer_positions.end()) {
-                IColumn::Filter filter(block.rows(), 0);
-                filter[i] = 1;
-                writer_positions.insert({writer, std::move(filter)});
+            auto create_and_open_writer =
+                    [&](const std::string& partition_name, int position,
+                        const std::string* file_name, int file_name_index,
+                        std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
+                try {
+                    auto writer =
+                            _create_partition_writer(block, position, file_name, file_name_index);
+                    RETURN_IF_ERROR(writer->open(_state, _profile));
+                    IColumn::Filter filter(block.rows(), 0);
+                    filter[position] = 1;
+                    writer_positions.insert({writer, std::move(filter)});
+                    _partitions_to_writers.insert({partition_name, writer});
+                    writer_ptr = writer;
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+                return Status::OK();
+            };
+
+            auto writer_iter = _partitions_to_writers.find(partition_name);
+            if (writer_iter == _partitions_to_writers.end()) {
+                std::shared_ptr<VHivePartitionWriter> writer;
+                if (_partitions_to_writers.size() + 1 >
+                    config::table_sink_partition_write_max_partition_nums_per_writer) {
+                    return Status::InternalError(
+                            "Too many open partitions {}",
+                            config::table_sink_partition_write_max_partition_nums_per_writer);
+                }
+                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
             } else {
-                writer_pos_iter->second[i] = 1;
+                std::shared_ptr<VHivePartitionWriter> writer;
+                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                    std::string file_name(writer_iter->second->file_name());
+                    int file_name_index = writer_iter->second->file_name_index();
+                    {
+                        SCOPED_RAW_TIMER(&_close_ns);
+                        static_cast<void>(writer_iter->second->close(Status::OK()));
+                    }
+                    writer_positions.erase(writer_iter->second);
+                    _partitions_to_writers.erase(writer_iter);
+                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
+                                                           file_name_index + 1, writer));
+                } else {
+                    writer = writer_iter->second;
+                }
+                auto writer_pos_iter = writer_positions.find(writer);
+                if (writer_pos_iter == writer_positions.end()) {
+                    IColumn::Filter filter(block.rows(), 0);
+                    filter[i] = 1;
+                    writer_positions.insert({writer, std::move(filter)});
+                } else {
+                    writer_pos_iter->second[i] = 1;
+                }
             }
         }
     }
-
+    SCOPED_RAW_TIMER(&_partition_writers_write_ns);
     for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
         RETURN_IF_ERROR(it->first->write(block, &it->second));
     }
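The `SCOPED_RAW_TIMER` blocks above accumulate nanoseconds into plain `int64_t` members on every `write()` call; the totals are only published to the profile in `close()`. A self-contained sketch of that pattern using `std::chrono` (the real macro is Doris-specific; the names below are illustrative):

```cpp
#include <chrono>
#include <cstdint>

// RAII stand-in for SCOPED_RAW_TIMER: each nested scope adds its wall time
// to a raw int64_t counter owned by the caller.
class ScopedRawTimer {
public:
    explicit ScopedRawTimer(int64_t* target)
            : _target(target), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        auto elapsed = std::chrono::steady_clock::now() - _start;
        *_target += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
    }

private:
    int64_t* _target;
    std::chrono::steady_clock::time_point _start;
};

int64_t dispatch_ns = 0, write_ns = 0;

void write_block() {
    {
        ScopedRawTimer t(&dispatch_ns); // covers partition lookup / writer creation
        // ... dispatch rows to per-partition writers ...
    }
    ScopedRawTimer t(&write_ns); // covers the actual file writes
    // ... flush rows through each writer ...
}
```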
@@ -163,19 +202,34 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
 }
 
 Status VHiveTableWriter::close(Status status) {
-    for (const auto& pair : _partitions_to_writers) {
-        Status st = pair.second->close(status);
-        if (st != Status::OK()) {
-            LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string());
-            continue;
+    int64_t partitions_to_writers_size = _partitions_to_writers.size();
+    {
+        SCOPED_RAW_TIMER(&_close_ns);
+        for (const auto& pair : _partitions_to_writers) {
+            Status st = pair.second->close(status);
+            if (st != Status::OK()) {
+                LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string());
+                continue;
+            }
         }
+        _partitions_to_writers.clear();
+    }
+    if (status.ok()) {
+        SCOPED_TIMER(_profile->total_time_counter());
+
+        COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count));
+        COUNTER_SET(_send_data_timer, _send_data_ns);
+        COUNTER_SET(_partition_writers_dispatch_timer, _partition_writers_dispatch_ns);
+        COUNTER_SET(_partition_writers_write_timer, _partition_writers_write_ns);
+        COUNTER_SET(_partition_writers_count, partitions_to_writers_size);
+        COUNTER_SET(_close_timer, _close_ns);
+        COUNTER_SET(_write_file_counter, _write_file_count);
     }
-    _partitions_to_writers.clear();
     return Status::OK();
 }
 
 std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
-        vectorized::Block& block, int position) {
+        vectorized::Block& block, int position, const std::string* file_name, int file_name_index) {
     auto& hive_table_sink = _t_sink.hive_table_sink;
     std::vector<std::string> partition_values;
     std::string partition_name;
@@ -247,13 +301,12 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
         }
     }
 
+    _write_file_count++;
     return std::make_shared<VHivePartitionWriter>(
             _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
             _write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns,
-            std::move(write_info),
-            fmt::format("{}{}", _compute_file_name(),
-                        _get_file_extension(file_format_type, write_compress_type)),
-            file_format_type, write_compress_type, hive_table_sink.hadoop_config);
+            std::move(write_info), (file_name == nullptr) ? _compute_file_name() : *file_name,
+            file_name_index, file_format_type, write_compress_type, hive_table_sink.hadoop_config);
 }
 
 std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block,
@@ -397,46 +450,6 @@ std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_des
     }
 }
 
-std::string VHiveTableWriter::_get_file_extension(TFileFormatType::type file_format_type,
-                                                  TFileCompressType::type write_compress_type) {
-    std::string compress_name;
-    switch (write_compress_type) {
-    case TFileCompressType::SNAPPYBLOCK: {
-        compress_name = ".snappy";
-        break;
-    }
-    case TFileCompressType::ZLIB: {
-        compress_name = ".zlib";
-        break;
-    }
-    case TFileCompressType::ZSTD: {
-        compress_name = ".zstd";
-        break;
-    }
-    default: {
-        compress_name = "";
-        break;
-    }
-    }
-
-    std::string file_format_name;
-    switch (file_format_type) {
-    case TFileFormatType::FORMAT_PARQUET: {
-        file_format_name = ".parquet";
-        break;
-    }
-    case TFileFormatType::FORMAT_ORC: {
-        file_format_name = ".orc";
-        break;
-    }
-    default: {
-        file_format_name = "";
-        break;
-    }
-    }
-    return fmt::format("{}{}", compress_name, file_format_name);
-}
-
 std::string VHiveTableWriter::_compute_file_name() {
     boost::uuids::uuid uuid = boost::uuids::random_generator()();
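For context on the `writer_positions` map in `write()` above: `IColumn::Filter` behaves like a per-row byte mask, so each block is dispatched by building one mask per partition and then calling each writer exactly once per block instead of once per row. A standalone sketch with stand-in types (not the Doris containers):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for IColumn::Filter: one 0/1 byte per row in the block.
using Filter = std::vector<uint8_t>;

// Build one mask per partition name; row i is marked in the mask of the
// partition it belongs to. Each partition's writer then consumes the whole
// block once, filtered by its mask.
std::unordered_map<std::string, Filter> build_masks(
        const std::vector<std::string>& row_partitions, size_t num_rows) {
    std::unordered_map<std::string, Filter> masks;
    for (size_t i = 0; i < num_rows; ++i) {
        auto [it, inserted] = masks.try_emplace(row_partitions[i], Filter(num_rows, 0));
        it->second[i] = 1; // row i goes to this partition's writer
    }
    return masks;
}
```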
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index 9f48f6afde1..3a3f45a6db1 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/DataSinks_types.h>
 
+#include "util/runtime_profile.h"
 #include "vec/exprs/vexpr_fwd.h"
 #include "vec/sink/writer/async_result_writer.h"
 
@@ -50,17 +51,15 @@ public:
     Status close(Status) override;
 
 private:
-    std::shared_ptr<VHivePartitionWriter> _create_partition_writer(vectorized::Block& block,
-                                                                   int position);
+    std::shared_ptr<VHivePartitionWriter> _create_partition_writer(
+            vectorized::Block& block, int position, const std::string* file_name = nullptr,
+            int file_name_index = 0);
 
     std::vector<std::string> _create_partition_values(vectorized::Block& block, int position);
 
     std::string _to_partition_value(const TypeDescriptor& type_desc,
                                     const ColumnWithTypeAndName& partition_column, int position);
 
-    std::string _get_file_extension(TFileFormatType::type file_format_type,
-                                    TFileCompressType::type write_compress_type);
-
     std::string _compute_file_name();
 
     // Currently it is a copy, maybe it is better to use move semantics to eliminate it.
@@ -72,6 +71,24 @@ private:
     std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers;
 
     VExprContextSPtrs _write_output_vexpr_ctxs;
+
+    size_t _row_count = 0;
+
+    // profile counters
+    int64_t _send_data_ns = 0;
+    int64_t _partition_writers_dispatch_ns = 0;
+    int64_t _partition_writers_write_ns = 0;
+    int64_t _close_ns = 0;
+    int64_t _write_file_count = 0;
+
+    RuntimeProfile::Counter* _written_rows_counter = nullptr;
+    RuntimeProfile::Counter* _send_data_timer = nullptr;
+    RuntimeProfile::Counter* _partition_writers_dispatch_timer = nullptr;
+    RuntimeProfile::Counter* _partition_writers_write_timer = nullptr;
+    RuntimeProfile::Counter* _partition_writers_count = nullptr;
+    RuntimeProfile::Counter* _open_timer = nullptr;
+    RuntimeProfile::Counter* _close_timer = nullptr;
+    RuntimeProfile::Counter* _write_file_counter = nullptr;
 };
 
 } // namespace vectorized
 } // namespace doris
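`_compute_file_name()` (visible in the context lines of the .cpp diff above) pairs the query id with a random UUID via Boost. A minimal stand-in sketch, with the query-id prefix taken as a parameter since how the real method obtains it is outside this diff:

```cpp
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

#include <string>

// Illustrative stand-in for VHiveTableWriter::_compute_file_name(): build the
// "${query_id}_${uuid}" prefix; the "-${index}${ext}" suffix is appended later
// by the partition writer when the file is opened.
std::string compute_file_name(const std::string& query_id) {
    boost::uuids::uuid uuid = boost::uuids::random_generator()();
    return query_id + "_" + boost::uuids::to_string(uuid);
}
```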
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org