This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0-beta in repository https://gitbox.apache.org/repos/asf/doris.git
commit aff0907778ae9a5e0b627f48327d93d50921c56f Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Wed Jun 7 10:09:32 2023 +0800 [improvement](sink) reuse rows buffer in mysql_result_writer (#20482) Creating a rows buffer for each block has a non-negligible performance cost, so the rows buffer should be reused. In a test with a total of 1.7M rows, AppendBatchTime was reduced from 500ms to 280ms. --- be/src/util/mysql_row_buffer.cpp | 20 +++++++++++ be/src/util/mysql_row_buffer.h | 2 ++ be/src/vec/sink/vmysql_result_writer.cpp | 61 +++++++++++++++++++------------- be/src/vec/sink/vmysql_result_writer.h | 3 ++ 4 files changed, 61 insertions(+), 25 deletions(-) diff --git a/be/src/util/mysql_row_buffer.cpp b/be/src/util/mysql_row_buffer.cpp index f408cb31a1..f346c2d43c 100644 --- a/be/src/util/mysql_row_buffer.cpp +++ b/be/src/util/mysql_row_buffer.cpp @@ -80,6 +80,25 @@ MysqlRowBuffer<is_binary_format>::MysqlRowBuffer() _dynamic_mode(0), _len_pos(0) {} +template <bool is_binary_format> +MysqlRowBuffer<is_binary_format>::MysqlRowBuffer(MysqlRowBuffer<is_binary_format>&& other) { + if (other._buf == other._default_buf) { + auto other_length = other.length(); + memcpy(_default_buf, other._buf, other_length); + _buf = _default_buf; + _pos = _buf + other_length; + } else { + _buf = other._buf; + other._buf = other._default_buf; + _pos = other._pos; + } + _buf_size = other._buf_size; + _dynamic_mode = other._dynamic_mode; + _field_count = other._field_count; + _field_pos = other._field_pos; + _len_pos = other._len_pos; +} + template <bool is_binary_format> void MysqlRowBuffer<is_binary_format>::start_binary_row(uint32_t num_cols) { assert(is_binary_format); @@ -94,6 +113,7 @@ template <bool is_binary_format> MysqlRowBuffer<is_binary_format>::~MysqlRowBuffer() { if (_buf != _default_buf) { delete[] _buf; + _buf = _default_buf; } } diff --git a/be/src/util/mysql_row_buffer.h b/be/src/util/mysql_row_buffer.h index 2df739450f..d0e91e766d 100644 --- 
a/be/src/util/mysql_row_buffer.h +++ b/be/src/util/mysql_row_buffer.h @@ -56,6 +56,8 @@ public: MysqlRowBuffer(); ~MysqlRowBuffer(); + MysqlRowBuffer(MysqlRowBuffer&& other); + void reset() { _pos = _buf; } // Prepare for binary row buffer diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 6e3e34bcc6..019c0556c4 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -98,6 +98,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() { _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime"); _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime"); _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime"); + _copy_buffer_timer = ADD_CHILD_TIMER(_parent_profile, "CopyBufferTime", "AppendBatchTime"); _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT); _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); } @@ -605,43 +606,53 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) { Block block; RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); - auto num_rows = block.rows(); - std::vector<MysqlRowBuffer<is_binary_format>> rows_buffer; - rows_buffer.resize(num_rows); - if constexpr (is_binary_format) { - for (MysqlRowBuffer<is_binary_format>& buf : rows_buffer) { - buf.start_binary_row(_output_vexpr_ctxs.size()); - } - } // convert one batch auto result = std::make_unique<TFetchDataResult>(); - for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) { - const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column); - auto type_ptr = block.get_by_position(i).type; + auto num_rows = block.rows(); - DCHECK(num_rows == block.get_by_position(i).column->size()) - << fmt::format("block's rows({}) != column{}'s 
size({})", num_rows, i, - block.get_by_position(i).column->size()); + { + SCOPED_TIMER(_convert_tuple_timer); + if (_rows_buffer.size() < num_rows) { + _rows_buffer.resize(num_rows); + } - RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql( - *column_ptr, output_object_data(), rows_buffer, 0, 0, num_rows, col_const)); + for (size_t i = 0; i != num_rows; ++i) { + _rows_buffer[i].reset(); + if constexpr (is_binary_format) { + _rows_buffer[i].start_binary_row(_output_vexpr_ctxs.size()); + } + } - if (!status) { - LOG(WARNING) << "convert row to mysql result failed. block_struct=" - << block.dump_structure(); - break; + for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) { + const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column); + auto type_ptr = block.get_by_position(i).type; + + DCHECK(num_rows == block.get_by_position(i).column->size()) + << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i, + block.get_by_position(i).column->size()); + + RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql( + *column_ptr, output_object_data(), _rows_buffer, 0, 0, num_rows, col_const)); + + if (!status) { + LOG(WARNING) << "convert row to mysql result failed. 
block_struct=" + << block.dump_structure(); + break; + } } } uint64_t bytes_sent = 0; // copy MysqlRowBuffer to Thrift - result->result_batch.rows.resize(num_rows); - for (int i = 0; i < num_rows; ++i) { - result->result_batch.rows[i].append(rows_buffer[i].buf(), rows_buffer[i].length()); - bytes_sent += rows_buffer[i].length(); + { + SCOPED_TIMER(_copy_buffer_timer); + result->result_batch.rows.resize(num_rows); + for (int i = 0; i < num_rows; ++i) { + result->result_batch.rows[i].append(_rows_buffer[i].buf(), _rows_buffer[i].length()); + bytes_sent += _rows_buffer[i].length(); + } } - if (status) { SCOPED_TIMER(_result_send_timer); // If this is a dry run task, no need to send data block diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h index 0e0b4d9313..3b6b8579d7 100644 --- a/be/src/vec/sink/vmysql_result_writer.h +++ b/be/src/vec/sink/vmysql_result_writer.h @@ -69,6 +69,7 @@ private: BufferControlBlock* _sinker; const VExprContextSPtrs& _output_vexpr_ctxs; + std::vector<MysqlRowBuffer<is_binary_format>> _rows_buffer; RuntimeProfile* _parent_profile; // parent profile from result sink. not owned // total time cost on append batch operation @@ -77,6 +78,8 @@ private: RuntimeProfile::Counter* _convert_tuple_timer = nullptr; // file write timer, child timer of _append_row_batch_timer RuntimeProfile::Counter* _result_send_timer = nullptr; + // timer of copying buffer to thrift + RuntimeProfile::Counter* _copy_buffer_timer = nullptr; // number of sent rows RuntimeProfile::Counter* _sent_rows_counter = nullptr; // size of sent data --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org