github-actions[bot] commented on code in PR #32364: URL: https://github.com/apache/doris/pull/32364#discussion_r1527752829
########## be/src/pipeline/exec/exchange_sink_operator.h: ########## @@ -68,6 +69,21 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); using Base = PipelineXSinkLocalState<>; +private: + class HashPartitionFunction { + public: + HashPartitionFunction(vectorized::PartitionerBase* partitioner) + : _partitioner(partitioner) {} + + int get_partition(vectorized::Block* block, int position) { + uint32_t* partition_ids = (uint32_t*)_partitioner->get_channel_ids(); Review Comment: warning: use auto when initializing with a cast to avoid duplicating the type name [modernize-use-auto] ```suggestion auto* partition_ids = (uint32_t*)_partitioner->get_channel_ids(); ``` ########## be/src/pipeline/exec/hive_table_sink_operator.cpp: ########## @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "hive_table_sink_operator.h" + +#include "common/status.h" + +namespace doris::pipeline { + +OperatorPtr HiveTableSinkOperatorBuilder::build_operator() { + return std::make_shared<HiveTableSinkOperator>(this, _sink); +} + +Status HiveTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { Review Comment: warning: method 'init' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/exec/hive_table_sink_operator.h:54: ```diff - Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + static Status init(RuntimeState* state, LocalSinkStateInfo& info) override; ```
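The two auto-related checks in this review (modernize-use-auto at the top of the thread, readability-qualified-auto further down) follow the same rule of thumb: let `auto` absorb a type name the initializer already states, and spell the `const`/`*` qualifiers explicitly. A minimal self-contained sketch with hypothetical names (`storage`, `ids`, `values`), not the Doris types:

```cpp
#include <cstdint>
#include <cstdlib>
#include <vector>

int main() {
    void* storage = std::malloc(4 * sizeof(std::uint32_t));

    // modernize-use-auto: the cast already names the type once, so
    // spelling uint32_t* on the left as well only duplicates it.
    auto* ids = (std::uint32_t*)storage;
    ids[0] = 42;

    // readability-qualified-auto: when the range-for only reads,
    // say so with const auto&.
    std::vector<int> values = {1, 2, 3};
    long sum = 0;
    for (const auto& v : values) {
        sum += v;
    }

    std::free(storage);
    return (sum == 6 && ids[0] == 42) ? 0 : 1;
}
```

########## be/src/pipeline/exec/hive_table_sink_operator.cpp: ########## @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 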
+ +#include "hive_table_sink_operator.h" + +#include "common/status.h" + +namespace doris::pipeline { + +OperatorPtr HiveTableSinkOperatorBuilder::build_operator() { + return std::make_shared<HiveTableSinkOperator>(this, _sink); +} + +Status HiveTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + auto& p = _parent->cast<Parent>(); + RETURN_IF_ERROR(_writer->init_properties(p._pool)); + return Status::OK(); +} + +Status HiveTableSinkLocalState::close(RuntimeState* state, Status exec_status) { Review Comment: warning: method 'close' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/exec/hive_table_sink_operator.h:61: ```diff - Status close(RuntimeState* state, Status exec_status) override; + static Status close(RuntimeState* state, Status exec_status) override; ``` ########## be/src/pipeline/exec/hive_table_sink_operator.h: ########## @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/sink/vhive_table_sink.h" + +namespace doris { + +namespace pipeline { + +class HiveTableSinkOperatorBuilder final + : public DataSinkOperatorBuilder<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "HiveTableSinkOperator", sink) {} + + OperatorPtr build_operator() override; +}; + +class HiveTableSinkOperator final : public DataSinkOperator<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {} + + bool can_write() override { return _sink->can_write(); } +}; + +class HiveTableSinkOperatorX; + +class HiveTableSinkLocalState final + : public AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX> { +public: + using Base = AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX>; + using Parent = HiveTableSinkOperatorX; + ENABLE_FACTORY_CREATOR(HiveTableSinkLocalState); + HiveTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { Review Comment: warning: method 'open' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status open(RuntimeState* state) override { ```
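A note on this whole readability-convert-member-functions-to-static family (`init`, `open`, `close`, `prepare`, `sink` in this review): the fix-its cannot be applied verbatim, because each of these methods overrides a virtual, and a `static` member function can be neither virtual nor marked `override`; the bodies also touch members such as `_open_timer`. A minimal sketch of the conflict, with hypothetical `Base`/`Sink` types rather than the Doris classes:

```cpp
struct Base {
    virtual ~Base() = default;
    virtual int open(int state) { return state; }
};

struct Sink : Base {
    // Fine: an ordinary override that also reads a member below.
    int open(int state) override { return state + _opened++; }

    // The literal suggestion would be ill-formed on two counts:
    // 'static' member functions cannot be virtual/override, and they
    // could not access the _opened member anyway.
    // static int open(int state) override; // does not compile

    int _opened = 0;
};

int main() {
    Sink s;
    Base* b = &s;
    return b->open(0); // dynamic dispatch to Sink::open
}
```

########## be/src/pipeline/exec/hive_table_sink_operator.h: ########## @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 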
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/sink/vhive_table_sink.h" + +namespace doris { + +namespace pipeline { Review Comment: warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces] ```suggestion namespace doris::pipeline { ``` be/src/pipeline/exec/hive_table_sink_operator.h:113: ```diff - } // namespace pipeline - } // namespace doris + } // namespace doris ```
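For reference, the C++17 form behind modernize-concat-nested-namespaces. A concatenated namespace closes with a single brace, which is why the fix-it above also collapses the two closing lines at hive_table_sink_operator.h:113 into one. A standalone sketch with hypothetical `demo` namespaces:

```cpp
// Pre-C++17 spelling: two declarations, two closing braces.
namespace demo_old {
namespace pipeline {
int answer() { return 42; }
} // namespace pipeline
} // namespace demo_old

// C++17 concatenated spelling: one declaration, one closing brace.
namespace demo_new::pipeline {
int answer() { return 42; }
} // namespace demo_new::pipeline

int main() {
    return demo_old::pipeline::answer() == demo_new::pipeline::answer() ? 0 : 1;
}
```

########## be/src/pipeline/exec/hive_table_sink_operator.h: ########## @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 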
+ +#pragma once + +#include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/sink/vhive_table_sink.h" + +namespace doris { + +namespace pipeline { + +class HiveTableSinkOperatorBuilder final + : public DataSinkOperatorBuilder<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "HiveTableSinkOperator", sink) {} + + OperatorPtr build_operator() override; +}; + +class HiveTableSinkOperator final : public DataSinkOperator<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {} + + bool can_write() override { return _sink->can_write(); } +}; + +class HiveTableSinkOperatorX; + +class HiveTableSinkLocalState final + : public AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX> { +public: + using Base = AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX>; + using Parent = HiveTableSinkOperatorX; + ENABLE_FACTORY_CREATOR(HiveTableSinkLocalState); + HiveTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + + Status close(RuntimeState* state, Status exec_status) override; + friend class HiveTableSinkOperatorX; + +private: + Status _close_status = Status::OK(); +}; + +class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocalState> { +public: + using Base = DataSinkOperatorX<HiveTableSinkLocalState>; + HiveTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, + const std::vector<TExpr>& t_output_expr) + : Base(operator_id, 0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _pool(pool) {}; + + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { Review Comment: warning: method 'prepare' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status prepare(RuntimeState* state) override { ``` ########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); Review Comment: warning: std::move of the variable 'task_bucket' of the trivially-copyable type 'doris::vectorized::SkewedPartitionRebalancer::TaskBucket' has no effect; remove std::move() [performance-move-const-arg] ```suggestion _partition_assignments[partition].emplace_back(task_bucket); ```
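The reasoning behind this warning: `TaskBucket` holds only two `int`s and is trivially copyable, so `std::move` degenerates to a copy and merely suggests an ownership transfer that never happens. A runnable sketch with a hypothetical `Bucket` standing in for `TaskBucket`:

```cpp
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for TaskBucket: two ints, trivially copyable.
struct Bucket {
    int task_id;
    int id;
};
static_assert(std::is_trivially_copyable_v<Bucket>);

int main() {
    std::vector<Bucket> buckets;
    Bucket b {0, 0};

    // Flagged form: for a trivially copyable type, std::move still
    // performs a copy; the cast buys nothing.
    buckets.emplace_back(std::move(b));

    // Equivalent, and what the fix-it proposes:
    buckets.emplace_back(b);

    return buckets.size() == 2 ? 0 : 1;
}
```

########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 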
+// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { + return _task_count; +} + +int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) { + const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; + + int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size(); + + return task_ids[task_id_index].task_id; +} + +void SkewedPartitionRebalancer::add_data_processed(long data_size) { + _data_processed += data_size; +} + +void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { + _partition_row_count[partition] += row_count; +} + +void SkewedPartitionRebalancer::rebalance() { + long current_data_processed = _data_processed; + if (_should_rebalance(current_data_processed)) { + _rebalance_partitions(current_data_processed); + } +} + +void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { + long total_partition_row_count = 0; + for (int partition = 0; partition < _partition_count; partition++) { + total_partition_row_count += _partition_row_count[partition]; + } + + for (int partition = 0; partition < _partition_count; partition++) { + _partition_data_size[partition] = std::max( + (_partition_row_count[partition] * data_processed) / total_partition_row_count, + _partition_data_size[partition]); + } +} + +long 
SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions) { + long estimated_data_size_since_last_rebalance = 0; + for (auto& elem : max_partitions) { Review Comment: warning: 'auto &elem' can be declared as 'const auto &elem' [readability-qualified-auto] ```suggestion for (const auto& elem : max_partitions) { ``` ########## be/src/pipeline/exec/hive_table_sink_operator.h: ########## @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/sink/vhive_table_sink.h" + +namespace doris { + +namespace pipeline { + +class HiveTableSinkOperatorBuilder final + : public DataSinkOperatorBuilder<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "HiveTableSinkOperator", sink) {} + + OperatorPtr build_operator() override; +}; + +class HiveTableSinkOperator final : public DataSinkOperator<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {} + + bool can_write() override { return _sink->can_write(); } +}; + +class HiveTableSinkOperatorX; + +class HiveTableSinkLocalState final + : public AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX> { +public: + using Base = AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX>; + using Parent = HiveTableSinkOperatorX; + ENABLE_FACTORY_CREATOR(HiveTableSinkLocalState); + HiveTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + + Status close(RuntimeState* state, Status exec_status) override; + friend class HiveTableSinkOperatorX; + +private: + Status _close_status = Status::OK(); +}; + +class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocalState> { +public: + using Base = DataSinkOperatorX<HiveTableSinkLocalState>; + HiveTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, + const std::vector<TExpr>& t_output_expr) + : Base(operator_id, 0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _pool(pool) {}; + + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. 
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + return vectorized::VExpr::open(_output_vexpr_ctxs, state); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { Review Comment: warning: method 'sink' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { ``` ########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + 
std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { + return _task_count; +} + +int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) { + const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; + + int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size(); + + return task_ids[task_id_index].task_id; +} + +void SkewedPartitionRebalancer::add_data_processed(long data_size) { + _data_processed += data_size; +} + +void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { + _partition_row_count[partition] += row_count; +} + +void SkewedPartitionRebalancer::rebalance() { + long current_data_processed = _data_processed; + if (_should_rebalance(current_data_processed)) { + _rebalance_partitions(current_data_processed); + } +} + +void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { + long total_partition_row_count = 0; + for (int partition = 0; partition < _partition_count; partition++) { + total_partition_row_count += _partition_row_count[partition]; + } + + for (int partition = 0; partition < _partition_count; partition++) { + _partition_data_size[partition] = std::max( + (_partition_row_count[partition] * data_processed) / total_partition_row_count, + _partition_data_size[partition]); + } +} + +long SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions) { + long estimated_data_size_since_last_rebalance = 0; + for (auto& elem : max_partitions) { + estimated_data_size_since_last_rebalance += + _partition_data_size_since_last_rebalance_per_task[elem]; + } + return estimated_data_size_since_last_rebalance; +} + +void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions) { + std::vector<int> scaled_partitions; + while (true) { + std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll(); + if (!max_task_bucket.has_value()) { + break; + } + + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions = task_bucket_max_partitions[max_task_bucket->id]; + if (max_partitions.is_empty()) { + continue; + } + + std::vector<TaskBucket> min_skewed_task_buckets = + _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); + if (min_skewed_task_buckets.empty()) { + break; + } + + while (true) { + std::optional<int> max_partition = max_partitions.poll(); + if (!max_partition.has_value()) { + break; + } + int max_partition_value = max_partition.value(); + + if (std::find(scaled_partitions.begin(), scaled_partitions.end(), + max_partition_value) != scaled_partitions.end()) { + continue; + } + + int total_assigned_tasks = _partition_assignments[max_partition_value].size(); + if (_partition_data_size[max_partition_value] >= + (_min_partition_data_processed_rebalance_threshold * 
total_assigned_tasks)) { + for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { + if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + break; + } + } + } else { + break; + } + } + } +} + +std::vector<SkewedPartitionRebalancer::TaskBucket> +SkewedPartitionRebalancer::_find_skewed_min_task_buckets( + const TaskBucket& max_task_bucket, + const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket> min_skewed_task_buckets; + + for (const auto& min_task_bucket : min_task_buckets) { + double skewness = + static_cast<double>( + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] - + _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) / + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id]; + if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) { + break; + } + if (max_task_bucket.task_id != min_task_bucket.task_id) { + min_skewed_task_buckets.push_back(min_task_bucket); + } + } + return min_skewed_task_buckets; +} + +bool SkewedPartitionRebalancer::_rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket>& assignments = _partition_assignments[partition_id]; + if (std::any_of(assignments.begin(), assignments.end(), + [&to_task_bucket](const TaskBucket& task_bucket) { + return task_bucket.task_id == to_task_bucket.task_id; + })) { + return false; + } + + assignments.push_back(to_task_bucket); + + int new_task_count = assignments.size(); + int old_task_count = new_task_count - 1; + for (const TaskBucket& task_bucket : assignments) { + if (task_bucket == to_task_bucket) { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] += + (_partition_data_size_since_last_rebalance_per_task[partition_id] * + old_task_count) / + new_task_count; + } else { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -= + _partition_data_size_since_last_rebalance_per_task[partition_id] / + new_task_count; + } + max_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + min_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + } + + return true; +} + +bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) { + return (data_processed - _data_processed_at_last_rebalance) >= + _min_data_processed_rebalance_threshold; +} + +void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { + if (!_should_rebalance(data_processed)) { + return; + } + + _calculate_partition_data_size(data_processed); + + for (int partition = 0; partition < _partition_count; partition++) { + int total_assigned_tasks = _partition_assignments[partition].size(); + long data_size = _partition_data_size[partition]; + _partition_data_size_since_last_rebalance_per_task[partition] = + (data_size - _partition_data_size_at_last_rebalance[partition]) / + total_assigned_tasks; + _partition_data_size_at_last_rebalance[partition] = data_size; + } + + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>> + 
task_bucket_max_partitions; + + for (int i = 0; i < _task_count * _task_bucket_count; ++i) { + task_bucket_max_partitions.push_back( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>()); + } + + for (int partition = 0; partition < _partition_count; partition++) { + auto& taskAssignments = _partition_assignments[partition]; + for (const auto& taskBucket : taskAssignments) { + auto& queue = task_bucket_max_partitions[taskBucket.id]; + queue.add_or_update(partition, + _partition_data_size_since_last_rebalance_per_task[partition]); + } + } + + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> + max_task_buckets; + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH> + min_task_buckets; + + for (int taskId = 0; taskId < _task_count; taskId++) { + for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) { + TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count); + TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count); + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] = + _calculate_task_bucket_data_size_since_last_rebalance( + task_bucket_max_partitions[task_bucket1.id]); + max_task_buckets.add_or_update( + std::move(task_bucket1), Review Comment: warning: std::move of the variable 'task_bucket1' of the trivially-copyable type 'doris::vectorized::SkewedPartitionRebalancer::TaskBucket' has no effect; remove std::move() [performance-move-const-arg] ```suggestion task_bucket1, ```
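The loop above leans on three operations of `util/indexed_priority_queue.hpp`, whose implementation is not part of this diff: `add_or_update` (insert or re-prioritize a key), `poll` (remove and return the best key as an `std::optional`), and `is_empty`. The sketch below approximates those semantics with a plain map purely for illustration; it is not the Doris implementation, and it fixes high-to-low ordering instead of taking the HIGH_TO_LOW/LOW_TO_HIGH template parameter:

```cpp
#include <iterator>
#include <map>
#include <optional>

// Illustrative approximation of the add_or_update/poll/is_empty trio used
// by the rebalancer; assumes high-to-low priority order.
template <typename K>
class ToyIndexedQueue {
public:
    void add_or_update(const K& key, long priority) {
        _prio[key] = priority; // re-adding a key simply updates its priority
    }

    std::optional<K> poll() { // remove and return the highest-priority key
        if (_prio.empty()) {
            return std::nullopt;
        }
        auto best = _prio.begin();
        for (auto it = std::next(_prio.begin()); it != _prio.end(); ++it) {
            if (it->second > best->second) {
                best = it;
            }
        }
        K key = best->first;
        _prio.erase(best);
        return key;
    }

    bool is_empty() const { return _prio.empty(); }

private:
    std::map<K, long> _prio;
};

int main() {
    ToyIndexedQueue<int> q;
    q.add_or_update(7, 10);
    q.add_or_update(9, 30);
    q.add_or_update(9, 5); // update: key 9 now ranks below key 7
    return q.poll().value() == 7 ? 0 : 1;
}
```

########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 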
+// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { + return _task_count; +} + +int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) { + const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; + + int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size(); + + return task_ids[task_id_index].task_id; +} + +void SkewedPartitionRebalancer::add_data_processed(long data_size) { + _data_processed += data_size; +} + +void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { + _partition_row_count[partition] += row_count; +} + +void SkewedPartitionRebalancer::rebalance() { + long current_data_processed = _data_processed; + if (_should_rebalance(current_data_processed)) { + _rebalance_partitions(current_data_processed); + } +} + +void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { + long total_partition_row_count = 0; + for (int partition = 0; partition < _partition_count; partition++) { + total_partition_row_count += _partition_row_count[partition]; + } + + for (int partition = 0; partition < _partition_count; partition++) { + _partition_data_size[partition] = std::max( + (_partition_row_count[partition] * data_processed) / total_partition_row_count, + _partition_data_size[partition]); + } +} + +long 
SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions) { + long estimated_data_size_since_last_rebalance = 0; + for (auto& elem : max_partitions) { + estimated_data_size_since_last_rebalance += + _partition_data_size_since_last_rebalance_per_task[elem]; + } + return estimated_data_size_since_last_rebalance; +} + +void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions) { + std::vector<int> scaled_partitions; + while (true) { + std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll(); + if (!max_task_bucket.has_value()) { + break; + } + + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions = task_bucket_max_partitions[max_task_bucket->id]; + if (max_partitions.is_empty()) { + continue; + } + + std::vector<TaskBucket> min_skewed_task_buckets = + _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); + if (min_skewed_task_buckets.empty()) { + break; + } + + while (true) { + std::optional<int> max_partition = max_partitions.poll(); + if (!max_partition.has_value()) { + break; + } + int max_partition_value = max_partition.value(); + + if (std::find(scaled_partitions.begin(), scaled_partitions.end(), + max_partition_value) != scaled_partitions.end()) { + continue; + } + + int total_assigned_tasks = _partition_assignments[max_partition_value].size(); + if (_partition_data_size[max_partition_value] >= + (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { + for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { + if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + break; + } + } + } else { + break; + } + } + } +} + +std::vector<SkewedPartitionRebalancer::TaskBucket> +SkewedPartitionRebalancer::_find_skewed_min_task_buckets( + const TaskBucket& max_task_bucket, + const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket> min_skewed_task_buckets; + + for (const auto& min_task_bucket : min_task_buckets) { + double skewness = + static_cast<double>( + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] - + _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) / + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id]; + if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) { + break; + } + if (max_task_bucket.task_id != min_task_bucket.task_id) { + min_skewed_task_buckets.push_back(min_task_bucket); + } + } + return min_skewed_task_buckets; +} + +bool SkewedPartitionRebalancer::_rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket>& assignments = _partition_assignments[partition_id]; + if 
(std::any_of(assignments.begin(), assignments.end(), + [&to_task_bucket](const TaskBucket& task_bucket) { + return task_bucket.task_id == to_task_bucket.task_id; + })) { + return false; + } + + assignments.push_back(to_task_bucket); + + int new_task_count = assignments.size(); + int old_task_count = new_task_count - 1; + for (const TaskBucket& task_bucket : assignments) { + if (task_bucket == to_task_bucket) { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] += + (_partition_data_size_since_last_rebalance_per_task[partition_id] * + old_task_count) / + new_task_count; + } else { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -= + _partition_data_size_since_last_rebalance_per_task[partition_id] / + new_task_count; + } + max_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + min_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + } + + return true; +} + +bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) { Review Comment: warning: method '_should_rebalance' can be made const [readability-make-member-function-const] ```suggestion bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) const { ``` be/src/vec/exec/skewed_partition_rebalancer.h:110: ```diff - bool _should_rebalance(long data_processed); + bool _should_rebalance(long data_processed) const; ``` ########## be/src/vec/sink/vhive_table_sink.cpp: ########## @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vhive_table_sink.h" + +namespace doris { +class TExpr; + +namespace vectorized { + +VHiveTableSink::VHiveTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector<TExpr>& texprs) + : AsyncWriterSink<VHiveTableWriter, VHIVE_TABLE_SINK>(row_desc, texprs), _pool(pool) {} + +VHiveTableSink::~VHiveTableSink() = default; + +Status VHiveTableSink::init(const TDataSink& t_sink) { + RETURN_IF_ERROR(AsyncWriterSink::init(t_sink)); + RETURN_IF_ERROR(_writer->init_properties(_pool)); + return Status::OK(); +} + +Status VHiveTableSink::close(RuntimeState* state, Status exec_status) { Review Comment: warning: method 'close' can be made static [readability-convert-member-functions-to-static] be/src/vec/sink/vhive_table_sink.h:42: ```diff - Status close(RuntimeState* state, Status exec_status) override; + static Status close(RuntimeState* state, Status exec_status) override; ```
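The point of the readability-make-member-function-const finding above, and why its fix-it patches both the .cpp and the header: `_should_rebalance` only reads members, and const qualification is part of the function type, so declaration and definition must change together. A small sketch with a hypothetical `Meter` class:

```cpp
class Meter {
public:
    void add(long n) { _total += n; }

    // Reads members without modifying them, so it can be const-qualified;
    // the qualifier must appear on declaration and definition alike.
    bool over_threshold(long limit) const { return _total >= limit; }

private:
    long _total = 0;
};

// Only compiles because over_threshold is const: a const reference
// may call const member functions only.
bool check(const Meter& m) {
    return m.over_threshold(50);
}

int main() {
    Meter m;
    m.add(100);
    return check(m) ? 0 : 1;
}
```

########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 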
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { + return _task_count; +} + +int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) { + const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; + + int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size(); + + return task_ids[task_id_index].task_id; +} + +void SkewedPartitionRebalancer::add_data_processed(long data_size) { + _data_processed += data_size; +} + +void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { + _partition_row_count[partition] += row_count; +} + +void SkewedPartitionRebalancer::rebalance() { + 
long current_data_processed = _data_processed; + if (_should_rebalance(current_data_processed)) { + _rebalance_partitions(current_data_processed); + } +} + +void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { + long total_partition_row_count = 0; + for (int partition = 0; partition < _partition_count; partition++) { + total_partition_row_count += _partition_row_count[partition]; + } + + for (int partition = 0; partition < _partition_count; partition++) { + _partition_data_size[partition] = std::max( + (_partition_row_count[partition] * data_processed) / total_partition_row_count, + _partition_data_size[partition]); + } +} + +long SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions) { + long estimated_data_size_since_last_rebalance = 0; + for (auto& elem : max_partitions) { + estimated_data_size_since_last_rebalance += + _partition_data_size_since_last_rebalance_per_task[elem]; + } + return estimated_data_size_since_last_rebalance; +} + +void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions) { + std::vector<int> scaled_partitions; + while (true) { + std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll(); + if (!max_task_bucket.has_value()) { + break; + } + + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions = task_bucket_max_partitions[max_task_bucket->id]; + if (max_partitions.is_empty()) { + continue; + } + + std::vector<TaskBucket> min_skewed_task_buckets = + _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); + if (min_skewed_task_buckets.empty()) { + break; + } + + while (true) { + std::optional<int> max_partition = max_partitions.poll(); + if (!max_partition.has_value()) { + break; + } + int max_partition_value = max_partition.value(); + + if (std::find(scaled_partitions.begin(), scaled_partitions.end(), + max_partition_value) != scaled_partitions.end()) { + continue; + } + + int total_assigned_tasks = _partition_assignments[max_partition_value].size(); + if (_partition_data_size[max_partition_value] >= + (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { + for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { + if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + break; + } + } + } else { + break; + } + } + } +} + +std::vector<SkewedPartitionRebalancer::TaskBucket> +SkewedPartitionRebalancer::_find_skewed_min_task_buckets( + const TaskBucket& max_task_bucket, + const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket> min_skewed_task_buckets; + + for (const auto& min_task_bucket : min_task_buckets) { + double skewness = + static_cast<double>( + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] - + _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) / + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id]; + 
if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) { + break; + } + if (max_task_bucket.task_id != min_task_bucket.task_id) { + min_skewed_task_buckets.push_back(min_task_bucket); + } + } + return min_skewed_task_buckets; +} + +bool SkewedPartitionRebalancer::_rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket>& assignments = _partition_assignments[partition_id]; + if (std::any_of(assignments.begin(), assignments.end(), + [&to_task_bucket](const TaskBucket& task_bucket) { + return task_bucket.task_id == to_task_bucket.task_id; + })) { + return false; + } + + assignments.push_back(to_task_bucket); + + int new_task_count = assignments.size(); + int old_task_count = new_task_count - 1; + for (const TaskBucket& task_bucket : assignments) { + if (task_bucket == to_task_bucket) { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] += + (_partition_data_size_since_last_rebalance_per_task[partition_id] * + old_task_count) / + new_task_count; + } else { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -= + _partition_data_size_since_last_rebalance_per_task[partition_id] / + new_task_count; + } + max_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + min_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + } + + return true; +} + +bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) { + return (data_processed - _data_processed_at_last_rebalance) >= + _min_data_processed_rebalance_threshold; +} + +void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { + if (!_should_rebalance(data_processed)) { + return; + } + + _calculate_partition_data_size(data_processed); + + for (int partition = 0; partition < _partition_count; partition++) { + int total_assigned_tasks = _partition_assignments[partition].size(); + long data_size = _partition_data_size[partition]; + _partition_data_size_since_last_rebalance_per_task[partition] = + (data_size - _partition_data_size_at_last_rebalance[partition]) / + total_assigned_tasks; + _partition_data_size_at_last_rebalance[partition] = data_size; + } + + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>> + task_bucket_max_partitions; + + for (int i = 0; i < _task_count * _task_bucket_count; ++i) { + task_bucket_max_partitions.push_back( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>()); + } + + for (int partition = 0; partition < _partition_count; partition++) { + auto& taskAssignments = _partition_assignments[partition]; + for (const auto& taskBucket : taskAssignments) { + auto& queue = task_bucket_max_partitions[taskBucket.id]; + queue.add_or_update(partition, + _partition_data_size_since_last_rebalance_per_task[partition]); + } + } + + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> + max_task_buckets; + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH> + min_task_buckets; + + for (int taskId = 0; taskId < _task_count; taskId++) { + for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) { + TaskBucket task_bucket1(taskId, bucketId, 
_task_bucket_count); + TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count); + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] = + _calculate_task_bucket_data_size_since_last_rebalance( + task_bucket_max_partitions[task_bucket1.id]); + max_task_buckets.add_or_update( + std::move(task_bucket1), + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]); Review Comment: warning: 'task_bucket1' used after it was moved [bugprone-use-after-move] ```cpp _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]); ^ ``` <details> <summary>Additional context</summary> **be/src/vec/exec/skewed_partition_rebalancer.cpp:289:** move occurred here ```cpp std::move(task_bucket1), ^ ``` **be/src/vec/exec/skewed_partition_rebalancer.cpp:290:** the use and move are unsequenced, i.e. there is no guarantee about the order in which they are evaluated ```cpp _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]); ^ ``` </details>
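Why clang-tidy objects here: the two argument expressions of `add_or_update` are unsequenced, so `task_bucket1.id` may be evaluated before or after `std::move(task_bucket1)`. Because `TaskBucket` is trivially copyable the move is really a copy and the code happens to behave, but dropping `std::move`, as the performance-move-const-arg finding earlier suggests, resolves both warnings at once. With a genuinely movable type the same shape is a real bug, as in this hypothetical sketch:

```cpp
#include <cstddef>
#include <string>
#include <utility>

void sink(std::string payload, std::size_t len) {
    (void)payload;
    (void)len;
}

int main() {
    std::string s = "payload";

    // The flagged shape: move and read are unsequenced, so s.size()
    // may run after s was moved from and observe an emptied string.
    // sink(std::move(s), s.size());   // bugprone-use-after-move

    // Safe ordering: read first, then move.
    std::size_t n = s.size();
    sink(std::move(s), n);
    return n == 7 ? 0 : 1;
}
```

########## be/src/vec/exec/skewed_partition_rebalancer.h: ########## @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +/** + * Helps in distributing big or skewed partitions across available tasks to improve the performance of + * partitioned writes. + * <p> + * This rebalancer initialize a bunch of buckets for each task based on a given taskBucketCount and then tries to + * uniformly distribute partitions across those buckets. This helps to mitigate two problems: + * 1. Mitigate skewness across tasks. + * 2. Scale few big partitions across tasks even if there's no skewness among them. This will essentially speed the + * local scaling without impacting much overall resource utilization. 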
+ * <p> + * Example: + * <p> + * Before: 3 tasks, 3 buckets per task, and 2 skewed partitions + * Task1 Task2 Task3 + * Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 + * Bucket2 Bucket2 Bucket2 + * Bucket3 Bucket3 Bucket3 + * <p> + * After rebalancing: + * Task1 Task2 Task3 + * Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 (Part 1) + * Bucket2 (Part 2) Bucket2 (Part 1) Bucket2 (Part 2) + * Bucket3 Bucket3 Bucket3 + */ + +#pragma once + +#include <algorithm> +#include <iostream> +#include <list> +#include <optional> +#include <vector> + +#include "util/indexed_priority_queue.hpp" + +namespace doris::vectorized { + +class SkewedPartitionRebalancer { +private: + struct TaskBucket { + int task_id; + int id; + + TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_) + : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {} + + bool operator==(const TaskBucket& other) const { return id == other.id; } + + bool operator<(const TaskBucket& other) const { return id < other.id; } + + bool operator>(const TaskBucket& other) const { return id > other.id; } + }; + +public: + SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold); + + std::vector<std::list<int>> get_partition_assignments(); + int get_task_count(); + int get_task_id(int partition_id, int64_t index); + void add_data_processed(long data_size); + void add_partition_row_count(int partition, long row_count); + void rebalance(); + +private: + void _calculate_partition_data_size(long data_processed); + long _calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions); + void _rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector< + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions); + std::vector<TaskBucket> _find_skewed_min_task_buckets( + const TaskBucket& max_task_bucket, + const IndexedPriorityQueue<TaskBucket, + IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets); + bool _rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets); + + bool _should_rebalance(long data_processed); + void _rebalance_partitions(long data_processed); + +private: Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` <details> <summary>Additional context</summary> **be/src/vec/exec/skewed_partition_rebalancer.h:85:** previously declared here ```cpp private: ^ ``` </details> ########## be/src/vec/sink/writer/vhive_partition_writer.cpp: ########## @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
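On the readability-redundant-access-specifiers note above, a minimal illustration: the second `private:` repeats the accessibility already in effect, so it can be dropped without changing the meaning of the class.

```cpp
// Sketch only; member names are illustrative.
class Rebalancer {
private:
    int _task_count = 0;

private: // redundant: members below were already private
    int _partition_count = 0;
};
```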
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_partition_writer.h" + +#include "io/file_factory.h" +#include "io/fs/file_system.h" +#include "runtime/runtime_state.h" +#include "vec/core/materialize_block.h" +#include "vec/runtime/vorc_transformer.h" +#include "vec/runtime/vparquet_transformer.h" + +namespace doris { +namespace vectorized { + +VHivePartitionWriter::VHivePartitionWriter( + const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode, + const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns, + WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type, + TFileCompressType::type hive_compress_type, + const std::map<std::string, std::string>& hadoop_conf) + : _partition_name(std::move(partition_name)), + _update_mode(update_mode), + _vec_output_expr_ctxs(output_expr_ctxs), + _columns(columns), + _write_info(std::move(write_info)), + _file_name(std::move(file_name)), + _file_format_type(file_format_type), + _hive_compress_type(hive_compress_type), + _hadoop_conf(hadoop_conf) + +{} + +Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) { Review Comment: warning: function 'open' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) { ^ ``` <details> <summary>Additional context</summary> **be/src/vec/sink/writer/vhive_partition_writer.cpp:47:** 88 lines including whitespace and comments (threshold 80) ```cpp Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) { ^ ``` </details> ########## be/src/pipeline/exec/hive_table_sink_operator.h: ########## @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
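On the readability-function-size warning against `VHivePartitionWriter::open()` above: one conventional way to get back under the threshold is to factor each format's compression mapping into its own helper. A hypothetical sketch (the helper name and out-parameter signature are assumptions, not part of the patch; the cases and error message are taken from the quoted code):

```cpp
// Hypothetical helper extracted from VHivePartitionWriter::open() to reduce
// its line count; returns the mapped parquet compression type via *result.
Status parquet_compression_from_hive(TFileCompressType::type hive_compress_type,
                                     TParquetCompressionType::type* result) {
    switch (hive_compress_type) {
    case TFileCompressType::PLAIN:
        *result = TParquetCompressionType::UNCOMPRESSED;
        return Status::OK();
    case TFileCompressType::SNAPPYBLOCK:
        *result = TParquetCompressionType::SNAPPY;
        return Status::OK();
    case TFileCompressType::ZSTD:
        *result = TParquetCompressionType::ZSTD;
        return Status::OK();
    default:
        return Status::InternalError("Unsupported hive compress type {} with parquet",
                                     to_string(hive_compress_type));
    }
}
```

An analogous helper for the ORC `orc::CompressionKind` mapping would shrink `open()` further, leaving it as little more than the two transformer constructions.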
+ +#pragma once + +#include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/sink/vhive_table_sink.h" + +namespace doris { + +namespace pipeline { + +class HiveTableSinkOperatorBuilder final + : public DataSinkOperatorBuilder<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "HiveTableSinkOperator", sink) {} + + OperatorPtr build_operator() override; +}; + +class HiveTableSinkOperator final : public DataSinkOperator<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {} + + bool can_write() override { return _sink->can_write(); } +}; + +class HiveTableSinkOperatorX; + +class HiveTableSinkLocalState final + : public AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX> { +public: + using Base = AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX>; + using Parent = HiveTableSinkOperatorX; + ENABLE_FACTORY_CREATOR(HiveTableSinkLocalState); + HiveTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + + Status close(RuntimeState* state, Status exec_status) override; + friend class HiveTableSinkOperatorX; + +private: + Status _close_status = Status::OK(); +}; + +class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocalState> { +public: + using Base = DataSinkOperatorX<HiveTableSinkLocalState>; + HiveTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, + const std::vector<TExpr>& t_output_expr) + : Base(operator_id, 0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _pool(pool) {}; + + Status init(const TDataSink& thrift_sink) override { Review Comment: warning: method 'init' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status init(const TDataSink& thrift_sink) override { ``` ########## be/src/pipeline/exec/hive_table_sink_operator.h: ########## @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
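A caveat on the readability-convert-member-functions-to-static suggestions in this file (and the similar ones elsewhere in this thread): the fix-it text cannot be applied verbatim to a virtual override, because a static member function cannot be virtual and `override` requires a virtual function. A minimal sketch:

```cpp
#include <iostream>

struct Base {
    virtual ~Base() = default;
    virtual int open() = 0;
};

struct Derived : Base {
    // static int open() override; // error: 'static' member function cannot be 'virtual'
    int open() override { return 0; } // must remain a non-static member
};

int main() {
    Derived d;
    std::cout << d.open() << '\n';
}
```

When this check fires on an override, the usual options are to leave the method as-is or suppress the diagnostic (for example with `NOLINT`), rather than take the suggested edit.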
+ +#pragma once + +#include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/sink/vhive_table_sink.h" + +namespace doris { + +namespace pipeline { + +class HiveTableSinkOperatorBuilder final + : public DataSinkOperatorBuilder<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "HiveTableSinkOperator", sink) {} + + OperatorPtr build_operator() override; +}; + +class HiveTableSinkOperator final : public DataSinkOperator<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {} + + bool can_write() override { return _sink->can_write(); } +}; + +class HiveTableSinkOperatorX; + +class HiveTableSinkLocalState final + : public AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX> { +public: + using Base = AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX>; + using Parent = HiveTableSinkOperatorX; + ENABLE_FACTORY_CREATOR(HiveTableSinkLocalState); + HiveTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + + Status close(RuntimeState* state, Status exec_status) override; + friend class HiveTableSinkOperatorX; + +private: + Status _close_status = Status::OK(); +}; + +class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocalState> { +public: + using Base = DataSinkOperatorX<HiveTableSinkLocalState>; + HiveTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, + const std::vector<TExpr>& t_output_expr) + : Base(operator_id, 0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _pool(pool) {}; + + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); + } + + Status open(RuntimeState* state) override { Review Comment: warning: method 'open' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status open(RuntimeState* state) override { ``` ########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
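For intuition on the skewness test in `_find_skewed_min_task_buckets` (quoted earlier in this thread and again below), a small worked example. The real `TASK_BUCKET_SKEWNESS_THRESHOLD` constant is defined in the header and is not shown here, so `0.7` is only an illustrative stand-in:

```cpp
#include <cmath>
#include <iostream>

int main() {
    const double threshold = 0.7; // stand-in; the real constant lives in the header
    auto skewness = [](double max_size, double min_size) {
        return (max_size - min_size) / max_size;
    };
    std::cout << skewness(100, 60) << '\n';          // 0.4 <= threshold: the scan stops
    std::cout << skewness(100, 10) << '\n';          // 0.9 > threshold: candidate for rebalancing
    std::cout << std::isnan(skewness(0, 0)) << '\n'; // 0/0 is NaN; the std::isnan guard stops the scan
    return 0;
}
```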
+// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { Review Comment: warning: method 'get_task_count' can be made const [readability-make-member-function-const] ```suggestion int SkewedPartitionRebalancer::get_task_count() const { ``` be/src/vec/exec/skewed_partition_rebalancer.h:79: ```diff - int get_task_count(); + int get_task_count() const; ``` ########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
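On the readability-make-member-function-const finding for `get_task_count` above: the accessor only reads `_task_count`, so marking it `const` lets callers that hold a `const` reference use it. A minimal sketch:

```cpp
class Counter {
public:
    int get_task_count() const { return _task_count; } // const: reads state only
private:
    int _task_count = 0;
};

int read_count(const Counter& c) {
    return c.get_task_count(); // legal only because the accessor is const
}
```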
+// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { + return _task_count; +} + +int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) { + const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; + + int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size(); + + return task_ids[task_id_index].task_id; +} + +void SkewedPartitionRebalancer::add_data_processed(long data_size) { + _data_processed += data_size; +} + +void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { + _partition_row_count[partition] += row_count; +} + +void SkewedPartitionRebalancer::rebalance() { + long current_data_processed = _data_processed; + if (_should_rebalance(current_data_processed)) { + _rebalance_partitions(current_data_processed); + } +} + +void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { + long total_partition_row_count = 0; + for (int partition = 0; partition < _partition_count; partition++) { + total_partition_row_count += _partition_row_count[partition]; + } + + for (int partition = 0; partition < _partition_count; partition++) { + _partition_data_size[partition] = std::max( + (_partition_row_count[partition] * data_processed) / total_partition_row_count, + _partition_data_size[partition]); + } +} + +long 
SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions) { + long estimated_data_size_since_last_rebalance = 0; + for (auto& elem : max_partitions) { + estimated_data_size_since_last_rebalance += + _partition_data_size_since_last_rebalance_per_task[elem]; + } + return estimated_data_size_since_last_rebalance; +} + +void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions) { + std::vector<int> scaled_partitions; + while (true) { + std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll(); + if (!max_task_bucket.has_value()) { + break; + } + + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions = task_bucket_max_partitions[max_task_bucket->id]; + if (max_partitions.is_empty()) { + continue; + } + + std::vector<TaskBucket> min_skewed_task_buckets = + _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); + if (min_skewed_task_buckets.empty()) { + break; + } + + while (true) { + std::optional<int> max_partition = max_partitions.poll(); + if (!max_partition.has_value()) { + break; + } + int max_partition_value = max_partition.value(); + + if (std::find(scaled_partitions.begin(), scaled_partitions.end(), + max_partition_value) != scaled_partitions.end()) { + continue; + } + + int total_assigned_tasks = _partition_assignments[max_partition_value].size(); + if (_partition_data_size[max_partition_value] >= + (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { + for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { + if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + break; + } + } + } else { + break; + } + } + } +} + +std::vector<SkewedPartitionRebalancer::TaskBucket> +SkewedPartitionRebalancer::_find_skewed_min_task_buckets( + const TaskBucket& max_task_bucket, + const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket> min_skewed_task_buckets; + + for (const auto& min_task_bucket : min_task_buckets) { + double skewness = + static_cast<double>( + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] - + _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) / + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id]; + if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) { + break; + } + if (max_task_bucket.task_id != min_task_bucket.task_id) { + min_skewed_task_buckets.push_back(min_task_bucket); + } + } + return min_skewed_task_buckets; +} + +bool SkewedPartitionRebalancer::_rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket>& assignments = _partition_assignments[partition_id]; + if 
(std::any_of(assignments.begin(), assignments.end(), + [&to_task_bucket](const TaskBucket& task_bucket) { + return task_bucket.task_id == to_task_bucket.task_id; + })) { + return false; + } + + assignments.push_back(to_task_bucket); + + int new_task_count = assignments.size(); + int old_task_count = new_task_count - 1; + for (const TaskBucket& task_bucket : assignments) { + if (task_bucket == to_task_bucket) { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] += + (_partition_data_size_since_last_rebalance_per_task[partition_id] * + old_task_count) / + new_task_count; + } else { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -= + _partition_data_size_since_last_rebalance_per_task[partition_id] / + new_task_count; + } + max_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + min_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + } + + return true; +} + +bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) { + return (data_processed - _data_processed_at_last_rebalance) >= + _min_data_processed_rebalance_threshold; +} + +void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { + if (!_should_rebalance(data_processed)) { + return; + } + + _calculate_partition_data_size(data_processed); + + for (int partition = 0; partition < _partition_count; partition++) { + int total_assigned_tasks = _partition_assignments[partition].size(); + long data_size = _partition_data_size[partition]; + _partition_data_size_since_last_rebalance_per_task[partition] = + (data_size - _partition_data_size_at_last_rebalance[partition]) / + total_assigned_tasks; + _partition_data_size_at_last_rebalance[partition] = data_size; + } + + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>> + task_bucket_max_partitions; + + for (int i = 0; i < _task_count * _task_bucket_count; ++i) { + task_bucket_max_partitions.push_back( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>()); + } + + for (int partition = 0; partition < _partition_count; partition++) { + auto& taskAssignments = _partition_assignments[partition]; + for (const auto& taskBucket : taskAssignments) { + auto& queue = task_bucket_max_partitions[taskBucket.id]; + queue.add_or_update(partition, + _partition_data_size_since_last_rebalance_per_task[partition]); + } + } + + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> + max_task_buckets; + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH> + min_task_buckets; + + for (int taskId = 0; taskId < _task_count; taskId++) { + for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) { + TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count); + TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count); + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] = + _calculate_task_bucket_data_size_since_last_rebalance( + task_bucket_max_partitions[task_bucket1.id]); + max_task_buckets.add_or_update( + std::move(task_bucket1), + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]); + min_task_buckets.add_or_update( + std::move(task_bucket2), Review Comment: warning: std::move of the variable 'task_bucket2' of the trivially-copyable type 'doris::vectorized::SkewedPartitionRebalancer::TaskBucket' has no effect; remove std::move() 
[performance-move-const-arg] ```suggestion task_bucket2, ``` ########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { + return _task_count; +} + +int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) { + const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; + + int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size(); + + return task_ids[task_id_index].task_id; +} + +void 
SkewedPartitionRebalancer::add_data_processed(long data_size) { + _data_processed += data_size; +} + +void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { + _partition_row_count[partition] += row_count; +} + +void SkewedPartitionRebalancer::rebalance() { + long current_data_processed = _data_processed; + if (_should_rebalance(current_data_processed)) { + _rebalance_partitions(current_data_processed); + } +} + +void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { + long total_partition_row_count = 0; + for (int partition = 0; partition < _partition_count; partition++) { + total_partition_row_count += _partition_row_count[partition]; + } + + for (int partition = 0; partition < _partition_count; partition++) { + _partition_data_size[partition] = std::max( + (_partition_row_count[partition] * data_processed) / total_partition_row_count, + _partition_data_size[partition]); + } +} + +long SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions) { + long estimated_data_size_since_last_rebalance = 0; + for (auto& elem : max_partitions) { + estimated_data_size_since_last_rebalance += + _partition_data_size_since_last_rebalance_per_task[elem]; + } + return estimated_data_size_since_last_rebalance; +} + +void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions) { + std::vector<int> scaled_partitions; + while (true) { + std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll(); + if (!max_task_bucket.has_value()) { + break; + } + + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions = task_bucket_max_partitions[max_task_bucket->id]; + if (max_partitions.is_empty()) { + continue; + } + + std::vector<TaskBucket> min_skewed_task_buckets = + _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); + if (min_skewed_task_buckets.empty()) { + break; + } + + while (true) { + std::optional<int> max_partition = max_partitions.poll(); + if (!max_partition.has_value()) { + break; + } + int max_partition_value = max_partition.value(); + + if (std::find(scaled_partitions.begin(), scaled_partitions.end(), + max_partition_value) != scaled_partitions.end()) { + continue; + } + + int total_assigned_tasks = _partition_assignments[max_partition_value].size(); + if (_partition_data_size[max_partition_value] >= + (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { + for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { + if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + break; + } + } + } else { + break; + } + } + } +} + +std::vector<SkewedPartitionRebalancer::TaskBucket> +SkewedPartitionRebalancer::_find_skewed_min_task_buckets( + const TaskBucket& max_task_bucket, + const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket> min_skewed_task_buckets; + + for (const auto& min_task_bucket : 
min_task_buckets) { + double skewness = + static_cast<double>( + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] - + _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) / + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id]; + if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) { + break; + } + if (max_task_bucket.task_id != min_task_bucket.task_id) { + min_skewed_task_buckets.push_back(min_task_bucket); + } + } + return min_skewed_task_buckets; +} + +bool SkewedPartitionRebalancer::_rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket>& assignments = _partition_assignments[partition_id]; + if (std::any_of(assignments.begin(), assignments.end(), + [&to_task_bucket](const TaskBucket& task_bucket) { + return task_bucket.task_id == to_task_bucket.task_id; + })) { + return false; + } + + assignments.push_back(to_task_bucket); + + int new_task_count = assignments.size(); + int old_task_count = new_task_count - 1; + for (const TaskBucket& task_bucket : assignments) { + if (task_bucket == to_task_bucket) { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] += + (_partition_data_size_since_last_rebalance_per_task[partition_id] * + old_task_count) / + new_task_count; + } else { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -= + _partition_data_size_since_last_rebalance_per_task[partition_id] / + new_task_count; + } + max_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + min_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + } + + return true; +} + +bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) { + return (data_processed - _data_processed_at_last_rebalance) >= + _min_data_processed_rebalance_threshold; +} + +void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { + if (!_should_rebalance(data_processed)) { + return; + } + + _calculate_partition_data_size(data_processed); + + for (int partition = 0; partition < _partition_count; partition++) { + int total_assigned_tasks = _partition_assignments[partition].size(); + long data_size = _partition_data_size[partition]; + _partition_data_size_since_last_rebalance_per_task[partition] = + (data_size - _partition_data_size_at_last_rebalance[partition]) / + total_assigned_tasks; + _partition_data_size_at_last_rebalance[partition] = data_size; + } + + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>> + task_bucket_max_partitions; + + for (int i = 0; i < _task_count * _task_bucket_count; ++i) { + task_bucket_max_partitions.push_back( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>()); Review Comment: warning: use emplace_back instead of push_back [modernize-use-emplace] ```suggestion task_bucket_max_partitions.emplace_back(); ``` ########## be/src/vec/exec/skewed_partition_rebalancer.cpp: ########## @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
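On the modernize-use-emplace suggestion above, a neutral illustration of the difference (`std::string` is used here only as a stand-in element type):

```cpp
#include <string>
#include <vector>

int main() {
    std::vector<std::string> v;
    v.push_back(std::string(3, 'x')); // constructs a temporary, then moves it into the vector
    v.emplace_back(3, 'x');           // forwards the arguments and constructs in place
    return 0;
}
```

For the loop in question, the suggested `task_bucket_max_partitions.emplace_back();` default-constructs the `IndexedPriorityQueue` in place and avoids spelling out the long template type a second time.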
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { + return _task_count; +} + +int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) { + const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; + + int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size(); + + return task_ids[task_id_index].task_id; +} + +void SkewedPartitionRebalancer::add_data_processed(long data_size) { + _data_processed += data_size; +} + +void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { + _partition_row_count[partition] += row_count; +} + +void SkewedPartitionRebalancer::rebalance() { + 
long current_data_processed = _data_processed; + if (_should_rebalance(current_data_processed)) { + _rebalance_partitions(current_data_processed); + } +} + +void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { + long total_partition_row_count = 0; + for (int partition = 0; partition < _partition_count; partition++) { + total_partition_row_count += _partition_row_count[partition]; + } + + for (int partition = 0; partition < _partition_count; partition++) { + _partition_data_size[partition] = std::max( + (_partition_row_count[partition] * data_processed) / total_partition_row_count, + _partition_data_size[partition]); + } +} + +long SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions) { + long estimated_data_size_since_last_rebalance = 0; + for (auto& elem : max_partitions) { + estimated_data_size_since_last_rebalance += + _partition_data_size_since_last_rebalance_per_task[elem]; + } + return estimated_data_size_since_last_rebalance; +} + +void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions) { + std::vector<int> scaled_partitions; + while (true) { + std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll(); + if (!max_task_bucket.has_value()) { + break; + } + + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions = task_bucket_max_partitions[max_task_bucket->id]; + if (max_partitions.is_empty()) { + continue; + } + + std::vector<TaskBucket> min_skewed_task_buckets = + _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); + if (min_skewed_task_buckets.empty()) { + break; + } + + while (true) { + std::optional<int> max_partition = max_partitions.poll(); + if (!max_partition.has_value()) { + break; + } + int max_partition_value = max_partition.value(); + + if (std::find(scaled_partitions.begin(), scaled_partitions.end(), + max_partition_value) != scaled_partitions.end()) { + continue; + } + + int total_assigned_tasks = _partition_assignments[max_partition_value].size(); + if (_partition_data_size[max_partition_value] >= + (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { + for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { + if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + break; + } + } + } else { + break; + } + } + } +} + +std::vector<SkewedPartitionRebalancer::TaskBucket> +SkewedPartitionRebalancer::_find_skewed_min_task_buckets( + const TaskBucket& max_task_bucket, + const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket> min_skewed_task_buckets; + + for (const auto& min_task_bucket : min_task_buckets) { + double skewness = + static_cast<double>( + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] - + _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) / + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id]; + 
if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) { + break; + } + if (max_task_bucket.task_id != min_task_bucket.task_id) { + min_skewed_task_buckets.push_back(min_task_bucket); + } + } + return min_skewed_task_buckets; +} + +bool SkewedPartitionRebalancer::_rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket>& assignments = _partition_assignments[partition_id]; + if (std::any_of(assignments.begin(), assignments.end(), + [&to_task_bucket](const TaskBucket& task_bucket) { + return task_bucket.task_id == to_task_bucket.task_id; + })) { + return false; + } + + assignments.push_back(to_task_bucket); + + int new_task_count = assignments.size(); + int old_task_count = new_task_count - 1; + for (const TaskBucket& task_bucket : assignments) { + if (task_bucket == to_task_bucket) { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] += + (_partition_data_size_since_last_rebalance_per_task[partition_id] * + old_task_count) / + new_task_count; + } else { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -= + _partition_data_size_since_last_rebalance_per_task[partition_id] / + new_task_count; + } + max_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + min_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + } + + return true; +} + +bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) { + return (data_processed - _data_processed_at_last_rebalance) >= + _min_data_processed_rebalance_threshold; +} + +void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { + if (!_should_rebalance(data_processed)) { + return; + } + + _calculate_partition_data_size(data_processed); + + for (int partition = 0; partition < _partition_count; partition++) { + int total_assigned_tasks = _partition_assignments[partition].size(); + long data_size = _partition_data_size[partition]; + _partition_data_size_since_last_rebalance_per_task[partition] = + (data_size - _partition_data_size_at_last_rebalance[partition]) / + total_assigned_tasks; + _partition_data_size_at_last_rebalance[partition] = data_size; + } + + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>> + task_bucket_max_partitions; + + for (int i = 0; i < _task_count * _task_bucket_count; ++i) { + task_bucket_max_partitions.push_back( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>()); + } + + for (int partition = 0; partition < _partition_count; partition++) { + auto& taskAssignments = _partition_assignments[partition]; + for (const auto& taskBucket : taskAssignments) { + auto& queue = task_bucket_max_partitions[taskBucket.id]; + queue.add_or_update(partition, + _partition_data_size_since_last_rebalance_per_task[partition]); + } + } + + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> + max_task_buckets; + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH> + min_task_buckets; + + for (int taskId = 0; taskId < _task_count; taskId++) { + for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) { + TaskBucket task_bucket1(taskId, bucketId, 
_task_bucket_count); + TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count); + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] = + _calculate_task_bucket_data_size_since_last_rebalance( + task_bucket_max_partitions[task_bucket1.id]); + max_task_buckets.add_or_update( + std::move(task_bucket1), + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]); + min_task_buckets.add_or_update( + std::move(task_bucket2), + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket2.id]); Review Comment: warning: 'task_bucket2' used after it was moved [bugprone-use-after-move] ```cpp _estimated_task_bucket_data_size_since_last_rebalance[task_bucket2.id]); ^ ``` <details> <summary>Additional context</summary> **be/src/vec/exec/skewed_partition_rebalancer.cpp:292:** move occurred here ```cpp std::move(task_bucket2), ^ ``` **be/src/vec/exec/skewed_partition_rebalancer.cpp:293:** the use and move are unsequenced, i.e. there is no guarantee about the order in which they are evaluated ```cpp _estimated_task_bucket_data_size_since_last_rebalance[task_bucket2.id]); ^ ``` </details> ########## be/src/vec/sink/vhive_table_sink.cpp: ########## @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vhive_table_sink.h" + +namespace doris { +class TExpr; + +namespace vectorized { + +VHiveTableSink::VHiveTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector<TExpr>& texprs) + : AsyncWriterSink<VHiveTableWriter, VHIVE_TABLE_SINK>(row_desc, texprs), _pool(pool) {} + +VHiveTableSink::~VHiveTableSink() = default; + +Status VHiveTableSink::init(const TDataSink& t_sink) { Review Comment: warning: method 'init' can be made static [readability-convert-member-functions-to-static] be/src/vec/sink/vhive_table_sink.h:40: ```diff - Status init(const TDataSink& sink) override; + static Status init(const TDataSink& sink) override; ``` ########## be/src/vec/sink/writer/vhive_partition_writer.cpp: ########## @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
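Taking the two bugprone-use-after-move findings and the performance-move-const-arg finding together, one possible (untested) rewrite of this loop body in `_rebalance_partitions`: since `TaskBucket` is trivially copyable and `task_bucket1` and `task_bucket2` are constructed from identical arguments, a single value can be built once, the estimate read before any call, and the bucket passed by copy:

```cpp
for (int taskId = 0; taskId < _task_count; taskId++) {
    for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) {
        TaskBucket task_bucket(taskId, bucketId, _task_bucket_count);
        // Compute the estimate once, before any call that consumes task_bucket.
        long estimate = _calculate_task_bucket_data_size_since_last_rebalance(
                task_bucket_max_partitions[task_bucket.id]);
        _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] = estimate;
        // Pass by copy: no std::move, hence no unsequenced move/use.
        max_task_buckets.add_or_update(task_bucket, estimate);
        min_task_buckets.add_or_update(task_bucket, estimate);
    }
}
```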
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_partition_writer.h" + +#include "io/file_factory.h" +#include "io/fs/file_system.h" +#include "runtime/runtime_state.h" +#include "vec/core/materialize_block.h" +#include "vec/runtime/vorc_transformer.h" +#include "vec/runtime/vparquet_transformer.h" + +namespace doris { +namespace vectorized { Review Comment: warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces] ```suggestion namespace doris::vectorized { ``` be/src/vec/sink/writer/vhive_partition_writer.cpp:280: ```diff - } // namespace vectorized - } // namespace doris + } // namespace doris ``` ########## be/src/vec/sink/writer/vhive_partition_writer.cpp: ########## @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_partition_writer.h" + +#include "io/file_factory.h" +#include "io/fs/file_system.h" +#include "runtime/runtime_state.h" +#include "vec/core/materialize_block.h" +#include "vec/runtime/vorc_transformer.h" +#include "vec/runtime/vparquet_transformer.h" + +namespace doris { +namespace vectorized { + +VHivePartitionWriter::VHivePartitionWriter( + const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode, + const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns, + WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type, + TFileCompressType::type hive_compress_type, + const std::map<std::string, std::string>& hadoop_conf) + : _partition_name(std::move(partition_name)), + _update_mode(update_mode), + _vec_output_expr_ctxs(output_expr_ctxs), + _columns(columns), + _write_info(std::move(write_info)), + _file_name(std::move(file_name)), + _file_format_type(file_format_type), + _hive_compress_type(hive_compress_type), + _hadoop_conf(hadoop_conf) + +{} + +Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + + std::vector<TNetworkAddress> broker_addresses; + RETURN_IF_ERROR(FileFactory::create_file_writer( + _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf, + fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer_impl)); + + switch (_file_format_type) { + case TFileFormatType::FORMAT_PARQUET: { + bool parquet_disable_dictionary = false; + TParquetCompressionType::type parquet_compression_type; + switch (_hive_compress_type) { + case TFileCompressType::PLAIN: { + parquet_compression_type = TParquetCompressionType::UNCOMPRESSED; + break; + } + case TFileCompressType::SNAPPYBLOCK: { + parquet_compression_type = TParquetCompressionType::SNAPPY; + break; + } + case TFileCompressType::ZSTD: { + parquet_compression_type = 
TParquetCompressionType::ZSTD; + break; + } + default: { + return Status::InternalError("Unsupported hive compress type {} with parquet", + to_string(_hive_compress_type)); + } + } + std::vector<TParquetSchema> parquet_schemas; + for (int i = 0; i < _columns.size(); i++) { + VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root(); + TParquetSchema parquet_schema; + parquet_schema.schema_column_name = _columns[i].name; + parquet_schemas.emplace_back(std::move(parquet_schema)); + } + _vfile_writer.reset(new VParquetTransformer( + state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas, + parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0, + false)); + return _vfile_writer->open(); + } + case TFileFormatType::FORMAT_ORC: { + orc::CompressionKind orc_compression_type; + switch (_hive_compress_type) { + case TFileCompressType::PLAIN: { + orc_compression_type = orc::CompressionKind::CompressionKind_NONE; + break; + } + case TFileCompressType::SNAPPYBLOCK: { + orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY; + break; + } + case TFileCompressType::ZLIB: { + orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; + break; + } + case TFileCompressType::ZSTD: { + orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD; + break; + } + default: { + return Status::InternalError("Unsupported type {} with orc", _hive_compress_type); + } + } + orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; + + std::unique_ptr<orc::Type> root_schema = orc::createStructType(); + for (int i = 0; i < _columns.size(); i++) { + VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root(); + try { + root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type())); + } catch (doris::Exception& e) { + return e.to_status(); + } + } + + _vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(), + _vec_output_expr_ctxs, std::move(root_schema), + false, orc_compression_type)); + return _vfile_writer->open(); + } + default: { + return Status::InternalError("Unsupported file format type {}", + to_string(_file_format_type)); + } + } +} + +Status VHivePartitionWriter::close(Status status) { Review Comment: warning: method 'close' can be made static [readability-convert-member-functions-to-static] be/src/vec/sink/writer/vhive_partition_writer.h:58: ```diff - Status close(Status); + static Status close(Status); ``` ########## be/src/vec/sink/writer/vhive_partition_writer.cpp: ########## @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
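One further observation on the ORC branch of `VHivePartitionWriter::open()` quoted above: the unconditional `orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;` immediately after the switch overwrites whatever the switch selected, which makes the whole switch a dead store and looks like leftover debugging. If forcing ZLIB is not intended, the branch would presumably read:

```cpp
orc::CompressionKind orc_compression_type;
switch (_hive_compress_type) {
case TFileCompressType::PLAIN:
    orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
    break;
case TFileCompressType::SNAPPYBLOCK:
    orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY;
    break;
case TFileCompressType::ZLIB:
    orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
    break;
case TFileCompressType::ZSTD:
    orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
    break;
default:
    return Status::InternalError("Unsupported type {} with orc", _hive_compress_type);
}
// no unconditional ZLIB assignment here
```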
##########
be/src/vec/sink/writer/vhive_partition_writer.cpp:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_partition_writer.h"
+
+#include "io/file_factory.h"
+#include "io/fs/file_system.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/materialize_block.h"
+#include "vec/runtime/vorc_transformer.h"
+#include "vec/runtime/vparquet_transformer.h"
+
+namespace doris {
+namespace vectorized {
+
+VHivePartitionWriter::VHivePartitionWriter(
+        const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode,
+        const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns,
+        WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type,
+        TFileCompressType::type hive_compress_type,
+        const std::map<std::string, std::string>& hadoop_conf)
+        : _partition_name(std::move(partition_name)),
+          _update_mode(update_mode),
+          _vec_output_expr_ctxs(output_expr_ctxs),
+          _columns(columns),
+          _write_info(std::move(write_info)),
+          _file_name(std::move(file_name)),
+          _file_format_type(file_format_type),
+          _hive_compress_type(hive_compress_type),
+          _hadoop_conf(hadoop_conf)
+
+{}
+
+Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+
+    std::vector<TNetworkAddress> broker_addresses;
+    RETURN_IF_ERROR(FileFactory::create_file_writer(
+            _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf,
+            fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer_impl));
+
+    switch (_file_format_type) {
+    case TFileFormatType::FORMAT_PARQUET: {
+        bool parquet_disable_dictionary = false;
+        TParquetCompressionType::type parquet_compression_type;
+        switch (_hive_compress_type) {
+        case TFileCompressType::PLAIN: {
+            parquet_compression_type = TParquetCompressionType::UNCOMPRESSED;
+            break;
+        }
+        case TFileCompressType::SNAPPYBLOCK: {
+            parquet_compression_type = TParquetCompressionType::SNAPPY;
+            break;
+        }
+        case TFileCompressType::ZSTD: {
+            parquet_compression_type = TParquetCompressionType::ZSTD;
+            break;
+        }
+        default: {
+            return Status::InternalError("Unsupported hive compress type {} with parquet",
+                                         to_string(_hive_compress_type));
+        }
+        }
+        std::vector<TParquetSchema> parquet_schemas;
+        for (int i = 0; i < _columns.size(); i++) {
+            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
+            TParquetSchema parquet_schema;
+            parquet_schema.schema_column_name = _columns[i].name;
+            parquet_schemas.emplace_back(std::move(parquet_schema));
+        }
+        _vfile_writer.reset(new VParquetTransformer(
+                state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas,
+                parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
+                false));
+        return _vfile_writer->open();
+    }
+    case TFileFormatType::FORMAT_ORC: {
+        orc::CompressionKind orc_compression_type;
+        switch (_hive_compress_type) {
+        case TFileCompressType::PLAIN: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
+            break;
+        }
+        case TFileCompressType::SNAPPYBLOCK: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY;
+            break;
+        }
+        case TFileCompressType::ZLIB: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
+            break;
+        }
+        case TFileCompressType::ZSTD: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
+            break;
+        }
+        default: {
+            return Status::InternalError("Unsupported type {} with orc", _hive_compress_type);
+        }
+        }
+        orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
+
+        std::unique_ptr<orc::Type> root_schema = orc::createStructType();
+        for (int i = 0; i < _columns.size(); i++) {
+            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
+            try {
+                root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type()));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+        }
+
+        _vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(),
+                                                _vec_output_expr_ctxs, std::move(root_schema),
+                                                false, orc_compression_type));
+        return _vfile_writer->open();
+    }
+    default: {
+        return Status::InternalError("Unsupported file format type {}",
+                                     to_string(_file_format_type));
+    }
+    }
+}
+
+Status VHivePartitionWriter::close(Status status) {
+    if (_vfile_writer != nullptr) {
+        Status st = _vfile_writer->close();
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("_vfile_writer close failed, reason: {}", st.to_string());
+        }
+    }
+    if (status != Status::OK()) {
+        auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
+        Status st = _file_writer_impl->fs()->delete_file(path);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string());
+        }
+    }
+    _state->hive_partition_updates().emplace_back(_build_partition_update());
+    return Status::OK();
+}
+
+Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) {
+    Block output_block;
+    RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block));
+    RETURN_IF_ERROR(_vfile_writer->write(output_block));
+    _row_count += output_block.rows();
+    _input_size_in_bytes += output_block.bytes();
+    return Status::OK();
+}
+
+std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type(
+        const TypeDescriptor& type_descriptor) {
+    std::pair<Status, std::unique_ptr<orc::Type>> result;
+    switch (type_descriptor.type) {
+    case TYPE_BOOLEAN: {
+        return orc::createPrimitiveType(orc::BOOLEAN);
+    }
+    case TYPE_TINYINT: {
+        return orc::createPrimitiveType(orc::BYTE);
+    }
+    case TYPE_SMALLINT: {
+        return orc::createPrimitiveType(orc::SHORT);
+    }
+    case TYPE_INT: {
+        return orc::createPrimitiveType(orc::INT);
+    }
+    case TYPE_BIGINT: {
+        return orc::createPrimitiveType(orc::LONG);
+    }
+    case TYPE_FLOAT: {
+        return orc::createPrimitiveType(orc::FLOAT);
+    }
+    case TYPE_DOUBLE: {
+        return orc::createPrimitiveType(orc::DOUBLE);
+    }
+    case TYPE_CHAR: {
+        return orc::createCharType(orc::CHAR, type_descriptor.len);
+    }
+    case TYPE_VARCHAR: {
+        return orc::createCharType(orc::VARCHAR, type_descriptor.len);
+    }
+    case TYPE_STRING: {
+        return orc::createPrimitiveType(orc::STRING);
+    }
+    case TYPE_BINARY: {
+        return orc::createPrimitiveType(orc::STRING);
+    }
+    case TYPE_DATEV2: {
+        return orc::createPrimitiveType(orc::DATE);
+    }
+    case TYPE_DATETIMEV2: {
+        return orc::createPrimitiveType(orc::TIMESTAMP);
+    }
+    case TYPE_DECIMAL32: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_DECIMAL64: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_DECIMAL128I: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_STRUCT: {
+        std::unique_ptr<orc::Type> struct_type = orc::createStructType();
+        for (int j = 0; j < type_descriptor.children.size(); ++j) {
+            struct_type->addStructField(type_descriptor.field_names[j],
+                                        _build_orc_type(type_descriptor.children[j]));
+        }
+        return struct_type;
+    }
+    case TYPE_ARRAY: {
+        return orc::createListType(_build_orc_type(type_descriptor.children[0]));
+    }
+    case TYPE_MAP: {
+        return orc::createMapType(_build_orc_type(type_descriptor.children[0]),
+                                  _build_orc_type(type_descriptor.children[1]));
+    }
+    default: {
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "Unsupported type {} to build orc type",
+                               type_descriptor.debug_string());
+    }
+    }
+}
+
+Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,

Review Comment:
   warning: method '_projection_and_filter_block' can be made static [readability-convert-member-functions-to-static]

```suggestion
static Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,
```
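A note on this suggestion as emitted: C++ permits the `static` specifier only on the in-class declaration, so adding it to an out-of-line member definition, as the fix-it above shows, would be ill-formed; the declaration in the header is where it would have to go, and only if the body used no instance state. A minimal sketch with hypothetical names:

```cpp
struct SketchProjector {
    // `static` belongs here, on the in-class declaration only.
    static int project(int x);
};

// No `static` on the out-of-line definition: a storage-class
// specifier is not permitted at the definition site.
int SketchProjector::project(int x) {
    return x + 1;
}
```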