This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d1e8bebb3c6 [refactor](shuffle) (PART I) Unify all hash-partition based shuffling (#45256) d1e8bebb3c6 is described below commit d1e8bebb3c610bc729b0a088fd693f013aeb5336 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Wed Dec 11 14:20:41 2024 +0800 [refactor](shuffle) (PART I) Unify all hash-partition based shuffling (#45256) --- be/src/pipeline/exec/exchange_sink_operator.cpp | 308 +++++---------------- be/src/pipeline/exec/exchange_sink_operator.h | 58 +--- be/src/pipeline/shuffle/writer.cpp | 114 ++++++++ be/src/pipeline/shuffle/writer.h | 53 ++++ be/src/vec/runtime/partitioner.h | 16 +- .../sink/scale_writer_partitioning_exchanger.hpp | 92 ++++-- be/src/vec/sink/tablet_sink_hash_partitioner.cpp | 152 ++++++++++ be/src/vec/sink/tablet_sink_hash_partitioner.h | 83 ++++++ be/src/vec/sink/vdata_stream_sender.cpp | 12 +- be/src/vec/sink/vdata_stream_sender.h | 8 +- 10 files changed, 562 insertions(+), 334 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 04b9653e9c8..aa893fc0a26 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -38,6 +38,7 @@ #include "util/uid_util.h" #include "vec/columns/column_const.h" #include "vec/exprs/vexpr.h" +#include "vec/sink/tablet_sink_hash_partitioner.h" namespace doris::pipeline { #include "common/compile_check_begin.h" @@ -120,6 +121,57 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _sink_buffer->set_dependency(state->fragment_instance_id().lo, _queue_dependency, this); } + if (_part_type == TPartitionType::HASH_PARTITIONED) { + _partition_count = channels.size(); + _partitioner = + std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + channels.size()); + RETURN_IF_ERROR(_partitioner->init(p._texprs)); + 
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + _partition_count = channels.size(); + _partitioner = + std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + channels.size()); + RETURN_IF_ERROR(_partitioner->init(p._texprs)); + RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + _partition_count = channels.size(); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + _partitioner = std::make_unique<vectorized::TabletSinkHashPartitioner>( + _partition_count, p._tablet_sink_txn_id, p._tablet_sink_schema, + p._tablet_sink_partition, p._tablet_sink_location, p._tablet_sink_tuple_id, this); + RETURN_IF_ERROR(_partitioner->init({})); + RETURN_IF_ERROR(_partitioner->prepare(state, {})); + } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + _partition_count = + channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer; + _partitioner = std::make_unique<vectorized::ScaleWriterPartitioner>( + channels.size(), _partition_count, channels.size(), 1, + config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / + state->task_num() == + 0 + ? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold + : config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / + state->task_num(), + config::table_sink_partition_write_min_data_processed_rebalance_threshold / + state->task_num() == + 0 + ? 
config::table_sink_partition_write_min_data_processed_rebalance_threshold + : config::table_sink_partition_write_min_data_processed_rebalance_threshold / + state->task_num()); + + RETURN_IF_ERROR(_partitioner->init(p._texprs)); + RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + } + return Status::OK(); } @@ -145,6 +197,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); + _writer.reset(new Writer()); auto& p = _parent->cast<ExchangeSinkOperatorX>(); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || @@ -190,113 +243,16 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { } } } - if (_part_type == TPartitionType::HASH_PARTITIONED) { - _partition_count = channels.size(); - _partitioner = - std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( - channels.size()); - RETURN_IF_ERROR(_partitioner->init(p._texprs)); - RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); - _profile->add_info_string("Partitioner", - fmt::format("Crc32HashPartitioner({})", _partition_count)); - } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { - _partition_count = channels.size(); - _partitioner = - std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( - channels.size()); - RETURN_IF_ERROR(_partitioner->init(p._texprs)); - RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); - _profile->add_info_string("Partitioner", - fmt::format("Crc32HashPartitioner({})", _partition_count)); - } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { - _partition_count = channels.size(); - _profile->add_info_string("Partitioner", - fmt::format("Crc32HashPartitioner({})", _partition_count)); - _txn_id = 
p._tablet_sink_txn_id; - _schema = std::make_shared<OlapTableSchemaParam>(); - RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema)); - _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema, p._tablet_sink_partition); - RETURN_IF_ERROR(_vpartition->init()); - auto find_tablet_mode = vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; - _tablet_finder = - std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), find_tablet_mode); - _tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id); - _tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); - _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size()); - for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i])); - } - // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column - // on exchange node rather than on TabletWriter - _block_convertor = - std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); - _location = p._pool->add(new OlapTableLocationParam(p._tablet_sink_location)); - _row_distribution.init( - {.state = _state, - .block_convertor = _block_convertor.get(), - .tablet_finder = _tablet_finder.get(), - .vpartition = _vpartition.get(), - .add_partition_request_timer = _add_partition_request_timer, - .txn_id = _txn_id, - .pool = p._pool.get(), - .location = _location, - .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs, - .schema = _schema, - .caller = (void*)this, - .create_partition_callback = &ExchangeSinkLocalState::empty_callback_function}); - } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { - _partition_count = - channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer; - 
_partitioner = - std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( - _partition_count); - _partition_function = std::make_unique<HashPartitionFunction>(_partitioner.get()); - - scale_writer_partitioning_exchanger = std::make_unique< - vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>( - channels.size(), *_partition_function, _partition_count, channels.size(), 1, - config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / - state->task_num() == - 0 - ? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold - : config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / - state->task_num(), - config::table_sink_partition_write_min_data_processed_rebalance_threshold / - state->task_num() == - 0 - ? config::table_sink_partition_write_min_data_processed_rebalance_threshold - : config::table_sink_partition_write_min_data_processed_rebalance_threshold / - state->task_num()); - - RETURN_IF_ERROR(_partitioner->init(p._texprs)); - RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); - _profile->add_info_string("Partitioner", - fmt::format("Crc32HashPartitioner({})", _partition_count)); - } if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || - _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED || + _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { RETURN_IF_ERROR(_partitioner->open(state)); - } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { - RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc)); } return Status::OK(); } -Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) { - RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); - auto& p = _parent->cast<ExchangeSinkOperatorX>(); - // Recovery back - 
_row_distribution.clear_batching_stats(); - _row_distribution._batching_block->clear_column_data(); - _row_distribution._deal_batched = false; - RETURN_IF_ERROR(p.sink(_state, input_block, false)); - return Status::OK(); -} - std::string ExchangeSinkLocalState::name_suffix() { std::string name = " (id=" + std::to_string(_parent->node_id()); auto& p = _parent->cast<ExchangeSinkOperatorX>(); @@ -491,105 +447,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block local_state.current_channel_idx = (local_state.current_channel_idx + 1) % local_state.channels.size(); } else if (_part_type == TPartitionType::HASH_PARTITIONED || - _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { - auto rows = block->rows(); - { - SCOPED_TIMER(local_state._split_block_hash_compute_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); - } - int64_t old_channel_mem_usage = 0; - for (const auto& channel : local_state.channels) { - old_channel_mem_usage += channel->mem_usage(); - } - if (_part_type == TPartitionType::HASH_PARTITIONED) { - SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); - RETURN_IF_ERROR(channel_add_rows( - state, local_state.channels, local_state._partition_count, - local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos)); - } else { - SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); - RETURN_IF_ERROR(channel_add_rows( - state, local_state.channels, local_state._partition_count, - local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos)); - } - int64_t new_channel_mem_usage = 0; - for (const auto& channel : local_state.channels) { - new_channel_mem_usage += channel->mem_usage(); - } - COUNTER_UPDATE(local_state.memory_used_counter(), - new_channel_mem_usage - old_channel_mem_usage); - } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { - int64_t old_channel_mem_usage = 0; - for (const auto& channel : 
local_state.channels) { - old_channel_mem_usage += channel->mem_usage(); - } - // check out of limit - std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>(); - const auto& num_channels = local_state._partition_count; - std::vector<std::vector<uint32>> channel2rows; - channel2rows.resize(num_channels); - auto input_rows = block->rows(); - - if (input_rows > 0) { - bool has_filtered_rows = false; - int64_t filtered_rows = 0; - local_state._number_input_rows += input_rows; - - RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution( - *block, convert_block, filtered_rows, has_filtered_rows, - local_state._row_part_tablet_ids, local_state._number_input_rows)); - if (local_state._row_distribution.batching_rows() > 0) { - SCOPED_TIMER(local_state._send_new_partition_timer); - RETURN_IF_ERROR(local_state._send_new_partition_batch(block)); - } else { - const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids; - const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids; - for (int idx = 0; idx < row_ids.size(); ++idx) { - const auto& row = row_ids[idx]; - const auto& tablet_id_hash = - HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0); - channel2rows[tablet_id_hash % num_channels].emplace_back(row); - } - } - } - - { - SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); - // the convert_block maybe different with block after execute exprs - // when send data we still use block - RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, - channel2rows, block, eos)); - } - int64_t new_channel_mem_usage = 0; - for (const auto& channel : local_state.channels) { - new_channel_mem_usage += channel->mem_usage(); - } - COUNTER_UPDATE(local_state.memory_used_counter(), - new_channel_mem_usage - old_channel_mem_usage); - } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { - int64_t old_channel_mem_usage = 0; - for (const auto& channel : 
local_state.channels) { - old_channel_mem_usage += channel->mem_usage(); - } - { - SCOPED_TIMER(local_state._split_block_hash_compute_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); - } - std::vector<std::vector<uint32>> assignments = - local_state.scale_writer_partitioning_exchanger->accept(block); - { - SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); - RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, - local_state.channels.size(), assignments, - block, eos)); - } - - int64_t new_channel_mem_usage = 0; - for (const auto& channel : local_state.channels) { - new_channel_mem_usage += channel->mem_usage(); - } - COUNTER_UPDATE(local_state.memory_used_counter(), - new_channel_mem_usage - old_channel_mem_usage); + _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || + _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED || + _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block, eos)); } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { // Control the number of channels according to the flow, thereby controlling the number of table sink writers. // 1. 
select channel @@ -641,44 +502,6 @@ void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buf } } -Status ExchangeSinkOperatorX::channel_add_rows( - RuntimeState* state, std::vector<std::shared_ptr<vectorized::Channel>>& channels, - size_t num_channels, const uint32_t* __restrict channel_ids, size_t rows, - vectorized::Block* block, bool eos) { - std::vector<std::vector<uint32_t>> channel2rows; - channel2rows.resize(num_channels); - for (uint32_t i = 0; i < rows; i++) { - channel2rows[channel_ids[i]].emplace_back(i); - } - - RETURN_IF_ERROR( - channel_add_rows_with_idx(state, channels, num_channels, channel2rows, block, eos)); - return Status::OK(); -} - -Status ExchangeSinkOperatorX::channel_add_rows_with_idx( - RuntimeState* state, std::vector<std::shared_ptr<vectorized::Channel>>& channels, - size_t num_channels, std::vector<std::vector<uint32_t>>& channel2rows, - vectorized::Block* block, bool eos) { - Status status = Status::OK(); - for (int i = 0; i < num_channels; ++i) { - if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) { - status = channels[i]->add_rows(block, channel2rows[i], false); - HANDLE_CHANNEL_STATUS(state, channels[i], status); - channel2rows[i].clear(); - } - } - if (eos) { - for (int i = 0; i < num_channels; ++i) { - if (!channels[i]->is_receiver_eof()) { - status = channels[i]->add_rows(block, channel2rows[i], true); - HANDLE_CHANNEL_STATUS(state, channels[i], status); - } - } - } - return Status::OK(); -} - std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); @@ -697,17 +520,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } - if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED && - _block_convertor != nullptr && _tablet_finder != nullptr) { - 
_state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + - _tablet_finder->num_filtered_rows()); - _state->update_num_rows_load_unselected( - _tablet_finder->num_immutable_partition_filtered_rows()); - // sink won't see those filtered rows, we should compensate here - _state->set_num_rows_load_total(_state->num_rows_load_filtered() + - _state->num_rows_load_unselected()); - } SCOPED_TIMER(exec_time_counter()); + if (_partitioner) { + RETURN_IF_ERROR(_partitioner->close(state)); + } SCOPED_TIMER(_close_timer); if (_queue_dependency) { COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time()); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 8d094b43f61..e88389b1d7b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -26,6 +26,7 @@ #include "common/status.h" #include "exchange_sink_buffer.h" #include "operator.h" +#include "pipeline/shuffle/writer.h" #include "vec/sink/scale_writer_partitioning_exchanger.hpp" #include "vec/sink/vdata_stream_sender.h" @@ -39,20 +40,6 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); using Base = PipelineXSinkLocalState<>; -private: - class HashPartitionFunction { - public: - HashPartitionFunction(vectorized::PartitionerBase* partitioner) - : _partitioner(partitioner) {} - - int get_partition(vectorized::Block* block, int position) { - return _partitioner->get_channel_ids().get<uint32_t>()[position]; - } - - private: - vectorized::PartitionerBase* _partitioner; - }; - public: ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state), _serializer(this) { @@ -106,19 +93,20 @@ public: std::string name_suffix() override; segment_v2::CompressionTypePB compression_type() const; std::string debug_string(int indentation_level) const override; - static Status 
empty_callback_function(void* sender, TCreatePartitionResult* result) { - return Status::OK(); + RuntimeProfile::Counter* send_new_partition_timer() { return _send_new_partition_timer; } + RuntimeProfile::Counter* add_partition_request_timer() { return _add_partition_request_timer; } + RuntimeProfile::Counter* split_block_hash_compute_timer() { + return _split_block_hash_compute_timer; + } + RuntimeProfile::Counter* distribute_rows_into_channels_timer() { + return _distribute_rows_into_channels_timer; } - Status _send_new_partition_batch(vectorized::Block* input_block); std::vector<std::shared_ptr<vectorized::Channel>> channels; int current_channel_idx {0}; // index of current channel to send to if _random == true bool only_local_exchange {false}; void on_channel_finished(InstanceLoId channel_id); - - // for external table sink hash partition - std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>> - scale_writer_partitioning_exchanger; + vectorized::PartitionerBase* partitioner() const { return _partitioner.get(); } private: friend class ExchangeSinkOperatorX; @@ -176,28 +164,16 @@ private: */ std::vector<std::shared_ptr<Dependency>> _local_channels_dependency; std::unique_ptr<vectorized::PartitionerBase> _partitioner; + std::unique_ptr<Writer> _writer; size_t _partition_count; std::shared_ptr<Dependency> _finish_dependency; // for shuffle data by partition and tablet - int64_t _txn_id = -1; - vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; - std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr; - std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr; - std::shared_ptr<OlapTableSchemaParam> _schema = nullptr; - std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor = nullptr; - TupleDescriptor* _tablet_sink_tuple_desc = nullptr; - RowDescriptor* _tablet_sink_row_desc = nullptr; - OlapTableLocationParam* _location = nullptr; - vectorized::VRowDistribution _row_distribution; + 
RuntimeProfile::Counter* _add_partition_request_timer = nullptr; - std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids; - int64_t _number_input_rows = 0; TPartitionType::type _part_type; - // for external table sink hash partition - std::unique_ptr<HashPartitionFunction> _partition_function = nullptr; std::atomic<bool> _reach_limit = false; int _last_local_channel_idx = -1; @@ -230,6 +206,7 @@ public: // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed. /// TODO: Modify this to let FE handle the judgment instead of BE. std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId sender_ins_id); + vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; } private: friend class ExchangeSinkLocalState; @@ -237,17 +214,6 @@ private: template <typename ChannelPtrType> void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st); - Status channel_add_rows(RuntimeState* state, - std::vector<std::shared_ptr<vectorized::Channel>>& channels, - size_t num_channels, const uint32_t* __restrict channel_ids, - size_t rows, vectorized::Block* block, bool eos); - - Status channel_add_rows_with_idx(RuntimeState* state, - std::vector<std::shared_ptr<vectorized::Channel>>& channels, - size_t num_channels, - std::vector<std::vector<uint32_t>>& channel2rows, - vectorized::Block* block, bool eos); - // Use ExchangeSinkOperatorX to create a sink buffer. // The sink buffer can be shared among multiple ExchangeSinkLocalState instances, // or each ExchangeSinkLocalState can have its own sink buffer. diff --git a/be/src/pipeline/shuffle/writer.cpp b/be/src/pipeline/shuffle/writer.cpp new file mode 100644 index 00000000000..c27fd9a7aeb --- /dev/null +++ b/be/src/pipeline/shuffle/writer.cpp @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "writer.h" + +#include "pipeline/exec/exchange_sink_operator.h" +#include "vec/core/block.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" +template <typename ChannelPtrType> +void Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const { + channel->set_receiver_eof(st); + // Channel will not send RPC to the downstream when eof, so close channel by OK status.
+ static_cast<void>(channel->close(state)); +} + +Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state, + vectorized::Block* block, bool eos) const { + auto rows = block->rows(); + { + SCOPED_TIMER(local_state->split_block_hash_compute_timer()); + RETURN_IF_ERROR(local_state->partitioner()->do_partitioning(state, block)); + } + int64_t old_channel_mem_usage = 0; + for (const auto& channel : local_state->channels) { + old_channel_mem_usage += channel->mem_usage(); + } + { + SCOPED_TIMER(local_state->distribute_rows_into_channels_timer()); + const auto& channel_filed = local_state->partitioner()->get_channel_ids(); + if (channel_filed.len == sizeof(uint32_t)) { + RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, + local_state->channels.size(), + channel_filed.get<uint32_t>(), rows, block, eos)); + } else { + RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, + local_state->channels.size(), + channel_filed.get<int64_t>(), rows, block, eos)); + } + } + int64_t new_channel_mem_usage = 0; + for (const auto& channel : local_state->channels) { + new_channel_mem_usage += channel->mem_usage(); + } + COUNTER_UPDATE(local_state->memory_used_counter(), + new_channel_mem_usage - old_channel_mem_usage); + return Status::OK(); +} + +template <typename ChannelIdType> +Status Writer::_channel_add_rows(RuntimeState* state, + std::vector<std::shared_ptr<vectorized::Channel>>& channels, + size_t partition_count, + const ChannelIdType* __restrict channel_ids, size_t rows, + vectorized::Block* block, bool eos) const { + std::vector<uint32_t> partition_rows_histogram; + auto row_idx = vectorized::PODArray<uint32_t>(rows); + { + partition_rows_histogram.assign(partition_count + 2, 0); + for (size_t i = 0; i < rows; ++i) { + partition_rows_histogram[channel_ids[i] + 1]++; + } + for (size_t i = 1; i <= partition_count + 1; ++i) { + partition_rows_histogram[i] += partition_rows_histogram[i - 1]; + } + for (int32_t i = cast_set<int32_t>(rows) 
- 1; i >= 0; --i) { + row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i; + partition_rows_histogram[channel_ids[i] + 1]--; + } + } +#define HANDLE_CHANNEL_STATUS(state, channel, status) \ + do { \ + if (status.is<ErrorCode::END_OF_FILE>()) { \ + _handle_eof_channel(state, channel, status); \ + } else { \ + RETURN_IF_ERROR(status); \ + } \ + } while (0) + Status status = Status::OK(); + for (size_t i = 0; i < partition_count; ++i) { + uint32_t start = partition_rows_histogram[i + 1]; + uint32_t size = partition_rows_histogram[i + 2] - start; + if (!channels[i]->is_receiver_eof() && size > 0) { + status = channels[i]->add_rows(block, row_idx.data(), start, size, false); + HANDLE_CHANNEL_STATUS(state, channels[i], status); + } + } + if (eos) { + for (int i = 0; i < partition_count; ++i) { + if (!channels[i]->is_receiver_eof()) { + status = channels[i]->add_rows(block, row_idx.data(), 0, 0, true); + HANDLE_CHANNEL_STATUS(state, channels[i], status); + } + } + } + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/shuffle/writer.h b/be/src/pipeline/shuffle/writer.h new file mode 100644 index 00000000000..0eb77212029 --- /dev/null +++ b/be/src/pipeline/shuffle/writer.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/sink/vdata_stream_sender.h" + +namespace doris { +class RuntimeState; +class Status; +namespace vectorized { +class Block; +class Channel; +} // namespace vectorized +namespace pipeline { + +#include "common/compile_check_begin.h" +class ExchangeSinkLocalState; + +class Writer { +public: + Writer() = default; + + Status write(ExchangeSinkLocalState* local_state, RuntimeState* state, vectorized::Block* block, + bool eos) const; + +private: + template <typename ChannelIdType> + Status _channel_add_rows(RuntimeState* state, + std::vector<std::shared_ptr<vectorized::Channel>>& channels, + size_t partition_count, const ChannelIdType* __restrict channel_ids, + size_t rows, vectorized::Block* block, bool eos) const; + + template <typename ChannelPtrType> + void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const; +}; +#include "common/compile_check_end.h" +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index d5492c3be87..53d8b84d09c 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -21,11 +21,8 @@ #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" -namespace doris { +namespace doris::vectorized { #include "common/compile_check_begin.h" - -namespace vectorized { - struct ChannelField { const void* channel_id; const uint32_t len; @@ -48,12 +45,16 @@ public: virtual Status open(RuntimeState* state) = 0; + virtual Status close(RuntimeState* state) = 0; + virtual Status do_partitioning(RuntimeState* state, Block* block) const = 0; virtual ChannelField get_channel_ids() const = 0; virtual Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) = 0; + size_t partition_count() const { return _partition_count; } + protected: const 
size_t _partition_count; }; @@ -74,6 +75,8 @@ public: Status open(RuntimeState* state) override { return VExpr::open(_partition_expr_ctxs, state); } + Status close(RuntimeState* state) override { return Status::OK(); } + Status do_partitioning(RuntimeState* state, Block* block) const override; ChannelField get_channel_ids() const override { return {_hash_vals.data(), sizeof(uint32_t)}; } @@ -108,8 +111,5 @@ struct SpillPartitionChannelIds { return ((l >> 16) | (l << 16)) % r; } }; - -} // namespace vectorized -} // namespace doris - #include "common/compile_check_end.h" +} // namespace doris::vectorized diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp index f7435249c20..92e52af4c67 100644 --- a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp +++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp @@ -24,28 +24,51 @@ #include "vec/core/block.h" #include "vec/exec/skewed_partition_rebalancer.h" +#include "vec/runtime/partitioner.h" namespace doris::vectorized { -template <typename PartitionFunction> -class ScaleWriterPartitioningExchanger { +class ScaleWriterPartitioner final : public PartitionerBase { public: - ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& partition_function, - int partition_count, int task_count, int task_bucket_count, - long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold) - : _channel_size(channel_size), - _partition_function(partition_function), + using HashValType = uint32_t; + ScaleWriterPartitioner(int channel_size, int partition_count, int task_count, + int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : PartitionerBase(partition_count), + _channel_size(channel_size), _partition_rebalancer(partition_count, task_count, task_bucket_count, min_partition_data_processed_rebalance_threshold, 
min_data_processed_rebalance_threshold), _partition_row_counts(partition_count, 0), _partition_writer_ids(partition_count, -1), - _partition_writer_indexes(partition_count, 0) {} + _partition_writer_indexes(partition_count, 0), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold(min_data_processed_rebalance_threshold) { + _crc_partitioner = + std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + _partition_count); + } + + ~ScaleWriterPartitioner() override = default; + + Status init(const std::vector<TExpr>& texprs) override { + return _crc_partitioner->init(texprs); + } + + Status prepare(RuntimeState* state, const RowDescriptor& row_desc) override { + return _crc_partitioner->prepare(state, row_desc); + } + + Status open(RuntimeState* state) override { return _crc_partitioner->open(state); } + + Status close(RuntimeState* state) override { return _crc_partitioner->close(state); } - std::vector<std::vector<uint32_t>> accept(Block* block) { - std::vector<std::vector<uint32_t>> writerAssignments(_channel_size, - std::vector<uint32_t>()); + Status do_partitioning(RuntimeState* state, Block* block) const override { + _hash_vals.resize(block->rows()); for (int partition_id = 0; partition_id < _partition_row_counts.size(); partition_id++) { _partition_row_counts[partition_id] = 0; _partition_writer_ids[partition_id] = -1; @@ -53,40 +76,59 @@ public: _partition_rebalancer.rebalance(); - for (int position = 0; position < block->rows(); position++) { - int partition_id = _partition_function.get_partition(block, position); + RETURN_IF_ERROR(_crc_partitioner->do_partitioning(state, block)); + const auto* crc_values = _crc_partitioner->get_channel_ids().get<uint32_t>(); + for (size_t position = 0; position < block->rows(); position++) { + int partition_id = crc_values[position]; 
_partition_row_counts[partition_id] += 1; // Get writer id for this partition by looking at the scaling state int writer_id = _partition_writer_ids[partition_id]; if (writer_id == -1) { - writer_id = get_next_writer_id(partition_id); + writer_id = _get_next_writer_id(partition_id); _partition_writer_ids[partition_id] = writer_id; } - writerAssignments[writer_id].push_back(position); + _hash_vals[position] = writer_id; } - for (int partition_id = 0; partition_id < _partition_row_counts.size(); partition_id++) { + for (size_t partition_id = 0; partition_id < _partition_row_counts.size(); partition_id++) { _partition_rebalancer.add_partition_row_count(partition_id, _partition_row_counts[partition_id]); } _partition_rebalancer.add_data_processed(block->bytes()); - return writerAssignments; + return Status::OK(); + } + + ChannelField get_channel_ids() const override { + return {_hash_vals.data(), sizeof(HashValType)}; } - int get_next_writer_id(int partition_id) { + Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override { + partitioner.reset(new ScaleWriterPartitioner( + _channel_size, _partition_count, _task_count, _task_bucket_count, + _min_partition_data_processed_rebalance_threshold, + _min_data_processed_rebalance_threshold)); + return Status::OK(); + } + +private: + int _get_next_writer_id(int partition_id) const { return _partition_rebalancer.get_task_id(partition_id, _partition_writer_indexes[partition_id]++); } -private: int _channel_size; - PartitionFunction& _partition_function; - SkewedPartitionRebalancer _partition_rebalancer; - std::vector<int> _partition_row_counts; - std::vector<int> _partition_writer_ids; - std::vector<int> _partition_writer_indexes; + std::unique_ptr<PartitionerBase> _crc_partitioner; + mutable SkewedPartitionRebalancer _partition_rebalancer; + mutable std::vector<int> _partition_row_counts; + mutable std::vector<int> _partition_writer_ids; + mutable std::vector<int> _partition_writer_indexes; + 
mutable std::vector<HashValType> _hash_vals; + const int _task_count; + const int _task_bucket_count; + const long _min_partition_data_processed_rebalance_threshold; + const long _min_data_processed_rebalance_threshold; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/tablet_sink_hash_partitioner.cpp b/be/src/vec/sink/tablet_sink_hash_partitioner.cpp new file mode 100644 index 00000000000..7e854d19add --- /dev/null +++ b/be/src/vec/sink/tablet_sink_hash_partitioner.cpp @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/sink/tablet_sink_hash_partitioner.h" + +#include "pipeline/exec/operator.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" +TabletSinkHashPartitioner::TabletSinkHashPartitioner( + size_t partition_count, int64_t txn_id, const TOlapTableSchemaParam& tablet_sink_schema, + const TOlapTablePartitionParam& tablet_sink_partition, + const TOlapTableLocationParam& tablet_sink_location, const TTupleId& tablet_sink_tuple_id, + pipeline::ExchangeSinkLocalState* local_state) + : PartitionerBase(partition_count), + _txn_id(txn_id), + _tablet_sink_schema(tablet_sink_schema), + _tablet_sink_partition(tablet_sink_partition), + _tablet_sink_location(tablet_sink_location), + _tablet_sink_tuple_id(tablet_sink_tuple_id), + _local_state(local_state) {} + +Status TabletSinkHashPartitioner::init(const std::vector<TExpr>& texprs) { + return Status::OK(); +} + +Status TabletSinkHashPartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) { + return Status::OK(); +} + +Status TabletSinkHashPartitioner::open(RuntimeState* state) { + _schema = std::make_shared<OlapTableSchemaParam>(); + RETURN_IF_ERROR(_schema->init(_tablet_sink_schema)); + _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema, _tablet_sink_partition); + RETURN_IF_ERROR(_vpartition->init()); + auto find_tablet_mode = vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; + _tablet_finder = + std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), find_tablet_mode); + _tablet_sink_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tablet_sink_tuple_id); + _tablet_sink_row_desc = + state->obj_pool()->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); + auto& ctxs = + _local_state->parent()->cast<pipeline::ExchangeSinkOperatorX>().tablet_sink_expr_ctxs(); + _tablet_sink_expr_ctxs.resize(ctxs.size()); + for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(ctxs[i]->clone(state, 
_tablet_sink_expr_ctxs[i])); + } + // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column + // on exchange node rather than on TabletWriter + _block_convertor = + std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc); + _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), state->batch_size()); + _location = state->obj_pool()->add(new OlapTableLocationParam(_tablet_sink_location)); + _row_distribution.init( + {.state = state, + .block_convertor = _block_convertor.get(), + .tablet_finder = _tablet_finder.get(), + .vpartition = _vpartition.get(), + .add_partition_request_timer = _local_state->add_partition_request_timer(), + .txn_id = _txn_id, + .pool = state->obj_pool(), + .location = _location, + .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs, + .schema = _schema, + .caller = (void*)this, + .create_partition_callback = &TabletSinkHashPartitioner::empty_callback_function}); + RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc)); + return Status::OK(); +} + +Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block) const { + _hash_vals.resize(block->rows()); + if (block->empty()) { + return Status::OK(); + } + std::fill(_hash_vals.begin(), _hash_vals.end(), -1); + bool has_filtered_rows = false; + int64_t filtered_rows = 0; + int64_t number_input_rows = _local_state->rows_input_counter()->value(); + std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>(); + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + *block, convert_block, filtered_rows, has_filtered_rows, _row_part_tablet_ids, + number_input_rows)); + if (_row_distribution.batching_rows() > 0) { + SCOPED_TIMER(_local_state->send_new_partition_timer()); + RETURN_IF_ERROR(_send_new_partition_batch(state, block)); + } else { + const auto& row_ids = _row_part_tablet_ids[0].row_ids; + const auto& tablet_ids = 
_row_part_tablet_ids[0].tablet_ids; + for (int idx = 0; idx < row_ids.size(); ++idx) { + const auto& row = row_ids[idx]; + const auto& tablet_id_hash = + HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType), 0); + _hash_vals[row] = tablet_id_hash % _partition_count; + } + } + + return Status::OK(); +} + +ChannelField TabletSinkHashPartitioner::get_channel_ids() const { + return {_hash_vals.data(), sizeof(HashValType)}; +} + +Status TabletSinkHashPartitioner::clone(RuntimeState* state, + std::unique_ptr<PartitionerBase>& partitioner) { + partitioner.reset(new TabletSinkHashPartitioner(_partition_count, _txn_id, _tablet_sink_schema, + _tablet_sink_partition, _tablet_sink_location, + _tablet_sink_tuple_id, _local_state)); + return Status::OK(); +} + +Status TabletSinkHashPartitioner::close(RuntimeState* state) { + if (_block_convertor != nullptr && _tablet_finder != nullptr) { + state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + + _tablet_finder->num_filtered_rows()); + state->update_num_rows_load_unselected( + _tablet_finder->num_immutable_partition_filtered_rows()); + // sink won't see those filtered rows, we should compensate here + state->set_num_rows_load_total(state->num_rows_load_filtered() + + state->num_rows_load_unselected()); + } + return Status::OK(); +} + +Status TabletSinkHashPartitioner::_send_new_partition_batch(RuntimeState* state, + vectorized::Block* input_block) const { + RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); + auto& p = _local_state->parent()->cast<pipeline::ExchangeSinkOperatorX>(); + // Recovery back + _row_distribution.clear_batching_stats(); + _row_distribution._batching_block->clear_column_data(); + _row_distribution._deal_batched = false; + RETURN_IF_ERROR(p.sink(state, input_block, false)); + return Status::OK(); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/tablet_sink_hash_partitioner.h 
b/be/src/vec/sink/tablet_sink_hash_partitioner.h new file mode 100644 index 00000000000..dbbc7c88926 --- /dev/null +++ b/be/src/vec/sink/tablet_sink_hash_partitioner.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/tablet_info.h" +#include "pipeline/exec/exchange_sink_operator.h" +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/runtime/partitioner.h" +#include "vec/sink/vrow_distribution.h" +#include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" +class TabletSinkHashPartitioner final : public PartitionerBase { +public: + using HashValType = int64_t; + TabletSinkHashPartitioner(size_t partition_count, int64_t txn_id, + const TOlapTableSchemaParam& tablet_sink_schema, + const TOlapTablePartitionParam& tablet_sink_partition, + const TOlapTableLocationParam& tablet_sink_location, + const TTupleId& tablet_sink_tuple_id, + pipeline::ExchangeSinkLocalState* local_state); + + ~TabletSinkHashPartitioner() override = default; + + Status init(const std::vector<TExpr>& texprs) override; + + Status prepare(RuntimeState* state, const RowDescriptor& 
row_desc) override; + + Status open(RuntimeState* state) override; + + Status do_partitioning(RuntimeState* state, Block* block) const override; + + ChannelField get_channel_ids() const override; + Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override; + + Status close(RuntimeState* state) override; + +private: + static Status empty_callback_function(void* sender, TCreatePartitionResult* result) { + return Status::OK(); + } + + Status _send_new_partition_batch(RuntimeState* state, vectorized::Block* input_block) const; + + const int64_t _txn_id = -1; + const TOlapTableSchemaParam _tablet_sink_schema; + const TOlapTablePartitionParam _tablet_sink_partition; + const TOlapTableLocationParam _tablet_sink_location; + const TTupleId _tablet_sink_tuple_id; + mutable pipeline::ExchangeSinkLocalState* _local_state; + mutable OlapTableLocationParam* _location = nullptr; + mutable vectorized::VRowDistribution _row_distribution; + mutable vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; + mutable std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr; + mutable std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr; + mutable std::shared_ptr<OlapTableSchemaParam> _schema = nullptr; + mutable std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor = nullptr; + mutable TupleDescriptor* _tablet_sink_tuple_desc = nullptr; + mutable RowDescriptor* _tablet_sink_row_desc = nullptr; + mutable std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids; + mutable std::vector<HashValType> _hash_vals; +}; +#include "common/compile_check_end.h" + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 66aacc59f6c..abed6133458 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -239,18 +239,18 @@ 
BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {} Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers, - bool* serialized, bool eos, - const std::vector<uint32_t>* rows) { + bool* serialized, bool eos, const uint32_t* data, + const uint32_t offset, const uint32_t size) { if (_mutable_block == nullptr) { _mutable_block = MutableBlock::create_unique(block->clone_empty()); } { SCOPED_TIMER(_parent->merge_block_timer()); - if (rows) { - if (!rows->empty()) { - const auto* begin = rows->data(); - RETURN_IF_ERROR(_mutable_block->add_rows(block, begin, begin + rows->size())); + if (data) { + if (size > 0) { + RETURN_IF_ERROR( + _mutable_block->add_rows(block, data + offset, data + offset + size)); } } else if (!block->empty()) { RETURN_IF_ERROR(_mutable_block->merge(*block)); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 16ea49e443c..0ff1f252d54 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -80,7 +80,8 @@ public: BlockSerializer() : _batch_size(0) {}; #endif Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized, - bool eos, const std::vector<uint32_t>* rows = nullptr); + bool eos, const uint32_t* data = nullptr, + const uint32_t offset = 0, const uint32_t size = 0); Status serialize_block(PBlock* dest, size_t num_receivers = 1); Status serialize_block(const Block* src, PBlock* dest, size_t num_receivers = 1); @@ -150,7 +151,8 @@ public: Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false); Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false); - Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) { + Status add_rows(Block* block, const uint32_t* data, const uint32_t offset, const uint32_t 
size, + bool eos) { if (_fragment_instance_id.lo == -1) { return Status::OK(); } @@ -160,7 +162,7 @@ public: _pblock = std::make_unique<PBlock>(); } RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, - &rows)); + data, offset, size)); if (serialized) { RETURN_IF_ERROR(_send_current_block(eos)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org