This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e29d8cb1108 [feature](move-memtable) support pipelineX in sink v2 (#27067) e29d8cb1108 is described below commit e29d8cb110849637fc7beca5b5d76b103b9c95bf Author: Kaijie Chen <c...@apache.org> AuthorDate: Thu Nov 16 15:00:55 2023 +0800 [feature](move-memtable) support pipelineX in sink v2 (#27067) --- be/src/exec/data_sink.cpp | 8 +- ..._operator.h => olap_table_sink_v2_operator.cpp} | 51 ++- be/src/pipeline/exec/olap_table_sink_v2_operator.h | 74 ++- be/src/pipeline/pipeline_x/operator.cpp | 3 + .../pipeline_x/pipeline_x_fragment_context.cpp | 30 +- .../pipeline_x/pipeline_x_fragment_context.h | 3 + be/src/vec/sink/vtablet_sink_v2.cpp | 505 +-------------------- be/src/vec/sink/vtablet_sink_v2.h | 178 +------- .../vtablet_writer_v2.cpp} | 129 +++--- .../vtablet_writer_v2.h} | 37 +- 10 files changed, 228 insertions(+), 790 deletions(-) diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index f849dc84a25..970e7a3a18a 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -146,15 +146,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink break; } case TDataSinkType::OLAP_TABLE_SINK: { - Status status = Status::OK(); DCHECK(thrift_sink.__isset.olap_table_sink); if (state->query_options().enable_memtable_on_sink_node && !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { - sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status)); + sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false)); } else { sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false)); } - RETURN_IF_ERROR(status); break; } case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: { @@ -301,15 +299,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink break; } case TDataSinkType::OLAP_TABLE_SINK: { - Status status = Status::OK(); DCHECK(thrift_sink.__isset.olap_table_sink); if (state->query_options().enable_memtable_on_sink_node && !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { - sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status)); + sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false)); } else { sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false)); } - RETURN_IF_ERROR(status); break; } case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp similarity index 53% copy from be/src/pipeline/exec/olap_table_sink_v2_operator.h copy to be/src/pipeline/exec/olap_table_sink_v2_operator.cpp index f280e856f0c..99efc1d752e 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp @@ -15,35 +15,36 @@ // specific language governing permissions and limitations // under the License. -#pragma once +#include "olap_table_sink_v2_operator.h" -#include "operator.h" -#include "vec/sink/vtablet_sink_v2.h" +#include "common/status.h" -namespace doris { - -namespace pipeline { - -class OlapTableSinkV2OperatorBuilder final - : public DataSinkOperatorBuilder<vectorized::VOlapTableSinkV2> { -public: - OlapTableSinkV2OperatorBuilder(int32_t id, DataSink* sink) - : DataSinkOperatorBuilder(id, "OlapTableSinkV2Operator", sink) {} - - OperatorPtr build_operator() override; -}; - -class OlapTableSinkV2Operator final : public DataSinkOperator<OlapTableSinkV2OperatorBuilder> { -public: - OlapTableSinkV2Operator(OperatorBuilderBase* operator_builder, DataSink* sink) - : DataSinkOperator(operator_builder, sink) {} - - bool can_write() override { return true; } // TODO: need use mem_limit -}; +namespace doris::pipeline { OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() { return std::make_shared<OlapTableSinkV2Operator>(this, _sink); } -} // namespace pipeline -} // namespace doris \ No newline at end of file +Status OlapTableSinkV2LocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + auto& p = _parent->cast<Parent>(); + RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit)); + return Status::OK(); +} + +Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status exec_status) { + if (Base::_closed) { + return Status::OK(); + } + SCOPED_TIMER(_close_timer); + SCOPED_TIMER(exec_time_counter()); + if (_closed) { + return _close_status; + } + _close_status = Base::close(state, exec_status); + return _close_status; +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h index f280e856f0c..5fb8f64dd31 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h @@ -18,6 +18,7 @@ #pragma once #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/sink/vtablet_sink_v2.h" namespace doris { @@ -41,9 +42,76 @@ public: bool can_write() override { return true; } // TODO: need use mem_limit }; -OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() { - return std::make_shared<OlapTableSinkV2Operator>(this, _sink); -} +class OlapTableSinkV2OperatorX; + +class OlapTableSinkV2LocalState final + : public AsyncWriterSink<vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX> { +public: + using Base = AsyncWriterSink<vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>; + using Parent = OlapTableSinkV2OperatorX; + ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState); + OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + + Status close(RuntimeState* state, Status exec_status) override; + friend class OlapTableSinkV2OperatorX; + +private: + Status _close_status = Status::OK(); +}; + +class OlapTableSinkV2OperatorX final : public DataSinkOperatorX<OlapTableSinkV2LocalState> { +public: + using Base = DataSinkOperatorX<OlapTableSinkV2LocalState>; + OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, + const std::vector<TExpr>& t_output_expr, bool group_commit) + : Base(operator_id, 0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _group_commit(group_commit), + _pool(pool) {}; + + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + return vectorized::VExpr::open(_output_vexpr_ctxs, state); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + return local_state.sink(state, in_block, source_state); + } + +private: + friend class OlapTableSinkV2LocalState; + template <typename Writer, typename Parent> + friend class AsyncWriterSink; + const RowDescriptor& _row_desc; + vectorized::VExprContextSPtrs _output_vexpr_ctxs; + const std::vector<TExpr>& _t_output_expr; + const bool _group_commit; + ObjectPool* _pool; +}; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index f8430a57159..9e6df06da01 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -46,6 +46,7 @@ #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/olap_table_sink_operator.h" +#include "pipeline/exec/olap_table_sink_v2_operator.h" #include "pipeline/exec/partition_sort_sink_operator.h" #include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" @@ -544,6 +545,7 @@ DECLARE_OPERATOR_X(ResultSinkLocalState) DECLARE_OPERATOR_X(JdbcTableSinkLocalState) DECLARE_OPERATOR_X(ResultFileSinkLocalState) DECLARE_OPERATOR_X(OlapTableSinkLocalState) +DECLARE_OPERATOR_X(OlapTableSinkV2LocalState) DECLARE_OPERATOR_X(AnalyticSinkLocalState) DECLARE_OPERATOR_X(SortSinkLocalState) DECLARE_OPERATOR_X(LocalExchangeSinkLocalState) @@ -624,5 +626,6 @@ template class PipelineXLocalState<LocalExchangeDependency>; template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>; template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>; template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOperatorX>; +template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 200ac23504e..7113989ee1e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -62,6 +62,7 @@ #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/olap_table_sink_operator.h" +#include "pipeline/exec/olap_table_sink_v2_operator.h" #include "pipeline/exec/partition_sort_sink_operator.h" #include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" @@ -268,9 +269,10 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData break; } case TDataSinkType::OLAP_TABLE_SINK: { - if (state->query_options().enable_memtable_on_sink_node) { - return Status::InternalError( - "Unsuported OLAP_TABLE_SINK with enable_memtable_on_sink_node "); + if (state->query_options().enable_memtable_on_sink_node && + !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { + _sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(), row_desc, + output_exprs, false)); } else { _sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), row_desc, output_exprs, false)); @@ -412,6 +414,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id); _runtime_states[i]->set_num_per_fragment_instances(request.num_senders); _runtime_states[i]->resize_op_id_to_local_state(max_operator_id()); + _runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node); + _runtime_states[i]->set_total_load_streams(request.total_load_streams); + _runtime_states[i]->set_num_local_sink(request.num_local_sink); std::map<PipelineId, PipelineXTask*> pipeline_id_to_task; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto task = std::make_unique<PipelineXTask>( @@ -1005,4 +1010,23 @@ Status PipelineXFragmentContext::send_report(bool done) { std::placeholders::_2)}, shared_from_this()); } + +bool PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) { + OlapTableSchemaParam schema; + if (!schema.init(sink.schema).ok()) { + return false; + } + if (schema.is_partial_update()) { + return true; + } + for (const auto& index_schema : schema.indexes()) { + for (const auto& index : index_schema->indexes) { + if (index->index_type() == INVERTED) { + return true; + } + } + } + return false; +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 4d2a59277e9..6fa91aedf12 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -145,6 +145,9 @@ private: const TPipelineFragmentParams& params, const RowDescriptor& row_desc, RuntimeState* state, DescriptorTbl& desc_tbl, PipelineId cur_pipeline_id); + + bool _has_inverted_index_or_partial_update(TOlapTableSink sink); + OperatorXPtr _root_op = nullptr; // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks; diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index ac9be0e7fb7..9385bd93202 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -17,47 +17,22 @@ #include "vec/sink/vtablet_sink_v2.h" -#include <brpc/uri.h> -#include <bthread/bthread.h> -#include <fmt/format.h> #include <gen_cpp/DataSinks_types.h> #include <gen_cpp/Descriptors_types.h> -#include <gen_cpp/Metrics_types.h> -#include <gen_cpp/Types_types.h> -#include <gen_cpp/internal_service.pb.h> -#include <algorithm> -#include <execution> -#include <mutex> #include <ranges> -#include <string> #include <unordered_map> #include "common/compiler_util.h" // IWYU pragma: keep -#include "common/logging.h" #include "common/object_pool.h" -#include "common/signal_handler.h" #include "common/status.h" -#include "exec/tablet_info.h" #include "olap/delta_writer_v2.h" #include "runtime/descriptors.h" -#include "runtime/exec_env.h" #include "runtime/runtime_state.h" -#include "runtime/thread_context.h" -#include "service/brpc.h" -#include "util/brpc_client_cache.h" #include "util/doris_metrics.h" -#include "util/network_util.h" -#include "util/threadpool.h" -#include "util/thrift_util.h" -#include "util/uid_util.h" -#include "vec/core/block.h" -#include "vec/exprs/vexpr.h" #include "vec/sink/delta_writer_v2_pool.h" #include "vec/sink/load_stream_stub.h" #include "vec/sink/load_stream_stub_pool.h" -#include "vec/sink/vtablet_block_convertor.h" -#include "vec/sink/vtablet_finder.h" namespace doris { class TExpr; @@ -65,391 +40,16 @@ class TExpr; namespace vectorized { VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector<TExpr>& texprs, Status* status) - : DataSink(row_desc), _pool(pool) { - // From the thrift expressions create the real exprs. - *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs); - _name = "VOlapTableSinkV2"; -} + const std::vector<TExpr>& texprs, bool group_commit) + : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, texprs), + _pool(pool), + _group_commit(group_commit) {} VOlapTableSinkV2::~VOlapTableSinkV2() = default; -Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) { - // add new tablet locations. it will use by address. so add to pool - auto* new_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets)); - _location->add_locations(*new_locations); - - // update new node info - _nodes_info->add_nodes(result->nodes); - - // incremental open stream - RETURN_IF_ERROR(_incremental_open_streams(result->partitions)); - - return Status::OK(); -} - -static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { - return static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result); -} - -Status VOlapTableSinkV2::_incremental_open_streams( - const std::vector<TOlapTablePartition>& partitions) { - // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. - std::unordered_set<int64_t> known_indexes; - std::unordered_set<int64_t> new_backends; - for (const auto& t_partition : partitions) { - VOlapTablePartition* partition = nullptr; - RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition)); - for (const auto& index : partition->indexes) { - for (const auto& tablet_id : index.tablets) { - auto nodes = _location->find_tablet(tablet_id)->node_ids; - for (auto& node : nodes) { - PTabletID tablet; - tablet.set_partition_id(partition->id); - tablet.set_index_id(index.index_id); - tablet.set_tablet_id(tablet_id); - if (!_streams_for_node.contains(node)) { - new_backends.insert(node); - } - _tablets_for_node[node].emplace(tablet_id, tablet); - if (known_indexes.contains(index.index_id)) [[likely]] { - continue; - } - _indexes_from_node[node].emplace_back(tablet); - known_indexes.insert(index.index_id); - } - } - } - } - for (int64_t node_id : new_backends) { - auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink); - RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams)); - _streams_for_node[node_id] = load_streams; - } - return Status::OK(); -} - -Status VOlapTableSinkV2::_init_row_distribution() { - VRowDistributionContext ctx; - - ctx.state = _state; - ctx.block_convertor = _block_convertor.get(); - ctx.tablet_finder = _tablet_finder.get(); - ctx.vpartition = _vpartition; - ctx.add_partition_request_timer = _add_partition_request_timer; - ctx.txn_id = _txn_id; - ctx.pool = _pool; - ctx.location = _location; - ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs; - ctx.on_partitions_created = &vectorized::on_partitions_created; - ctx.caller = (void*)this; - ctx.schema = _schema; - - _row_distribution.init(&ctx); - - RETURN_IF_ERROR(_row_distribution.open(_output_row_desc)); - - return Status::OK(); -} - Status VOlapTableSinkV2::init(const TDataSink& t_sink) { - DCHECK(t_sink.__isset.olap_table_sink); - auto& table_sink = t_sink.olap_table_sink; - _load_id.set_hi(table_sink.load_id.hi); - _load_id.set_lo(table_sink.load_id.lo); - _txn_id = table_sink.txn_id; - _num_replicas = table_sink.num_replicas; - _tuple_desc_id = table_sink.tuple_id; - _write_file_cache = table_sink.write_file_cache; - _schema.reset(new OlapTableSchemaParam()); - RETURN_IF_ERROR(_schema->init(table_sink.schema)); - _location = _pool->add(new OlapTableLocationParam(table_sink.location)); - _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); - - // if distributed column list is empty, we can ensure that tablet is with random distribution info - // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition - // for the whole olap table sink - auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; - if (table_sink.partition.distributed_columns.empty()) { - if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { - find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; - } else { - find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; - } - } - _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition)); - _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode); - RETURN_IF_ERROR(_vpartition->init()); - - return Status::OK(); -} - -Status VOlapTableSinkV2::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::prepare(state)); - - _state = state; - - _sender_id = state->per_fragment_instance_idx(); - _num_senders = state->num_per_fragment_instances(); - _backend_id = state->backend_id(); - _stream_per_node = state->load_stream_per_node(); - _total_streams = state->total_load_streams(); - _num_local_sink = state->num_local_sink(); - DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0"; - DCHECK(_total_streams > 0) << "total load streams should be greator than 0"; - DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0"; - LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node - << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink; - _is_high_priority = - (state->execution_timeout() <= config::load_task_high_priority_threshold_second); - - // profile must add to state's object pool - _profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2")); - _mem_tracker = std::make_shared<MemTracker>("VOlapTableSinkV2:" + - std::to_string(state->load_job_id())); - SCOPED_TIMER(_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - - // get table's tuple descriptor - _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); - if (_output_tuple_desc == nullptr) { - return Status::InternalError("unknown destination tuple descriptor, id = {}", - _tuple_desc_id); - } - _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); - _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); - - // add all counter - _input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); - _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT); - _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT); - _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); - _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime"); - _row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime"); - _write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime", "SendDataTime"); - _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); - _open_timer = ADD_TIMER(_profile, "OpenTime"); - _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); - _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime"); - _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime"); - - // Prepare the exprs to run. - RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); - if (config::share_delta_writers) { - _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create( - _load_id, _num_local_sink); - } else { - _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id); - } - return Status::OK(); -} - -Status VOlapTableSinkV2::open(RuntimeState* state) { - // Prepare the exprs to run. - RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); - SCOPED_TIMER(_profile->total_time_counter()); - SCOPED_TIMER(_open_timer); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - signal::set_signal_task_id(_load_id); - - _build_tablet_node_mapping(); - RETURN_IF_ERROR(_open_streams(_backend_id)); - RETURN_IF_ERROR(_init_row_distribution()); - - return Status::OK(); -} - -Status VOlapTableSinkV2::_open_streams(int64_t src_id) { - for (auto& [dst_id, _] : _tablets_for_node) { - auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, src_id, dst_id, _stream_per_node, _num_local_sink); - RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); - _streams_for_node[dst_id] = streams; - } - return Status::OK(); -} - -Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id, - ::doris::stream_load::LoadStreams& streams) { - auto node_info = _nodes_info->find_node(dst_id); - if (node_info == nullptr) { - return Status::InternalError("Unknown node {} in tablet location", dst_id); - } - // get tablet schema from each backend only in the 1st stream - for (auto& stream : streams.streams() | std::ranges::views::take(1)) { - const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id]; - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, - _txn_id, *_schema, tablets_for_schema, _total_streams, - _state->enable_profile())); - } - // for the rest streams, open without getting tablet schema - for (auto& stream : streams.streams() | std::ranges::views::drop(1)) { - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, - _txn_id, *_schema, {}, _total_streams, - _state->enable_profile())); - } - return Status::OK(); -} - -void VOlapTableSinkV2::_build_tablet_node_mapping() { - std::unordered_set<int64_t> known_indexes; - for (const auto& partition : _vpartition->get_partitions()) { - for (const auto& index : partition->indexes) { - for (const auto& tablet_id : index.tablets) { - auto nodes = _location->find_tablet(tablet_id)->node_ids; - for (auto& node : nodes) { - PTabletID tablet; - tablet.set_partition_id(partition->id); - tablet.set_index_id(index.index_id); - tablet.set_tablet_id(tablet_id); - _tablets_for_node[node].emplace(tablet_id, tablet); - if (known_indexes.contains(index.index_id)) [[likely]] { - continue; - } - _indexes_from_node[node].emplace_back(tablet); - known_indexes.insert(index.index_id); - } - } - } - } -} - -void VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids, - RowsForTablet& rows_for_tablet) { - for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) { - auto& row_ids = row_part_tablet_ids[index_idx].row_ids; - auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids; - auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids; - - for (int i = 0; i < row_ids.size(); i++) { - auto& tablet_id = tablet_ids[i]; - auto it = rows_for_tablet.find(tablet_id); - if (it == rows_for_tablet.end()) { - Rows rows; - rows.partition_id = partition_ids[i]; - rows.index_id = _schema->indexes()[index_idx]->index_id; - rows.row_idxes.reserve(row_ids.size()); - auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows}); - it = tmp_it; - } - it->second.row_idxes.push_back(row_ids[i]); - _number_output_rows++; - } - } -} - -Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams) { - auto location = _location->find_tablet(tablet_id); - if (location == nullptr) { - return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); - } - for (auto& node_id : location->node_ids) { - PTabletID tablet; - tablet.set_partition_id(partition_id); - tablet.set_index_id(index_id); - tablet.set_tablet_id(tablet_id); - _tablets_for_node[node_id].emplace(tablet_id, tablet); - streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index)); - RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); - } - _stream_index = (_stream_index + 1) % _stream_per_node; - return Status::OK(); -} - -Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_block, bool eos) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - Status status = Status::OK(); - - if (state->query_options().dry_run_query) { - return status; - } - - auto input_rows = input_block->rows(); - auto input_bytes = input_block->bytes(); - if (UNLIKELY(input_rows == 0)) { - return status; - } - SCOPED_TIMER(_profile->total_time_counter()); - _number_input_rows += input_rows; - // update incrementally so that FE can get the progress. - // the real 'num_rows_load_total' will be set when sink being closed. - state->update_num_rows_load_total(input_rows); - state->update_num_bytes_load_total(input_bytes); - DorisMetrics::instance()->load_rows->increment(input_rows); - DorisMetrics::instance()->load_bytes->increment(input_bytes); - - bool has_filtered_rows = false; - int64_t filtered_rows = 0; - - SCOPED_RAW_TIMER(&_send_data_ns); - // This is just for passing compilation. - _row_distribution_watch.start(); - - std::shared_ptr<vectorized::Block> block; - RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( - *input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); - RowsForTablet rows_for_tablet; - _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet); - - _row_distribution_watch.stop(); - - // For each tablet, send its input_rows from block to delta writer - for (const auto& [tablet_id, rows] : rows_for_tablet) { - Streams streams; - RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, rows.index_id, streams)); - RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams)); - } - - return Status::OK(); -} - -Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> block, - int64_t tablet_id, const Rows& rows, - const Streams& streams) { - DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { - WriteRequest req { - .tablet_id = tablet_id, - .txn_id = _txn_id, - .index_id = rows.index_id, - .partition_id = rows.partition_id, - .load_id = _load_id, - .tuple_desc = _output_tuple_desc, - .table_schema_param = _schema.get(), - .is_high_priority = _is_high_priority, - .write_file_cache = _write_file_cache, - }; - for (auto& index : _schema->indexes()) { - if (index->index_id == rows.index_id) { - req.slots = &index->slots; - req.schema_hash = index->schema_hash; - break; - } - } - return DeltaWriterV2::open(&req, streams); - }); - { - SCOPED_TIMER(_wait_mem_limit_timer); - ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(); - } - SCOPED_TIMER(_write_memtable_timer); - auto st = delta_writer->write(block.get(), rows.row_idxes, false); - return st; -} - -Status VOlapTableSinkV2::_cancel(Status status) { - LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", due to error: " << status; - if (_delta_writer_for_tablet) { - _delta_writer_for_tablet->cancel(status); - _delta_writer_for_tablet.reset(); - } - for (const auto& [_, streams] : _streams_for_node) { - streams->release(); - } + RETURN_IF_ERROR(AsyncWriterSink::init(t_sink)); + RETURN_IF_ERROR(_writer->init_properties(_pool, _group_commit)); return Status::OK(); } @@ -457,97 +57,8 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { if (_closed) { return _close_status; } - SCOPED_TIMER(_close_timer); - Status status = exec_status; - if (status.ok()) { - // only if status is ok can we call this _profile->total_time_counter(). - // if status is not ok, this sink may not be prepared, so that _profile is null - SCOPED_TIMER(_profile->total_time_counter()); - - COUNTER_SET(_input_rows_counter, _number_input_rows); - COUNTER_SET(_output_rows_counter, _number_output_rows); - COUNTER_SET(_filtered_rows_counter, - _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows()); - COUNTER_SET(_send_data_timer, _send_data_ns); - COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); - COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); - - // release streams from the pool first, to prevent memory leak - for (const auto& [_, streams] : _streams_for_node) { - streams->release(); - } - - { - SCOPED_TIMER(_close_writer_timer); - // close all delta writers if this is the last user - RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile)); - _delta_writer_for_tablet.reset(); - } - - { - // send CLOSE_LOAD to all streams, return ERROR if any - for (const auto& [_, streams] : _streams_for_node) { - RETURN_IF_ERROR(_close_load(streams->streams())); - } - } - - { - SCOPED_TIMER(_close_load_timer); - for (const auto& [_, streams] : _streams_for_node) { - for (const auto& stream : streams->streams()) { - RETURN_IF_ERROR(stream->close_wait()); - } - } - } - - std::vector<TTabletCommitInfo> tablet_commit_infos; - for (const auto& [node_id, streams] : _streams_for_node) { - for (const auto& stream : streams->streams()) { - for (auto tablet_id : stream->success_tablets()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_id; - commit_info.backendId = node_id; - tablet_commit_infos.emplace_back(std::move(commit_info)); - } - } - } - state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), - std::make_move_iterator(tablet_commit_infos.begin()), - std::make_move_iterator(tablet_commit_infos.end())); - _streams_for_node.clear(); - - // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node - int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + - state->num_rows_load_unselected(); - state->set_num_rows_load_total(num_rows_load_total); - state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + - _tablet_finder->num_filtered_rows()); - state->update_num_rows_load_unselected( - _tablet_finder->num_immutable_partition_filtered_rows()); - - LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id; - } else { - RETURN_IF_ERROR(_cancel(status)); - } - - _close_status = status; - RETURN_IF_ERROR(DataSink::close(state, exec_status)); - return status; -} - -Status VOlapTableSinkV2::_close_load(const Streams& streams) { - auto node_id = streams[0]->dst_id(); - std::vector<PTabletID> tablets_to_commit; - for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) { - if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { - tablets_to_commit.push_back(tablet); - } - } - for (const auto& stream : streams) { - RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); - } - return Status::OK(); + _close_status = AsyncWriterSink::close(state, exec_status); + return _close_status; } } // namespace vectorized diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 1f317420de4..cef4659bddb 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -26,204 +26,46 @@ #include <gen_cpp/types.pb.h> #include <glog/logging.h> #include <google/protobuf/stubs/callback.h> -#include <stddef.h> -#include <stdint.h> #include <atomic> +#include <cstddef> +#include <cstdint> // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep -#include <functional> -#include <initializer_list> -#include <map> -#include <memory> -#include <mutex> -#include <ostream> -#include <queue> -#include <set> -#include <string> -#include <unordered_map> -#include <unordered_set> -#include <utility> #include <vector> -#include "common/config.h" #include "common/status.h" -#include "exec/data_sink.h" -#include "exec/tablet_info.h" -#include "gutil/ref_counted.h" #include "runtime/exec_env.h" -#include "runtime/memory/mem_tracker.h" -#include "runtime/thread_context.h" #include "runtime/types.h" -#include "util/countdown_latch.h" -#include "util/runtime_profile.h" -#include "util/stopwatch.hpp" -#include "vec/columns/column.h" -#include "vec/common/allocator.h" -#include "vec/common/hash_table/phmap_fwd_decl.h" -#include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" -#include "vec/sink/vrow_distribution.h" +#include "vec/sink/async_writer_sink.h" +#include "vec/sink/writer/vtablet_writer_v2.h" namespace doris { -class DeltaWriterV2; -class LoadStreamStub; -class ObjectPool; -class RowDescriptor; -class RuntimeState; -class TDataSink; -class TExpr; -class TabletSchema; -class TupleDescriptor; - -namespace stream_load { -class LoadStreams; -} namespace vectorized { -class OlapTableBlockConvertor; -class OlapTabletFinder; -class VOlapTableSinkV2; -class DeltaWriterV2Map; - -using Streams = std::vector<std::shared_ptr<LoadStreamStub>>; - -struct Rows { - int64_t partition_id; - int64_t index_id; - std::vector<int32_t> row_idxes; -}; - -using RowsForTablet = std::unordered_map<int64_t, Rows>; +inline constexpr char VOLAP_TABLE_SINK_V2[] = "VOlapTableSinkV2"; -// Write block data to Olap Table. -// When OlapTableSink::open() called, there will be a consumer thread running in the background. -// When you call VOlapTableSinkV2::send(), you will be the producer who products pending batches. -// Join the consumer thread in close(). -class VOlapTableSinkV2 final : public DataSink { +class VOlapTableSinkV2 final : public AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2> { public: // Construct from thrift struct which is generated by FE. VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector<TExpr>& texprs, Status* status); + const std::vector<TExpr>& texprs, bool group_commit); ~VOlapTableSinkV2() override; Status init(const TDataSink& sink) override; - // TODO: unify the code of prepare/open/close with result sink - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; - - Status close(RuntimeState* state, Status close_status) override; - - Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; - - Status on_partitions_created(TCreatePartitionResult* result); + Status close(RuntimeState* state, Status exec_status) override; private: - Status _init_row_distribution(); - - Status _open_streams(int64_t src_id); - - Status _open_streams_to_backend(int64_t dst_id, ::doris::stream_load::LoadStreams& streams); - - Status _incremental_open_streams(const std::vector<TOlapTablePartition>& partitions); - - void _build_tablet_node_mapping(); - - void _generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids, - RowsForTablet& rows_for_tablet); - - Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, - const Rows& rows, const Streams& streams); - - Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams); - - Status _close_load(const Streams& streams); - - Status _cancel(Status status); - - std::shared_ptr<MemTracker> _mem_tracker; - ObjectPool* _pool; - // unique load id - PUniqueId _load_id; - int64_t _txn_id = -1; - int _num_replicas = -1; - int _tuple_desc_id = -1; - - // this is tuple descriptor of destination OLAP table - TupleDescriptor* _output_tuple_desc = nullptr; - RowDescriptor* _output_row_desc = nullptr; - - // number of senders used to insert into OlapTable, if we only support single node insert, - // all data from select should collectted and then send to OlapTable. - // To support multiple senders, we maintain a channel for each sender. - int _sender_id = -1; - int _num_senders = -1; - int64_t _backend_id = -1; - int _stream_per_node = -1; - int _total_streams = -1; - int _num_local_sink = -1; - bool _is_high_priority = false; - bool _write_file_cache = false; - - // TODO(zc): think about cache this data - std::shared_ptr<OlapTableSchemaParam> _schema; - OlapTableLocationParam* _location = nullptr; - DorisNodesInfo* _nodes_info = nullptr; - - std::unique_ptr<OlapTabletFinder> _tablet_finder; - - std::unique_ptr<OlapTableBlockConvertor> _block_convertor; - - // Stats for this - int64_t _send_data_ns = 0; - int64_t _number_input_rows = 0; - int64_t _number_output_rows = 0; - - MonotonicStopWatch _row_distribution_watch; - - RuntimeProfile::Counter* _input_rows_counter = nullptr; - RuntimeProfile::Counter* _output_rows_counter = nullptr; - RuntimeProfile::Counter* _filtered_rows_counter = nullptr; - RuntimeProfile::Counter* _send_data_timer = nullptr; - RuntimeProfile::Counter* _row_distribution_timer = nullptr; - RuntimeProfile::Counter* _write_memtable_timer = nullptr; - RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr; - RuntimeProfile::Counter* _validate_data_timer = nullptr; - RuntimeProfile::Counter* _open_timer = nullptr; - RuntimeProfile::Counter* _close_timer = nullptr; - RuntimeProfile::Counter* _close_writer_timer = nullptr; - RuntimeProfile::Counter* _close_load_timer = nullptr; - RuntimeProfile::Counter* _add_partition_request_timer = nullptr; - - // Save the status of close() method - Status _close_status; - - VOlapTablePartitionParam* _vpartition = nullptr; - vectorized::VExprContextSPtrs _output_vexpr_ctxs; - - RuntimeState* _state = nullptr; - - std::unordered_set<int64_t> _opened_partitions; - - std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node; - std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node; - - std::unordered_map<int64_t, std::shared_ptr<::doris::stream_load::LoadStreams>> - _streams_for_node; - - size_t _stream_index = 0; - std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet; + bool _group_commit = false; - VRowDistribution _row_distribution; - // reuse to avoid frequent memory allocation and release. - std::vector<RowPartTabletIds> _row_part_tablet_ids; + Status _close_status = Status::OK(); }; } // namespace vectorized diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp similarity index 83% copy from be/src/vec/sink/vtablet_sink_v2.cpp copy to be/src/vec/sink/writer/vtablet_writer_v2.cpp index ac9be0e7fb7..7cf553fddab 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -15,18 +15,15 @@ // specific language governing permissions and limitations // under the License. -#include "vec/sink/vtablet_sink_v2.h" +#include "vec/sink/writer/vtablet_writer_v2.h" #include <brpc/uri.h> -#include <bthread/bthread.h> -#include <fmt/format.h> #include <gen_cpp/DataSinks_types.h> #include <gen_cpp/Descriptors_types.h> #include <gen_cpp/Metrics_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/internal_service.pb.h> -#include <algorithm> #include <execution> #include <mutex> #include <ranges> @@ -34,7 +31,6 @@ #include <unordered_map> #include "common/compiler_util.h" // IWYU pragma: keep -#include "common/logging.h" #include "common/object_pool.h" #include "common/signal_handler.h" #include "common/status.h" @@ -47,12 +43,10 @@ #include "service/brpc.h" #include "util/brpc_client_cache.h" #include "util/doris_metrics.h" -#include "util/network_util.h" #include "util/threadpool.h" #include "util/thrift_util.h" #include "util/uid_util.h" #include "vec/core/block.h" -#include "vec/exprs/vexpr.h" #include "vec/sink/delta_writer_v2_pool.h" #include "vec/sink/load_stream_stub.h" #include "vec/sink/load_stream_stub_pool.h" @@ -60,21 +54,17 @@ #include "vec/sink/vtablet_finder.h" namespace doris { -class TExpr; namespace vectorized { -VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector<TExpr>& texprs, Status* status) - : DataSink(row_desc), _pool(pool) { - // From the thrift expressions create the real exprs. - *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs); - _name = "VOlapTableSinkV2"; +VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) + : AsyncResultWriter(output_exprs), _t_sink(t_sink) { + DCHECK(t_sink.__isset.olap_table_sink); } -VOlapTableSinkV2::~VOlapTableSinkV2() = default; +VTabletWriterV2::~VTabletWriterV2() = default; -Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) { +Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) { // add new tablet locations. it will use by address. so add to pool auto* new_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets)); _location->add_locations(*new_locations); @@ -89,10 +79,10 @@ Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) { } static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { - return static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result); + return static_cast<VTabletWriterV2*>(writer)->on_partitions_created(result); } -Status VOlapTableSinkV2::_incremental_open_streams( +Status VTabletWriterV2::_incremental_open_streams( const std::vector<TOlapTablePartition>& partitions) { // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. std::unordered_set<int64_t> known_indexes; @@ -130,7 +120,7 @@ Status VOlapTableSinkV2::_incremental_open_streams( return Status::OK(); } -Status VOlapTableSinkV2::_init_row_distribution() { +Status VTabletWriterV2::_init_row_distribution() { VRowDistributionContext ctx; ctx.state = _state; @@ -141,7 +131,7 @@ Status VOlapTableSinkV2::_init_row_distribution() { ctx.txn_id = _txn_id; ctx.pool = _pool; ctx.location = _location; - ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs; + ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs; ctx.on_partitions_created = &vectorized::on_partitions_created; ctx.caller = (void*)this; ctx.schema = _schema; @@ -153,11 +143,17 @@ Status VOlapTableSinkV2::_init_row_distribution() { return Status::OK(); } -Status VOlapTableSinkV2::init(const TDataSink& t_sink) { - DCHECK(t_sink.__isset.olap_table_sink); - auto& table_sink = t_sink.olap_table_sink; +Status VTabletWriterV2::init_properties(ObjectPool* pool, bool group_commit) { + _pool = pool; + _group_commit = group_commit; + return Status::OK(); +} + +Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { + auto& table_sink = _t_sink.olap_table_sink; _load_id.set_hi(table_sink.load_id.hi); _load_id.set_lo(table_sink.load_id.lo); + signal::set_signal_task_id(_load_id); _txn_id = table_sink.txn_id; _num_replicas = table_sink.num_replicas; _tuple_desc_id = table_sink.tuple_id; @@ -182,13 +178,8 @@ Status VOlapTableSinkV2::init(const TDataSink& t_sink) { _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode); RETURN_IF_ERROR(_vpartition->init()); - return Status::OK(); -} - -Status VOlapTableSinkV2::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::prepare(state)); - _state = state; + _profile = profile; _sender_id = state->per_fragment_instance_idx(); _num_senders = state->num_per_fragment_instances(); @@ -205,9 +196,9 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) { (state->execution_timeout() <= config::load_task_high_priority_threshold_second); // profile must add to state's object pool - _profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2")); - _mem_tracker = std::make_shared<MemTracker>("VOlapTableSinkV2:" + - std::to_string(state->load_job_id())); + _profile = state->obj_pool()->add(new RuntimeProfile("VTabletWriterV2")); + _mem_tracker = + std::make_shared<MemTracker>("VTabletWriterV2:" + std::to_string(state->load_job_id())); SCOPED_TIMER(_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -236,8 +227,6 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) { _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime"); _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime"); - // Prepare the exprs to run. - RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); if (config::share_delta_writers) { _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create( _load_id, _num_local_sink); @@ -247,13 +236,11 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) { return Status::OK(); } -Status VOlapTableSinkV2::open(RuntimeState* state) { - // Prepare the exprs to run. - RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); +Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { + RETURN_IF_ERROR(_init(state, profile)); SCOPED_TIMER(_profile->total_time_counter()); SCOPED_TIMER(_open_timer); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - signal::set_signal_task_id(_load_id); _build_tablet_node_mapping(); RETURN_IF_ERROR(_open_streams(_backend_id)); @@ -262,7 +249,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) { return Status::OK(); } -Status VOlapTableSinkV2::_open_streams(int64_t src_id) { +Status VTabletWriterV2::_open_streams(int64_t src_id) { for (auto& [dst_id, _] : _tablets_for_node) { auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( _load_id, src_id, dst_id, _stream_per_node, _num_local_sink); @@ -272,9 +259,9 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) { return Status::OK(); } -Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id, - ::doris::stream_load::LoadStreams& streams) { - auto node_info = _nodes_info->find_node(dst_id); +Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, + ::doris::stream_load::LoadStreams& streams) { + const auto* node_info = _nodes_info->find_node(dst_id); if (node_info == nullptr) { return Status::InternalError("Unknown node {} in tablet location", dst_id); } @@ -294,7 +281,7 @@ Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id, return Status::OK(); } -void VOlapTableSinkV2::_build_tablet_node_mapping() { +void VTabletWriterV2::_build_tablet_node_mapping() { std::unordered_set<int64_t> known_indexes; for (const auto& partition : _vpartition->get_partitions()) { for (const auto& index : partition->indexes) { @@ -317,8 +304,8 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() { } } -void VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids, - RowsForTablet& rows_for_tablet) { +void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids, + RowsForTablet& rows_for_tablet) { for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) { auto& row_ids = row_part_tablet_ids[index_idx].row_ids; auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids; @@ -341,8 +328,8 @@ void VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& } } -Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams) { +Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, + Streams& streams) { auto location = _location->find_tablet(tablet_id); if (location == nullptr) { return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); @@ -360,16 +347,16 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t partition_id return Status::OK(); } -Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_block, bool eos) { +Status VTabletWriterV2::append_block(Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); - if (state->query_options().dry_run_query) { + if (_state->query_options().dry_run_query) { return status; } - auto input_rows = input_block->rows(); - auto input_bytes = input_block->bytes(); + auto input_rows = input_block.rows(); + auto input_bytes = input_block.bytes(); if (UNLIKELY(input_rows == 0)) { return status; } @@ -377,8 +364,8 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc _number_input_rows += input_rows; // update incrementally so that FE can get the progress. // the real 'num_rows_load_total' will be set when sink being closed. - state->update_num_rows_load_total(input_rows); - state->update_num_bytes_load_total(input_bytes); + _state->update_num_rows_load_total(input_rows); + _state->update_num_bytes_load_total(input_bytes); DorisMetrics::instance()->load_rows->increment(input_rows); DorisMetrics::instance()->load_bytes->increment(input_bytes); @@ -391,7 +378,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc std::shared_ptr<vectorized::Block> block; RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( - *input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); + input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); RowsForTablet rows_for_tablet; _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet); @@ -407,9 +394,8 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc return Status::OK(); } -Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> block, - int64_t tablet_id, const Rows& rows, - const Streams& streams) { +Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, + const Rows& rows, const Streams& streams) { DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { WriteRequest req { .tablet_id = tablet_id, @@ -440,7 +426,7 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc return st; } -Status VOlapTableSinkV2::_cancel(Status status) { +Status VTabletWriterV2::_cancel(Status status) { LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id << ", due to error: " << status; if (_delta_writer_for_tablet) { @@ -453,8 +439,9 @@ Status VOlapTableSinkV2::_cancel(Status status) { return Status::OK(); } -Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { - if (_closed) { +Status VTabletWriterV2::close(Status exec_status) { + std::lock_guard<std::mutex> close_lock(_close_mutex); + if (_is_closed) { return _close_status; } SCOPED_TIMER(_close_timer); @@ -511,18 +498,18 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { } } } - state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), - std::make_move_iterator(tablet_commit_infos.begin()), - std::make_move_iterator(tablet_commit_infos.end())); + _state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(), + std::make_move_iterator(tablet_commit_infos.begin()), + std::make_move_iterator(tablet_commit_infos.end())); _streams_for_node.clear(); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node - int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + - state->num_rows_load_unselected(); - state->set_num_rows_load_total(num_rows_load_total); - state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + - _tablet_finder->num_filtered_rows()); - state->update_num_rows_load_unselected( + int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() + + _state->num_rows_load_unselected(); + _state->set_num_rows_load_total(num_rows_load_total); + _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + + _tablet_finder->num_filtered_rows()); + _state->update_num_rows_load_unselected( _tablet_finder->num_immutable_partition_filtered_rows()); LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id) @@ -531,12 +518,12 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { RETURN_IF_ERROR(_cancel(status)); } + _is_closed = true; _close_status = status; - RETURN_IF_ERROR(DataSink::close(state, exec_status)); return status; } -Status VOlapTableSinkV2::_close_load(const Streams& streams) { +Status VTabletWriterV2::_close_load(const Streams& streams) { auto node_id = streams[0]->dst_id(); std::vector<PTabletID> tablets_to_commit; for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) { diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h similarity index 87% copy from be/src/vec/sink/vtablet_sink_v2.h copy to be/src/vec/sink/writer/vtablet_writer_v2.h index 1f317420de4..d4ccf7b6523 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -26,10 +26,10 @@ #include <gen_cpp/types.pb.h> #include <glog/logging.h> #include <google/protobuf/stubs/callback.h> -#include <stddef.h> -#include <stdint.h> #include <atomic> +#include <cstddef> +#include <cstdint> // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <functional> @@ -48,7 +48,6 @@ #include "common/config.h" #include "common/status.h" -#include "exec/data_sink.h" #include "exec/tablet_info.h" #include "gutil/ref_counted.h" #include "runtime/exec_env.h" @@ -65,6 +64,7 @@ #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/sink/vrow_distribution.h" +#include "vec/sink/writer/async_result_writer.h" namespace doris { class DeltaWriterV2; @@ -85,7 +85,7 @@ namespace vectorized { class OlapTableBlockConvertor; class OlapTabletFinder; -class VOlapTableSinkV2; +class VTabletWriterV2; class DeltaWriterV2Map; using Streams = std::vector<std::shared_ptr<LoadStreamStub>>; @@ -100,31 +100,30 @@ using RowsForTablet = std::unordered_map<int64_t, Rows>; // Write block data to Olap Table. // When OlapTableSink::open() called, there will be a consumer thread running in the background. -// When you call VOlapTableSinkV2::send(), you will be the producer who products pending batches. +// When you call VTabletWriterV2::send(), you will be the producer who products pending batches. // Join the consumer thread in close(). -class VOlapTableSinkV2 final : public DataSink { +class VTabletWriterV2 final : public AsyncResultWriter { public: // Construct from thrift struct which is generated by FE. - VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector<TExpr>& texprs, Status* status); + VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - ~VOlapTableSinkV2() override; + ~VTabletWriterV2() override; - Status init(const TDataSink& sink) override; - // TODO: unify the code of prepare/open/close with result sink - Status prepare(RuntimeState* state) override; + Status init_properties(ObjectPool* pool, bool group_commit); - Status open(RuntimeState* state) override; + Status append_block(Block& block) override; - Status close(RuntimeState* state, Status close_status) override; + Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; + Status close(Status close_status) override; Status on_partitions_created(TCreatePartitionResult* result); private: Status _init_row_distribution(); + Status _init(RuntimeState* state, RuntimeProfile* profile); + Status _open_streams(int64_t src_id); Status _open_streams_to_backend(int64_t dst_id, ::doris::stream_load::LoadStreams& streams); @@ -148,6 +147,7 @@ private: std::shared_ptr<MemTracker> _mem_tracker; + TDataSink _t_sink; ObjectPool* _pool; // unique load id @@ -202,13 +202,16 @@ private: RuntimeProfile::Counter* _close_load_timer = nullptr; RuntimeProfile::Counter* _add_partition_request_timer = nullptr; + std::mutex _close_mutex; + bool _is_closed = false; // Save the status of close() method Status _close_status; VOlapTablePartitionParam* _vpartition = nullptr; - vectorized::VExprContextSPtrs _output_vexpr_ctxs; - RuntimeState* _state = nullptr; + RuntimeState* _state = nullptr; // not owned, set when open + RuntimeProfile* _profile = nullptr; // not owned, set when open + bool _group_commit = false; std::unordered_set<int64_t> _opened_partitions; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org