github-actions[bot] commented on code in PR #33481: URL: https://github.com/apache/doris/pull/33481#discussion_r1559050597
########## be/src/vec/sink/writer/vgroup_commit_block_writer.cpp: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/vgroup_commit_block_writer.h" + +#include <gen_cpp/DataSinks_types.h> + +#include <future> +#include <shared_mutex> + +#include "common/exception.h" +#include "common/status.h" +#include "runtime/exec_env.h" +#include "runtime/group_commit_mgr.h" +#include "runtime/runtime_state.h" +#include "util/debug_points.h" +#include "util/doris_metrics.h" +#include "vec/exprs/vexpr.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris { + +namespace vectorized { Review Comment: warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces] ```suggestion namespace doris::vectorized { ``` be/src/vec/sink/writer/vgroup_commit_block_writer.cpp:321: ```diff - } // namespace vectorized - } // namespace doris + } // namespace doris ``` ########## be/src/pipeline/exec/group_commit_block_sink_operator.h: ########## @@ -39,12 +40,71 @@ GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) : DataSinkOperator(operator_builder, sink) {} - bool can_write() override { return true; } // TODO: need use mem_limit + bool 
can_write() override { return true; } }; OperatorPtr GroupCommitBlockSinkOperatorBuilder::build_operator() { return std::make_shared<GroupCommitBlockSinkOperator>(this, _sink); } +class GroupCommitBlockSinkOperatorX; +class GroupCommitBlockSinkLocalState final + : public AsyncWriterSink<vectorized::VGroupCommitBlockWriter, + GroupCommitBlockSinkOperatorX> { + ENABLE_FACTORY_CREATOR(GroupCommitBlockSinkLocalState); + +public: + using Base = + AsyncWriterSink<vectorized::VGroupCommitBlockWriter, GroupCommitBlockSinkOperatorX>; + GroupCommitBlockSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : AsyncWriterSink<vectorized::VGroupCommitBlockWriter, GroupCommitBlockSinkOperatorX>( + parent, state) {} + +private: + friend class GroupCommitBlockSinkOperatorX; +}; + +class GroupCommitBlockSinkOperatorX final + : public DataSinkOperatorX<GroupCommitBlockSinkLocalState> { +public: + using Base = DataSinkOperatorX<GroupCommitBlockSinkLocalState>; + GroupCommitBlockSinkOperatorX(const RowDescriptor& row_desc, int operator_id, + const std::vector<TExpr>& select_exprs) + : Base(operator_id, 0), _row_desc(row_desc), _t_output_expr(select_exprs) {}; + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. 
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { Review Comment: warning: method 'prepare' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status prepare(RuntimeState* state) override { ``` ########## be/src/pipeline/exec/group_commit_block_sink_operator.h: ########## @@ -39,12 +40,71 @@ GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) : DataSinkOperator(operator_builder, sink) {} - bool can_write() override { return true; } // TODO: need use mem_limit + bool can_write() override { return true; } }; OperatorPtr GroupCommitBlockSinkOperatorBuilder::build_operator() { return std::make_shared<GroupCommitBlockSinkOperator>(this, _sink); } +class GroupCommitBlockSinkOperatorX; +class GroupCommitBlockSinkLocalState final + : public AsyncWriterSink<vectorized::VGroupCommitBlockWriter, + GroupCommitBlockSinkOperatorX> { + ENABLE_FACTORY_CREATOR(GroupCommitBlockSinkLocalState); + +public: + using Base = + AsyncWriterSink<vectorized::VGroupCommitBlockWriter, GroupCommitBlockSinkOperatorX>; + GroupCommitBlockSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : AsyncWriterSink<vectorized::VGroupCommitBlockWriter, GroupCommitBlockSinkOperatorX>( + parent, state) {} + +private: + friend class GroupCommitBlockSinkOperatorX; +}; + +class GroupCommitBlockSinkOperatorX final + : public DataSinkOperatorX<GroupCommitBlockSinkLocalState> { +public: + using Base = DataSinkOperatorX<GroupCommitBlockSinkLocalState>; + GroupCommitBlockSinkOperatorX(const RowDescriptor& row_desc, int operator_id, + const std::vector<TExpr>& select_exprs) + : Base(operator_id, 0), _row_desc(row_desc), _t_output_expr(select_exprs) {}; + Status init(const TDataSink& thrift_sink) override { Review Comment: warning: method 'init' can be made static [readability-convert-member-functions-to-static] 
```suggestion static Status init(const TDataSink& thrift_sink) override { ``` ########## be/src/pipeline/exec/group_commit_block_sink_operator.h: ########## @@ -39,12 +40,71 @@ GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) : DataSinkOperator(operator_builder, sink) {} - bool can_write() override { return true; } // TODO: need use mem_limit + bool can_write() override { return true; } }; OperatorPtr GroupCommitBlockSinkOperatorBuilder::build_operator() { return std::make_shared<GroupCommitBlockSinkOperator>(this, _sink); } +class GroupCommitBlockSinkOperatorX; +class GroupCommitBlockSinkLocalState final + : public AsyncWriterSink<vectorized::VGroupCommitBlockWriter, + GroupCommitBlockSinkOperatorX> { + ENABLE_FACTORY_CREATOR(GroupCommitBlockSinkLocalState); + +public: + using Base = + AsyncWriterSink<vectorized::VGroupCommitBlockWriter, GroupCommitBlockSinkOperatorX>; + GroupCommitBlockSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : AsyncWriterSink<vectorized::VGroupCommitBlockWriter, GroupCommitBlockSinkOperatorX>( + parent, state) {} + +private: + friend class GroupCommitBlockSinkOperatorX; +}; + +class GroupCommitBlockSinkOperatorX final + : public DataSinkOperatorX<GroupCommitBlockSinkLocalState> { +public: + using Base = DataSinkOperatorX<GroupCommitBlockSinkLocalState>; + GroupCommitBlockSinkOperatorX(const RowDescriptor& row_desc, int operator_id, + const std::vector<TExpr>& select_exprs) + : Base(operator_id, 0), _row_desc(row_desc), _t_output_expr(select_exprs) {}; + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. 
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); + } + + Status open(RuntimeState* state) override { Review Comment: warning: method 'open' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status open(RuntimeState* state) override { ``` ########## be/src/pipeline/exec/group_commit_block_sink_operator.h: ########## @@ -39,12 +40,71 @@ class GroupCommitBlockSinkOperator final GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) : DataSinkOperator(operator_builder, sink) {} - bool can_write() override { return true; } // TODO: need use mem_limit + bool can_write() override { return true; } Review Comment: warning: method 'can_write' can be made static [readability-convert-member-functions-to-static] ```suggestion static bool can_write() override { return true; } ``` ########## be/src/vec/sink/writer/vgroup_commit_block_writer.cpp: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/sink/writer/vgroup_commit_block_writer.h" + +#include <gen_cpp/DataSinks_types.h> + +#include <future> +#include <shared_mutex> + +#include "common/exception.h" +#include "common/status.h" +#include "runtime/exec_env.h" +#include "runtime/group_commit_mgr.h" +#include "runtime/runtime_state.h" +#include "util/debug_points.h" +#include "util/doris_metrics.h" +#include "vec/exprs/vexpr.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris { + +namespace vectorized { +bvar::Adder<int64_t> g_group_commit_load_rows("doris_group_commit_load_rows"); +bvar::Adder<int64_t> g_group_commit_load_bytes("doris_group_commit_load_bytes"); + +VGroupCommitBlockWriter::VGroupCommitBlockWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_exprs) + : AsyncResultWriter(output_exprs), _filter_bitmap(1024), _t_sink(t_sink) { + DCHECK(t_sink.__isset.olap_table_sink); +} + +VGroupCommitBlockWriter::~VGroupCommitBlockWriter() { + if (_load_block_queue) { + _remove_estimated_wal_bytes(); + _load_block_queue->remove_load_id(_load_id); + _load_block_queue->group_commit_load_count.fetch_add(1); + } +} + +Status VGroupCommitBlockWriter::_init(RuntimeState* state, RuntimeProfile* profile) { + DCHECK(_t_sink.__isset.olap_table_sink); + auto& table_sink = _t_sink.olap_table_sink; + _tuple_desc_id = table_sink.tuple_id; + _schema.reset(new OlapTableSchemaParam()); + RETURN_IF_ERROR(_schema->init(table_sink.schema)); + _db_id = table_sink.db_id; + _table_id = table_sink.table_id; + _base_schema_version = table_sink.base_schema_version; + _group_commit_mode = table_sink.group_commit_mode; + _load_id = table_sink.load_id; + _max_filter_ratio = table_sink.max_filter_ratio; + _vpartition = std::make_unique<doris::VOlapTablePartitionParam>(_schema, table_sink.partition); + RETURN_IF_ERROR(_vpartition->init()); + + _state = state; + + // profile must add to state's object pool + _profile = profile; + _mem_tracker = std::make_shared<MemTracker>("VGroupCommitBlockWriter:" + 
+ std::to_string(state->load_job_id())); + SCOPED_TIMER(_profile->total_time_counter()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + + // get table's tuple descriptor + _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); + if (_output_tuple_desc == nullptr) { + LOG(WARNING) << "unknown destination tuple descriptor, id=" << _tuple_desc_id; + return Status::InternalError("unknown destination tuple descriptor"); + } + + _block_convertor = std::make_unique<vectorized::OlapTableBlockConvertor>(_output_tuple_desc); + _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), + _state->batch_size()); + return Status::OK(); +} + +Status VGroupCommitBlockWriter::open(RuntimeState* state, RuntimeProfile* profile) { + // Prepare the exprs to run. + RETURN_IF_ERROR(_init(state, profile)); + return Status::OK(); +} + +Status VGroupCommitBlockWriter::write(Block& input_block) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + Status status = Status::OK(); + auto rows = input_block.rows(); + auto bytes = input_block.bytes(); + if (UNLIKELY(rows == 0)) { + return status; + } + SCOPED_TIMER(_profile->total_time_counter()); + + // update incrementally so that FE can get the progress. + // the real 'num_rows_load_total' will be set when sink being closed. 
+ _state->update_num_rows_load_total(rows); + _state->update_num_bytes_load_total(bytes); + g_group_commit_load_rows << rows; + g_group_commit_load_bytes << bytes; + + std::shared_ptr<vectorized::Block> block; + bool has_filtered_rows = false; + RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( + _state, &input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows)); + _has_filtered_rows = false; + if (!_vpartition->is_auto_partition()) { + //reuse vars for find_partition + _partitions.assign(rows, nullptr); + _filter_bitmap.Reset(rows); + + for (int index = 0; index < rows; index++) { + _vpartition->find_partition(block.get(), index, _partitions[index]); + } + for (int row_index = 0; row_index < rows; row_index++) { + if (_partitions[row_index] == nullptr) [[unlikely]] { + _filter_bitmap.Set(row_index, true); + LOG(WARNING) << "no partition for this tuple. tuple=" + << block->dump_data(row_index, 1); + } + _has_filtered_rows = true; + } + } + + if (_block_convertor->num_filtered_rows() > 0 || _has_filtered_rows) { + auto cloneBlock = block->clone_without_columns(); + auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + for (int i = 0; i < rows; ++i) { + if (_block_convertor->filter_map()[i]) { + continue; + } + if (_filter_bitmap.Get(i)) { + continue; + } + res_block.add_row(block.get(), i); + } + block->swap(res_block.to_block()); + } + // add block into block queue + return _add_block(_state, block); +} + +Status VGroupCommitBlockWriter::close(Status close_status) { + RETURN_IF_ERROR(close_status); + int64_t total_rows = _state->num_rows_load_total(); + int64_t loaded_rows = _state->num_rows_load_total(); + _state->set_num_rows_load_total(loaded_rows + _state->num_rows_load_unselected() + + _state->num_rows_load_filtered()); + _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows - + loaded_rows); + if (!_is_block_appended) { + // if not meet the max_filter_ratio, we should return error 
status directly + int64_t num_selected_rows = + _state->num_rows_load_total() - _state->num_rows_load_unselected(); + if (num_selected_rows > 0 && + (double)_state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) { + return Status::DataQualityError("too many filtered rows"); + } + RETURN_IF_ERROR(_add_blocks(_state, true)); + } + if (_load_block_queue) { + _remove_estimated_wal_bytes(); + _load_block_queue->remove_load_id(_load_id); + } + // wait to wal + auto st = Status::OK(); + if (_load_block_queue && (_load_block_queue->wait_internal_group_commit_finish || + _group_commit_mode == TGroupCommitMode::SYNC_MODE)) { + std::unique_lock l(_load_block_queue->mutex); + if (!_load_block_queue->process_finish) { + _load_block_queue->internal_group_commit_finish_cv.wait(l); + } + st = _load_block_queue->status; + } + return st; +} + +Status VGroupCommitBlockWriter::_add_block(RuntimeState* state, + std::shared_ptr<vectorized::Block> block) { + if (block->rows() == 0) { + return Status::OK(); + } + // the insert group commit tvf always accept nullable columns, so we should convert + // the non-nullable columns to nullable columns + for (int i = 0; i < block->columns(); ++i) { + if (block->get_by_position(i).type->is_nullable()) { + continue; + } + block->get_by_position(i).column = make_nullable(block->get_by_position(i).column); + block->get_by_position(i).type = make_nullable(block->get_by_position(i).type); + } + // add block to queue + auto cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); + { + vectorized::IColumn::Selector selector; + for (auto i = 0; i < block->rows(); i++) { + selector.emplace_back(i); + } + block->append_to_block_by_selector(cur_mutable_block.get(), selector); + } + std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared(); + output_block->swap(cur_mutable_block->to_block()); + if (!_is_block_appended && state->num_rows_load_total() + state->num_rows_load_unselected() + 
+ state->num_rows_load_filtered() <= + config::group_commit_memory_rows_for_max_filter_ratio) { + _blocks.emplace_back(output_block); + } else { + if (!_is_block_appended) { + RETURN_IF_ERROR(_add_blocks(state, false)); + } + RETURN_IF_ERROR(_load_block_queue->add_block( + state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE)); + } + return Status::OK(); +} + +Status VGroupCommitBlockWriter::_add_blocks(RuntimeState* state, + bool is_blocks_contain_all_load_data) { + DCHECK(_is_block_appended == false); Review Comment: warning: redundant boolean literal supplied to boolean operator [readability-simplify-boolean-expr] ```suggestion DCHECK(!_is_block_appended); ``` ########## be/src/pipeline/exec/group_commit_block_sink_operator.h: ########## @@ -39,12 +40,71 @@ GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) : DataSinkOperator(operator_builder, sink) {} - bool can_write() override { return true; } // TODO: need use mem_limit + bool can_write() override { return true; } }; OperatorPtr GroupCommitBlockSinkOperatorBuilder::build_operator() { return std::make_shared<GroupCommitBlockSinkOperator>(this, _sink); } +class GroupCommitBlockSinkOperatorX; +class GroupCommitBlockSinkLocalState final + : public AsyncWriterSink<vectorized::VGroupCommitBlockWriter, + GroupCommitBlockSinkOperatorX> { + ENABLE_FACTORY_CREATOR(GroupCommitBlockSinkLocalState); + +public: + using Base = + AsyncWriterSink<vectorized::VGroupCommitBlockWriter, GroupCommitBlockSinkOperatorX>; + GroupCommitBlockSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : AsyncWriterSink<vectorized::VGroupCommitBlockWriter, GroupCommitBlockSinkOperatorX>( + parent, state) {} + +private: + friend class GroupCommitBlockSinkOperatorX; +}; + +class GroupCommitBlockSinkOperatorX final + : public DataSinkOperatorX<GroupCommitBlockSinkLocalState> { +public: + using Base = DataSinkOperatorX<GroupCommitBlockSinkLocalState>; + 
GroupCommitBlockSinkOperatorX(const RowDescriptor& row_desc, int operator_id, + const std::vector<TExpr>& select_exprs) + : Base(operator_id, 0), _row_desc(row_desc), _t_output_expr(select_exprs) {}; + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + return vectorized::VExpr::open(_output_vexpr_ctxs, state); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { Review Comment: warning: method 'sink' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org