This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new ef2151ae667 [Feature-WIP](multi-catalog) Add Hive sink on BE side. (#32306) (#32364) ef2151ae667 is described below commit ef2151ae667a923cae8c28d82be50d377430c3e0 Author: Mingyu Chen <morning...@163.com> AuthorDate: Mon Mar 18 11:23:01 2024 +0800 [Feature-WIP](multi-catalog) Add Hive sink on BE side. (#32306) (#32364) bp #32306 Co-authored-by: Qi Chen <kaka11.c...@gmail.com> --- be/src/common/config.cpp | 15 + be/src/common/config.h | 13 + be/src/exec/data_sink.cpp | 16 + be/src/pipeline/exec/exchange_sink_operator.cpp | 73 +++- be/src/pipeline/exec/exchange_sink_operator.h | 28 ++ be/src/pipeline/exec/hive_table_sink_operator.cpp | 50 +++ be/src/pipeline/exec/hive_table_sink_operator.h | 115 ++++++ be/src/pipeline/pipeline_fragment_context.cpp | 11 +- be/src/pipeline/pipeline_x/operator.cpp | 3 + .../pipeline_x/pipeline_x_fragment_context.cpp | 9 + be/src/runtime/fragment_mgr.cpp | 18 + be/src/runtime/runtime_state.h | 4 + be/src/util/indexed_priority_queue.hpp | 182 +++++++++ be/src/vec/exec/skewed_partition_rebalancer.cpp | 302 ++++++++++++++ be/src/vec/exec/skewed_partition_rebalancer.h | 132 +++++++ be/src/vec/runtime/vorc_transformer.cpp | 23 +- be/src/vec/runtime/vorc_transformer.h | 6 +- .../sink/scale_writer_partitioning_exchanger.hpp | 92 +++++ be/src/vec/sink/vhive_table_sink.cpp | 48 +++ be/src/vec/sink/vhive_table_sink.h | 52 +++ be/src/vec/sink/writer/vhive_partition_writer.cpp | 282 ++++++++++++++ be/src/vec/sink/writer/vhive_partition_writer.h | 99 +++++ be/src/vec/sink/writer/vhive_table_writer.cpp | 432 +++++++++++++++++++++ be/src/vec/sink/writer/vhive_table_writer.h | 74 ++++ be/src/vec/sink/writer/vhive_utils.cpp | 78 ++++ be/src/vec/sink/writer/vhive_utils.h | 45 +++ be/test/util/indexed_priority_queue_test.cpp | 104 +++++ .../vec/exec/skewed_partition_rebalancer_test.cpp | 318 +++++++++++++++ be/test/vec/exec/vhive_utils_test.cpp | 70 ++++ 29 files changed, 2681 insertions(+), 13 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index dc31d2c621e..960d8f1c19a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1197,6 +1197,21 @@ DEFINE_mString(ca_cert_file_paths, "/etc/pki/tls/certs/ca-bundle.crt;/etc/ssl/certs/ca-certificates.crt;" "/etc/ssl/ca-bundle.pem"); +/** Table sink configurations(currently contains only external table types) **/ +// Minimum data processed to scale writers when non partition writing +DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold, + "125829120"); // 120MB +// Minimum data processed to start rebalancing in exchange when partition writing +DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB +// Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing +DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold, + "209715200"); // 200MB +// Maximum processed partition nums of per writer when partition writing +DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128"); + +/** Hive sink configurations **/ +DEFINE_mInt64(hive_sink_max_file_size, "1073741824"); // 1GB + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index abb833f42e8..85db11b9e61 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1271,6 +1271,19 @@ DECLARE_String(tmp_file_dir); // the file paths(one or more) of CA cert, splite 
using ";" aws s3 lib use it to init s3client DECLARE_mString(ca_cert_file_paths); +/** Table sink configurations(currently contains only external table types) **/ +// Minimum data processed to scale writers when non partition writing +DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold); +// Minimum data processed to start rebalancing in exchange when partition writing +DECLARE_mInt64(table_sink_partition_write_data_processed_threshold); +// Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing +DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold); +// Maximum processed partition nums of per writer when partition writing +DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer); + +/** Hive sink configurations **/ +DECLARE_mInt64(hive_sink_max_file_size); // 1GB + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 5047ae8fc78..d373ddd2fcd 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -36,6 +36,7 @@ #include "vec/sink/group_commit_block_sink.h" #include "vec/sink/multi_cast_data_stream_sink.h" #include "vec/sink/vdata_stream_sender.h" +#include "vec/sink/vhive_table_sink.h" #include "vec/sink/vmemory_scratch_sink.h" #include "vec/sink/volap_table_sink.h" #include "vec/sink/volap_table_sink_v2.h" @@ -157,6 +158,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink } break; } + case TDataSinkType::HIVE_TABLE_SINK: { + if (!thrift_sink.__isset.hive_table_sink) { + return Status::InternalError("Missing hive table sink."); + } + sink->reset(new vectorized::VHiveTableSink(pool, row_desc, output_exprs)); + break; + } case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { Status status = Status::OK(); DCHECK(thrift_sink.__isset.olap_table_sink); @@ -298,6 +306,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink } break; } + case TDataSinkType::HIVE_TABLE_SINK: { + if (!thrift_sink.__isset.hive_table_sink) { + return Status::InternalError("Missing hive table sink."); + } + sink->reset(new vectorized::VHiveTableSink(pool, row_desc, output_exprs)); + break; + } case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { DCHECK(thrift_sink.__isset.multi_cast_stream_sink); DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0); @@ -313,6 +328,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink RETURN_IF_ERROR(status); break; } + default: { std::stringstream error_msg; std::map<int, const char*>::const_iterator i = diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 333abdbcc01..9c3d68c472f 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -157,7 +157,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf local_size++; } } - if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { + if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || + _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { std::random_device rd; std::mt19937 g(rd()); shuffle(channels.begin(), channels.end(), g); @@ -249,6 +250,27 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf .schema = _schema, .caller = (void*)this, .create_partition_callback = 
&ExchangeSinkLocalState::empty_callback_function}); + } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + _partition_count = + channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer; + _partitioner.reset( + new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count)); + _partition_function.reset(new HashPartitionFunction(_partitioner.get())); + // const long MEGABYTE = 1024 * 1024; + // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 10000 * MEGABYTE; // 1MB + // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50000 * MEGABYTE; // 50MB + + // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 1MB + // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 50MB + scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger< + HashPartitionFunction>( + channels.size(), *_partition_function, _partition_count, channels.size(), 1, + config::table_sink_partition_write_data_processed_threshold, + config::table_sink_partition_write_skewed_data_processed_rebalance_threshold)); + RETURN_IF_ERROR(_partitioner->init(p._texprs)); + RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); } _finish_dependency->block(); @@ -259,7 +281,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf Status ExchangeSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); if (_part_type == TPartitionType::HASH_PARTITIONED || - _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || + _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { RETURN_IF_ERROR(_partitioner->open(state)); } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc)); @@ -320,7 +343,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || sink.output_partition.type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED || - sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); + sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || + sink.output_partition.type == TPartitionType::TABLE_SINK_HASH_PARTITIONED || + sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED); _name = "ExchangeSinkOperatorX"; _pool = std::make_shared<ObjectPool>(); } @@ -492,7 +517,47 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, channel2rows, convert_block.get(), eos)); + } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + { + SCOPED_TIMER(local_state._split_block_hash_compute_timer); + RETURN_IF_ERROR( + local_state._partitioner->do_partitioning(state, block, _mem_tracker.get())); + } + std::vector<std::vector<uint32>> assignments = + local_state.scale_writer_partitioning_exchanger->accept(block); + RETURN_IF_ERROR(channel_add_rows_with_idx( + state, local_state.channels, local_state.channels.size(), assignments, block, eos)); + } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { + // Control the number of channels according to the flow, thereby controlling the number 
of table sink writers. + // 1. select channel + vectorized::PipChannel<ExchangeSinkLocalState>* current_channel = + local_state.channels[local_state.current_channel_idx]; + if (!current_channel->is_receiver_eof()) { + // 2. serialize, send and rollover block + if (current_channel->is_local()) { + auto status = current_channel->send_local_block(block); + HANDLE_CHANNEL_STATUS(state, current_channel, status); + } else { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + RETURN_IF_ERROR(local_state._serializer.serialize_block( + block, current_channel->ch_cur_pb_block())); + auto status = + current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos); + HANDLE_CHANNEL_STATUS(state, current_channel, status); + current_channel->ch_roll_pb_block(); + } + _data_processed += block->bytes(); + } + + if (_writer_count < local_state.channels.size()) { + if (_data_processed >= + _writer_count * + config::table_sink_non_partition_write_scaling_data_processed_threshold) { + _writer_count++; + } + } + local_state.current_channel_idx = (local_state.current_channel_idx + 1) % _writer_count; } else { // Range partition // 1. calculate range @@ -581,7 +646,6 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx( channel2rows[i].clear(); } } - if (eos) { for (int i = 0; i < num_channels; ++i) { if (!channels[i]->is_receiver_eof()) { @@ -590,7 +654,6 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx( } } } - return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 9384adbe5f3..17878fc6ead 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -25,6 +25,7 @@ #include "exchange_sink_buffer.h" #include "operator.h" #include "pipeline/pipeline_x/operator.h" +#include "vec/sink/scale_writer_partitioning_exchanger.hpp" #include "vec/sink/vdata_stream_sender.h" namespace doris { @@ -68,6 +69,21 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); using Base = PipelineXSinkLocalState<>; +private: + class HashPartitionFunction { + public: + HashPartitionFunction(vectorized::PartitionerBase* partitioner) + : _partitioner(partitioner) {} + + int get_partition(vectorized::Block* block, int position) { + uint32_t* partition_ids = (uint32_t*)_partitioner->get_channel_ids(); + return partition_ids[position]; + } + + private: + vectorized::PartitionerBase* _partitioner; + }; + public: ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state), @@ -132,6 +148,10 @@ public: int current_channel_idx; // index of current channel to send to if _random == true bool only_local_exchange; + // for external table sink hash partition + std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>> + scale_writer_partitioning_exchanger; + private: friend class ExchangeSinkOperatorX; friend class vectorized::Channel<ExchangeSinkLocalState>; @@ -209,6 +229,9 @@ private: std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids; int64_t _number_input_rows = 0; TPartitionType::type _part_type; + + // for external table sink hash partition + std::unique_ptr<HashPartitionFunction> _partition_function = nullptr; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> { @@ -274,6 +297,11 @@ private: const TTupleId& _tablet_sink_tuple_id; int64_t _tablet_sink_txn_id = -1; std::shared_ptr<ObjectPool> _pool; + + // for 
external table sink random partition + // Control the number of channels according to the flow, thereby controlling the number of table sink writers. + size_t _data_processed = 0; + int _writer_count = 1; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/hive_table_sink_operator.cpp b/be/src/pipeline/exec/hive_table_sink_operator.cpp new file mode 100644 index 00000000000..6b8eaa8c91e --- /dev/null +++ b/be/src/pipeline/exec/hive_table_sink_operator.cpp @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "hive_table_sink_operator.h" + +#include "common/status.h" + +namespace doris::pipeline { + +OperatorPtr HiveTableSinkOperatorBuilder::build_operator() { + return std::make_shared<HiveTableSinkOperator>(this, _sink); +} + +Status HiveTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + auto& p = _parent->cast<Parent>(); + RETURN_IF_ERROR(_writer->init_properties(p._pool)); + return Status::OK(); +} + +Status HiveTableSinkLocalState::close(RuntimeState* state, Status exec_status) { + if (Base::_closed) { + return Status::OK(); + } + SCOPED_TIMER(_close_timer); + SCOPED_TIMER(exec_time_counter()); + if (_closed) { + return _close_status; + } + _close_status = Base::close(state, exec_status); + return _close_status; +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/hive_table_sink_operator.h b/be/src/pipeline/exec/hive_table_sink_operator.h new file mode 100644 index 00000000000..39b5df36567 --- /dev/null +++ b/be/src/pipeline/exec/hive_table_sink_operator.h @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
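Editor's note: the TABLE_SINK_RANDOM_PARTITIONED branch added to ExchangeSinkOperatorX::sink() above scales the number of active channels (and therefore table sink writers) with the volume of data already sent. Below is a minimal standalone sketch of that scaling rule; the names are illustrative and not part of this commit, and `threshold` stands in for config::table_sink_non_partition_write_scaling_data_processed_threshold.

// Illustrative sketch only -- not part of the committed code.
#include <cstddef>

struct ScalingState {
    std::size_t data_processed = 0; // bytes routed to writers so far
    int writer_count = 1;           // channels currently in use
};

// Add one more writer only after every active writer has, on average,
// seen `threshold` bytes; then advance round-robin over the active writers.
inline int next_channel(ScalingState& s, std::size_t block_bytes,
                        int max_channels, std::size_t threshold, int current_idx) {
    s.data_processed += block_bytes;
    if (s.writer_count < max_channels &&
        s.data_processed >= static_cast<std::size_t>(s.writer_count) * threshold) {
        ++s.writer_count;
    }
    return (current_idx + 1) % s.writer_count;
}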
+ +#pragma once + +#include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/sink/vhive_table_sink.h" + +namespace doris { + +namespace pipeline { + +class HiveTableSinkOperatorBuilder final + : public DataSinkOperatorBuilder<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "HiveTableSinkOperator", sink) {} + + OperatorPtr build_operator() override; +}; + +class HiveTableSinkOperator final : public DataSinkOperator<vectorized::VHiveTableSink> { +public: + HiveTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {} + + bool can_write() override { return _sink->can_write(); } +}; + +class HiveTableSinkOperatorX; + +class HiveTableSinkLocalState final + : public AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX> { +public: + using Base = AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX>; + using Parent = HiveTableSinkOperatorX; + ENABLE_FACTORY_CREATOR(HiveTableSinkLocalState); + HiveTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + + Status close(RuntimeState* state, Status exec_status) override; + friend class HiveTableSinkOperatorX; + +private: + Status _close_status = Status::OK(); +}; + +class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocalState> { +public: + using Base = DataSinkOperatorX<HiveTableSinkLocalState>; + HiveTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, + const std::vector<TExpr>& t_output_expr) + : Base(operator_id, 0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _pool(pool) {}; + + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. 
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + return vectorized::VExpr::open(_output_vexpr_ctxs, state); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + return local_state.sink(state, in_block, eos); + } + +private: + friend class HiveTableSinkLocalState; + template <typename Writer, typename Parent> + requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) + friend class AsyncWriterSink; + const RowDescriptor& _row_desc; + vectorized::VExprContextSPtrs _output_vexpr_ctxs; + const std::vector<TExpr>& _t_output_expr; + ObjectPool* _pool = nullptr; +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d1800299559..c0e075d2bfa 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -59,6 +59,7 @@ #include "pipeline/exec/group_commit_block_sink_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/hive_table_sink_operator.h" #include "pipeline/exec/multi_cast_data_stream_sink.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/mysql_scan_operator.h" // IWYU pragma: keep @@ -821,12 +822,14 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr _sink.get()); break; } - case TDataSinkType::MYSQL_TABLE_SINK: - case TDataSinkType::JDBC_TABLE_SINK: - case TDataSinkType::ODBC_TABLE_SINK: { - sink_ = std::make_shared<TableSinkOperatorBuilder>(next_operator_builder_id(), _sink.get()); + case TDataSinkType::HIVE_TABLE_SINK: { + sink_ = std::make_shared<HiveTableSinkOperatorBuilder>(next_operator_builder_id(), + _sink.get()); break; } + case TDataSinkType::MYSQL_TABLE_SINK: + case TDataSinkType::JDBC_TABLE_SINK: + case TDataSinkType::ODBC_TABLE_SINK: case TDataSinkType::RESULT_FILE_SINK: { sink_ = std::make_shared<ResultFileSinkOperatorBuilder>( thrift_sink.result_file_sink.dest_node_id, _sink.get()); diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 0c890b83041..f9a30c7e3ac 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -37,6 +37,7 @@ #include "pipeline/exec/file_scan_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/hive_table_sink_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" @@ -541,6 +542,7 @@ DECLARE_OPERATOR_X(JdbcTableSinkLocalState) DECLARE_OPERATOR_X(ResultFileSinkLocalState) DECLARE_OPERATOR_X(OlapTableSinkLocalState) DECLARE_OPERATOR_X(OlapTableSinkV2LocalState) +DECLARE_OPERATOR_X(HiveTableSinkLocalState) DECLARE_OPERATOR_X(AnalyticSinkLocalState) DECLARE_OPERATOR_X(SortSinkLocalState) 
DECLARE_OPERATOR_X(SpillSortSinkLocalState) @@ -637,5 +639,6 @@ template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileS template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>; template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOperatorX>; template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>; +template class AsyncWriterSink<doris::vectorized::VHiveTableWriter, HiveTableSinkOperatorX>; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 58fc3cb843b..13ad0789e31 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -51,6 +51,7 @@ #include "pipeline/exec/file_scan_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/hive_table_sink_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" @@ -372,6 +373,14 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData } break; } + case TDataSinkType::HIVE_TABLE_SINK: { + if (!thrift_sink.__isset.hive_table_sink) { + return Status::InternalError("Missing hive table sink."); + } + _sink.reset( + new HiveTableSinkOperatorX(pool, next_sink_operator_id(), row_desc, output_exprs)); + break; + } case TDataSinkType::JDBC_TABLE_SINK: { if (!thrift_sink.__isset.jdbc_table_sink) { return Status::InternalError("Missing data jdbc sink."); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f6552f67fca..4657c341e81 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -373,6 +373,24 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { } } + if (!req.runtime_state->hive_partition_updates().empty()) { + params.__isset.hive_partition_updates = true; + params.hive_partition_updates.reserve( + req.runtime_state->hive_partition_updates().size()); + for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) { + params.hive_partition_updates.push_back(hive_partition_update); + } + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->hive_partition_updates().empty()) { + params.__isset.hive_partition_updates = true; + params.hive_partition_updates.insert(params.hive_partition_updates.end(), + rs->hive_partition_updates().begin(), + rs->hive_partition_updates().end()); + } + } + } + // Send new errors to coordinator req.runtime_state->get_unreported_errors(&(params.error_log)); params.__isset.error_log = (params.error_log.size() > 0); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e14721d4859..d031f36fbed 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -452,6 +452,8 @@ public: std::vector<TTabletCommitInfo>& tablet_commit_infos() { return _tablet_commit_infos; } + std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; } + const std::vector<TErrorTabletInfo>& error_tablet_infos() const { return _error_tablet_infos; } std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; } @@ -726,6 +728,8 @@ private: int _max_operator_id = 0; int _task_id = -1; + 
std::vector<THivePartitionUpdate> _hive_partition_updates; + std::vector<std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>> _op_id_to_local_state; std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> _sink_local_state; diff --git a/be/src/util/indexed_priority_queue.hpp b/be/src/util/indexed_priority_queue.hpp new file mode 100644 index 00000000000..5869d575231 --- /dev/null +++ b/be/src/util/indexed_priority_queue.hpp @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java +// to cpp and modified by Doris + +#pragma once + +#pragma once + +#include <functional> +#include <iostream> +#include <map> +#include <optional> +#include <set> + +/** + * A priority queue with constant time contains(E) and log time remove(E) + * Ties are broken by insertion order. + * LOW_TO_HIGH is the priority order from low to high, + * HIGH_TO_LOW is the priority order from high to low. + * Those with the same priority are arranged in order of insertion. 
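 *
 * (Editor's illustration, not part of this commit; assumes the default
 * HIGH_TO_LOW ordering declared below.)
 *
 *   doris::IndexedPriorityQueue<int> q;
 *   q.add_or_update(42, 10);   // value 42, priority 10
 *   q.add_or_update(7, 20);    // value 7, priority 20
 *   auto top = q.poll();       // returns 7 first (higher priority), then 42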
+ */ + +namespace doris { + +template <typename T> +struct IndexedPriorityQueueEntry { + T value; + long priority; + long generation; + + IndexedPriorityQueueEntry(T val, long prio, long gen) + : value(std::move(val)), priority(prio), generation(gen) {} +}; + +enum class IndexedPriorityQueuePriorityOrdering { LOW_TO_HIGH, HIGH_TO_LOW }; + +template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering> +struct IndexedPriorityQueueComparator { + bool operator()(const IndexedPriorityQueueEntry<T>& lhs, + const IndexedPriorityQueueEntry<T>& rhs) const { + if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) { + if (lhs.priority != rhs.priority) { + return lhs.priority < rhs.priority; + } + return lhs.generation < rhs.generation; + } else { + if (lhs.priority != rhs.priority) { + return lhs.priority > rhs.priority; + } + return lhs.generation < rhs.generation; + } + } +}; + +template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering = + IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> +class IndexedPriorityQueue { +public: + struct Prioritized { + T value; + long priority; + }; + + IndexedPriorityQueue() = default; + + bool add_or_update(T element, long priority) { + auto it = _index.find(element); + if (it != _index.end()) { + if (it->second.priority == priority) { + return false; + } + _queue.erase(it->second); + } + IndexedPriorityQueueEntry<T> entry {std::move(element), priority, generation++}; + _queue.insert(std::move(entry)); + _index.insert({entry.value, std::move(entry)}); + return true; + } + + bool contains(const T& element) const { return _index.find(element) != _index.end(); } + + bool remove(const T& element) { + auto it = _index.find(element); + if (it != _index.end()) { + _queue.erase(it->second); + _index.erase(it); + return true; + } + return false; + } + + std::optional<T> poll() { + if (_queue.empty()) { + return std::nullopt; + } + T value = _queue.begin()->value; + _index.erase(value); + _queue.erase(_queue.begin()); + return value; + } + + std::optional<Prioritized> peek() const { + if (_queue.empty()) { + return std::nullopt; + } + const IndexedPriorityQueueEntry<T>& entry = *_queue.begin(); + return Prioritized {entry.value, entry.priority}; + } + + int size() const { return _queue.size(); } + + bool is_empty() const { return _queue.empty(); } + + class Iterator { + public: + using iterator_category = std::forward_iterator_tag; + using value_type = T; + using difference_type = std::ptrdiff_t; + using pointer = T*; + using reference = T&; + + Iterator() : _iter() {} + explicit Iterator( + typename std::set< + IndexedPriorityQueueEntry<T>, + IndexedPriorityQueueComparator<T, priority_ordering>>::const_iterator iter) + : _iter(iter) {} + + const T& operator*() const { return _iter->value; } + + const T* operator->() const { return &(_iter->value); } + + Iterator& operator++() { + ++_iter; + return *this; + } + + Iterator operator++(int) { + Iterator tmp = *this; + ++(*this); + return tmp; + } + + bool operator==(const Iterator& other) const { return _iter == other._iter; } + + bool operator!=(const Iterator& other) const { return !(*this == other); } + + private: + typename std::set<IndexedPriorityQueueEntry<T>, + IndexedPriorityQueueComparator<T, priority_ordering>>::const_iterator + _iter; + }; + + Iterator begin() const { return Iterator(_queue.begin()); } + + Iterator end() const { return Iterator(_queue.end()); } + +private: + std::map<T, IndexedPriorityQueueEntry<T>> _index; + 
std::set<IndexedPriorityQueueEntry<T>, IndexedPriorityQueueComparator<T, priority_ordering>> + _queue; + + long generation = 0; +}; + +} // namespace doris diff --git a/be/src/vec/exec/skewed_partition_rebalancer.cpp b/be/src/vec/exec/skewed_partition_rebalancer.cpp new file mode 100644 index 00000000000..ae12d365f05 --- /dev/null +++ b/be/src/vec/exec/skewed_partition_rebalancer.cpp @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <cmath> +#include <list> + +namespace doris::vectorized { + +SkewedPartitionRebalancer::SkewedPartitionRebalancer( + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _partition_count(partition_count), + _task_count(task_count), + _task_bucket_count(task_bucket_count), + _min_partition_data_processed_rebalance_threshold( + min_partition_data_processed_rebalance_threshold), + _min_data_processed_rebalance_threshold( + std::max(min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold)), + _partition_row_count(partition_count, 0), + _data_processed(0), + _data_processed_at_last_rebalance(0), + _partition_data_size(partition_count, 0), + _partition_data_size_at_last_rebalance(partition_count, 0), + _partition_data_size_since_last_rebalance_per_task(partition_count, 0), + _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), + _partition_assignments(partition_count) { + std::vector<int> task_bucket_ids(task_count, 0); + + for (int partition = 0; partition < partition_count; partition++) { + int task_id = partition % task_count; + int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; + TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + _partition_assignments[partition].emplace_back(std::move(task_bucket)); + } +} + +std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() { + std::vector<std::list<int>> assigned_tasks; + + for (const auto& partition_assignment : _partition_assignments) { + std::list<int> tasks; + std::transform(partition_assignment.begin(), partition_assignment.end(), + std::back_inserter(tasks), + [](const TaskBucket& task_bucket) { return task_bucket.task_id; }); + assigned_tasks.push_back(tasks); + } + + return assigned_tasks; +} + +int SkewedPartitionRebalancer::get_task_count() { + return _task_count; +} + +int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) { + 
const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id]; + + int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size(); + + return task_ids[task_id_index].task_id; +} + +void SkewedPartitionRebalancer::add_data_processed(long data_size) { + _data_processed += data_size; +} + +void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) { + _partition_row_count[partition] += row_count; +} + +void SkewedPartitionRebalancer::rebalance() { + long current_data_processed = _data_processed; + if (_should_rebalance(current_data_processed)) { + _rebalance_partitions(current_data_processed); + } +} + +void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) { + long total_partition_row_count = 0; + for (int partition = 0; partition < _partition_count; partition++) { + total_partition_row_count += _partition_row_count[partition]; + } + + for (int partition = 0; partition < _partition_count; partition++) { + _partition_data_size[partition] = std::max( + (_partition_row_count[partition] * data_processed) / total_partition_row_count, + _partition_data_size[partition]); + } +} + +long SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions) { + long estimated_data_size_since_last_rebalance = 0; + for (auto& elem : max_partitions) { + estimated_data_size_since_last_rebalance += + _partition_data_size_since_last_rebalance_per_task[elem]; + } + return estimated_data_size_since_last_rebalance; +} + +void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions) { + std::vector<int> scaled_partitions; + while (true) { + std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll(); + if (!max_task_bucket.has_value()) { + break; + } + + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions = task_bucket_max_partitions[max_task_bucket->id]; + if (max_partitions.is_empty()) { + continue; + } + + std::vector<TaskBucket> min_skewed_task_buckets = + _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); + if (min_skewed_task_buckets.empty()) { + break; + } + + while (true) { + std::optional<int> max_partition = max_partitions.poll(); + if (!max_partition.has_value()) { + break; + } + int max_partition_value = max_partition.value(); + + if (std::find(scaled_partitions.begin(), scaled_partitions.end(), + max_partition_value) != scaled_partitions.end()) { + continue; + } + + int total_assigned_tasks = _partition_assignments[max_partition_value].size(); + if (_partition_data_size[max_partition_value] >= + (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { + for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { + if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + break; + } + } + } else { + break; + } + } + } +} + +std::vector<SkewedPartitionRebalancer::TaskBucket> +SkewedPartitionRebalancer::_find_skewed_min_task_buckets( + const TaskBucket& 
max_task_bucket, + const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket> min_skewed_task_buckets; + + for (const auto& min_task_bucket : min_task_buckets) { + double skewness = + static_cast<double>( + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] - + _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) / + _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id]; + if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) { + break; + } + if (max_task_bucket.task_id != min_task_bucket.task_id) { + min_skewed_task_buckets.push_back(min_task_bucket); + } + } + return min_skewed_task_buckets; +} + +bool SkewedPartitionRebalancer::_rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets) { + std::vector<TaskBucket>& assignments = _partition_assignments[partition_id]; + if (std::any_of(assignments.begin(), assignments.end(), + [&to_task_bucket](const TaskBucket& task_bucket) { + return task_bucket.task_id == to_task_bucket.task_id; + })) { + return false; + } + + assignments.push_back(to_task_bucket); + + int new_task_count = assignments.size(); + int old_task_count = new_task_count - 1; + for (const TaskBucket& task_bucket : assignments) { + if (task_bucket == to_task_bucket) { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] += + (_partition_data_size_since_last_rebalance_per_task[partition_id] * + old_task_count) / + new_task_count; + } else { + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -= + _partition_data_size_since_last_rebalance_per_task[partition_id] / + new_task_count; + } + max_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + min_task_buckets.add_or_update( + task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]); + } + + return true; +} + +bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) { + return (data_processed - _data_processed_at_last_rebalance) >= + _min_data_processed_rebalance_threshold; +} + +void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { + if (!_should_rebalance(data_processed)) { + return; + } + + _calculate_partition_data_size(data_processed); + + for (int partition = 0; partition < _partition_count; partition++) { + int total_assigned_tasks = _partition_assignments[partition].size(); + long data_size = _partition_data_size[partition]; + _partition_data_size_since_last_rebalance_per_task[partition] = + (data_size - _partition_data_size_at_last_rebalance[partition]) / + total_assigned_tasks; + _partition_data_size_at_last_rebalance[partition] = data_size; + } + + std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>> + task_bucket_max_partitions; + + for (int i = 0; i < _task_count * _task_bucket_count; ++i) { + task_bucket_max_partitions.push_back( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>()); + } + + for (int partition = 0; partition < _partition_count; partition++) { + auto& taskAssignments = _partition_assignments[partition]; + for (const auto& taskBucket : taskAssignments) { + auto& queue = 
task_bucket_max_partitions[taskBucket.id]; + queue.add_or_update(partition, + _partition_data_size_since_last_rebalance_per_task[partition]); + } + } + + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> + max_task_buckets; + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH> + min_task_buckets; + + for (int taskId = 0; taskId < _task_count; taskId++) { + for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) { + TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count); + TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count); + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] = + _calculate_task_bucket_data_size_since_last_rebalance( + task_bucket_max_partitions[task_bucket1.id]); + max_task_buckets.add_or_update( + std::move(task_bucket1), + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]); + min_task_buckets.add_or_update( + std::move(task_bucket2), + _estimated_task_bucket_data_size_since_last_rebalance[task_bucket2.id]); + } + } + + _rebalance_based_on_task_bucket_skewness(max_task_buckets, min_task_buckets, + task_bucket_max_partitions); + _data_processed_at_last_rebalance = data_processed; +} +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/skewed_partition_rebalancer.h b/be/src/vec/exec/skewed_partition_rebalancer.h new file mode 100644 index 00000000000..814ebc1d465 --- /dev/null +++ b/be/src/vec/exec/skewed_partition_rebalancer.h @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java +// to cpp and modified by Doris + +/** + * Helps in distributing big or skewed partitions across available tasks to improve the performance of + * partitioned writes. + * <p> + * This rebalancer initialize a bunch of buckets for each task based on a given taskBucketCount and then tries to + * uniformly distribute partitions across those buckets. This helps to mitigate two problems: + * 1. Mitigate skewness across tasks. + * 2. Scale few big partitions across tasks even if there's no skewness among them. This will essentially speed the + * local scaling without impacting much overall resource utilization. 
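 * <p>
 * (Editor's note summarizing the thresholds used by the implementation: rebalance()
 * acts only once data_processed has grown by at least
 * min_data_processed_rebalance_threshold since the previous rebalance, and a partition
 * is scaled out to an additional task bucket only when its estimated data size is at
 * least min_partition_data_processed_rebalance_threshold times the number of tasks it
 * is already assigned to.)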
+ * <p> + * Example: + * <p> + * Before: 3 tasks, 3 buckets per task, and 2 skewed partitions + * Task1 Task2 Task3 + * Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 + * Bucket2 Bucket2 Bucket2 + * Bucket3 Bucket3 Bucket3 + * <p> + * After rebalancing: + * Task1 Task2 Task3 + * Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 (Part 1) + * Bucket2 (Part 2) Bucket2 (Part 1) Bucket2 (Part 2) + * Bucket3 Bucket3 Bucket3 + */ + +#pragma once + +#include <algorithm> +#include <iostream> +#include <list> +#include <optional> +#include <vector> + +#include "util/indexed_priority_queue.hpp" + +namespace doris::vectorized { + +class SkewedPartitionRebalancer { +private: + struct TaskBucket { + int task_id; + int id; + + TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_) + : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {} + + bool operator==(const TaskBucket& other) const { return id == other.id; } + + bool operator<(const TaskBucket& other) const { return id < other.id; } + + bool operator>(const TaskBucket& other) const { return id > other.id; } + }; + +public: + SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold); + + std::vector<std::list<int>> get_partition_assignments(); + int get_task_count(); + int get_task_id(int partition_id, int64_t index); + void add_data_processed(long data_size); + void add_partition_row_count(int partition, long row_count); + void rebalance(); + +private: + void _calculate_partition_data_size(long data_processed); + long _calculate_task_bucket_data_size_since_last_rebalance( + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_partitions); + void _rebalance_based_on_task_bucket_skewness( + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets, + std::vector< + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>& + task_bucket_max_partitions); + std::vector<TaskBucket> _find_skewed_min_task_buckets( + const TaskBucket& max_task_bucket, + const IndexedPriorityQueue<TaskBucket, + IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets); + bool _rebalance_partition( + int partition_id, const TaskBucket& to_task_bucket, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>& + max_task_buckets, + IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>& + min_task_buckets); + + bool _should_rebalance(long data_processed); + void _rebalance_partitions(long data_processed); + +private: + static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7; + + int _partition_count; + int _task_count; + int _task_bucket_count; + long _min_partition_data_processed_rebalance_threshold; + long _min_data_processed_rebalance_threshold; + std::vector<long> _partition_row_count; + long _data_processed; + long _data_processed_at_last_rebalance; + std::vector<long> _partition_data_size; + std::vector<long> _partition_data_size_at_last_rebalance; + std::vector<long> _partition_data_size_since_last_rebalance_per_task; + std::vector<long> _estimated_task_bucket_data_size_since_last_rebalance; + + std::vector<std::vector<TaskBucket>> _partition_assignments; +}; +} // namespace doris::vectorized \ No newline at end of file diff --git 
a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp index be52b672aa8..764a97ae5bc 100644 --- a/be/src/vec/runtime/vorc_transformer.cpp +++ b/be/src/vec/runtime/vorc_transformer.cpp @@ -99,16 +99,33 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* fil : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), _file_writer(file_writer), _write_options(new orc::WriterOptions()), - _schema_str(schema) { + _schema_str(&schema), + _schema(nullptr) { _write_options->setTimezoneName(_state->timezone()); _write_options->setUseTightNumericVector(true); } +VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, + std::unique_ptr<orc::Type> schema, bool output_object_data, + orc::CompressionKind compression) + : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), + _file_writer(file_writer), + _write_options(new orc::WriterOptions()), + _schema_str(nullptr), + _schema(std::move(schema)) { + _write_options->setTimezoneName(_state->timezone()); + _write_options->setUseTightNumericVector(true); + _write_options->setCompression(compression); +} + Status VOrcTransformer::open() { try { - _schema = orc::Type::buildTypeFromString(_schema_str); + if (_schema == nullptr && _schema_str != nullptr) { + _schema = orc::Type::buildTypeFromString(*_schema_str); + } } catch (const std::exception& e) { - return Status::InternalError("Orc build schema from \"{}\" failed: {}", _schema_str, + return Status::InternalError("Orc build schema from \"{}\" failed: {}", *_schema_str, e.what()); } _output_stream = std::make_unique<VOrcOutputStream>(_file_writer); diff --git a/be/src/vec/runtime/vorc_transformer.h b/be/src/vec/runtime/vorc_transformer.h index 4b8ea178ca9..8cfc956c0cd 100644 --- a/be/src/vec/runtime/vorc_transformer.h +++ b/be/src/vec/runtime/vorc_transformer.h @@ -78,6 +78,10 @@ public: const VExprContextSPtrs& output_vexpr_ctxs, const std::string& schema, bool output_object_data); + VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, std::unique_ptr<orc::Type> schema, + bool output_object_data, orc::CompressionKind compression); + ~VOrcTransformer() = default; Status open() override; @@ -99,7 +103,7 @@ private: doris::io::FileWriter* _file_writer = nullptr; std::unique_ptr<orc::OutputStream> _output_stream; std::unique_ptr<orc::WriterOptions> _write_options; - const std::string& _schema_str; + const std::string* _schema_str; std::unique_ptr<orc::Type> _schema; std::unique_ptr<orc::Writer> _writer; diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp new file mode 100644 index 00000000000..f7435249c20 --- /dev/null +++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <algorithm> +#include <functional> +#include <iostream> +#include <vector> + +#include "vec/core/block.h" +#include "vec/exec/skewed_partition_rebalancer.h" + +namespace doris::vectorized { + +template <typename PartitionFunction> +class ScaleWriterPartitioningExchanger { +public: + ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& partition_function, + int partition_count, int task_count, int task_bucket_count, + long min_partition_data_processed_rebalance_threshold, + long min_data_processed_rebalance_threshold) + : _channel_size(channel_size), + _partition_function(partition_function), + _partition_rebalancer(partition_count, task_count, task_bucket_count, + min_partition_data_processed_rebalance_threshold, + min_data_processed_rebalance_threshold), + _partition_row_counts(partition_count, 0), + _partition_writer_ids(partition_count, -1), + _partition_writer_indexes(partition_count, 0) {} + + std::vector<std::vector<uint32_t>> accept(Block* block) { + std::vector<std::vector<uint32_t>> writerAssignments(_channel_size, + std::vector<uint32_t>()); + for (int partition_id = 0; partition_id < _partition_row_counts.size(); partition_id++) { + _partition_row_counts[partition_id] = 0; + _partition_writer_ids[partition_id] = -1; + } + + _partition_rebalancer.rebalance(); + + for (int position = 0; position < block->rows(); position++) { + int partition_id = _partition_function.get_partition(block, position); + _partition_row_counts[partition_id] += 1; + + // Get writer id for this partition by looking at the scaling state + int writer_id = _partition_writer_ids[partition_id]; + if (writer_id == -1) { + writer_id = get_next_writer_id(partition_id); + _partition_writer_ids[partition_id] = writer_id; + } + writerAssignments[writer_id].push_back(position); + } + + for (int partition_id = 0; partition_id < _partition_row_counts.size(); partition_id++) { + _partition_rebalancer.add_partition_row_count(partition_id, + _partition_row_counts[partition_id]); + } + _partition_rebalancer.add_data_processed(block->bytes()); + + return writerAssignments; + } + + int get_next_writer_id(int partition_id) { + return _partition_rebalancer.get_task_id(partition_id, + _partition_writer_indexes[partition_id]++); + } + +private: + int _channel_size; + PartitionFunction& _partition_function; + SkewedPartitionRebalancer _partition_rebalancer; + std::vector<int> _partition_row_counts; + std::vector<int> _partition_writer_ids; + std::vector<int> _partition_writer_indexes; +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vhive_table_sink.cpp b/be/src/vec/sink/vhive_table_sink.cpp new file mode 100644 index 00000000000..0fba50cef69 --- /dev/null +++ b/be/src/vec/sink/vhive_table_sink.cpp @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vhive_table_sink.h" + +namespace doris { +class TExpr; + +namespace vectorized { + +VHiveTableSink::VHiveTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector<TExpr>& texprs) + : AsyncWriterSink<VHiveTableWriter, VHIVE_TABLE_SINK>(row_desc, texprs), _pool(pool) {} + +VHiveTableSink::~VHiveTableSink() = default; + +Status VHiveTableSink::init(const TDataSink& t_sink) { + RETURN_IF_ERROR(AsyncWriterSink::init(t_sink)); + RETURN_IF_ERROR(_writer->init_properties(_pool)); + return Status::OK(); +} + +Status VHiveTableSink::close(RuntimeState* state, Status exec_status) { + SCOPED_TIMER(_exec_timer); + if (_closed) { + return _close_status; + } + RETURN_IF_ERROR(DataSink::close(state, exec_status)); + _close_status = AsyncWriterSink::close(state, exec_status); + return _close_status; +} + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/vhive_table_sink.h b/be/src/vec/sink/vhive_table_sink.h new file mode 100644 index 00000000000..d7b9c3fc856 --- /dev/null +++ b/be/src/vec/sink/vhive_table_sink.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "vec/sink/async_writer_sink.h" +#include "vec/sink/writer/vhive_table_writer.h" + +namespace doris { + +class ObjectPool; +class RowDescriptor; + +namespace vectorized { + +inline constexpr char VHIVE_TABLE_SINK[] = "VHiveTableSink"; + +class VHiveTableSink final : public AsyncWriterSink<VHiveTableWriter, VHIVE_TABLE_SINK> { +public: + // Construct from thrift struct which is generated by FE. 
+ VHiveTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector<TExpr>& texprs); + + ~VHiveTableSink() override; + + Status init(const TDataSink& sink) override; + + Status close(RuntimeState* state, Status exec_status) override; + +private: + ObjectPool* _pool = nullptr; + + Status _close_status = Status::OK(); +}; + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp new file mode 100644 index 00000000000..38668abb3fc --- /dev/null +++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_partition_writer.h" + +#include "io/file_factory.h" +#include "io/fs/file_system.h" +#include "runtime/runtime_state.h" +#include "vec/core/materialize_block.h" +#include "vec/runtime/vorc_transformer.h" +#include "vec/runtime/vparquet_transformer.h" + +namespace doris { +namespace vectorized { + +VHivePartitionWriter::VHivePartitionWriter( + const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode, + const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns, + WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type, + TFileCompressType::type hive_compress_type, + const std::map<std::string, std::string>& hadoop_conf) + : _partition_name(std::move(partition_name)), + _update_mode(update_mode), + _vec_output_expr_ctxs(output_expr_ctxs), + _columns(columns), + _write_info(std::move(write_info)), + _file_name(std::move(file_name)), + _file_format_type(file_format_type), + _hive_compress_type(hive_compress_type), + _hadoop_conf(hadoop_conf) + +{} + +Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + + std::vector<TNetworkAddress> broker_addresses; + RETURN_IF_ERROR(FileFactory::create_file_writer( + _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf, + fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer_impl)); + + switch (_file_format_type) { + case TFileFormatType::FORMAT_PARQUET: { + bool parquet_disable_dictionary = false; + TParquetCompressionType::type parquet_compression_type; + switch (_hive_compress_type) { + case TFileCompressType::PLAIN: { + parquet_compression_type = TParquetCompressionType::UNCOMPRESSED; + break; + } + case TFileCompressType::SNAPPYBLOCK: { + parquet_compression_type = TParquetCompressionType::SNAPPY; + break; + } + case TFileCompressType::ZSTD: { + parquet_compression_type = TParquetCompressionType::ZSTD; + break; + } + default: { + return Status::InternalError("Unsupported hive compress type {} with parquet", + 
to_string(_hive_compress_type)); + } + } + std::vector<TParquetSchema> parquet_schemas; + for (int i = 0; i < _columns.size(); i++) { + VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root(); + TParquetSchema parquet_schema; + parquet_schema.schema_column_name = _columns[i].name; + parquet_schemas.emplace_back(std::move(parquet_schema)); + } + _vfile_writer.reset(new VParquetTransformer( + state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas, + parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0, + false)); + return _vfile_writer->open(); + } + case TFileFormatType::FORMAT_ORC: { + orc::CompressionKind orc_compression_type; + switch (_hive_compress_type) { + case TFileCompressType::PLAIN: { + orc_compression_type = orc::CompressionKind::CompressionKind_NONE; + break; + } + case TFileCompressType::SNAPPYBLOCK: { + orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY; + break; + } + case TFileCompressType::ZLIB: { + orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; + break; + } + case TFileCompressType::ZSTD: { + orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD; + break; + } + default: { + return Status::InternalError("Unsupported type {} with orc", _hive_compress_type); + } + } + orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; + + std::unique_ptr<orc::Type> root_schema = orc::createStructType(); + for (int i = 0; i < _columns.size(); i++) { + VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root(); + try { + root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type())); + } catch (doris::Exception& e) { + return e.to_status(); + } + } + + _vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(), + _vec_output_expr_ctxs, std::move(root_schema), + false, orc_compression_type)); + return _vfile_writer->open(); + } + default: { + return Status::InternalError("Unsupported file format type {}", + to_string(_file_format_type)); + } + } +} + +Status VHivePartitionWriter::close(Status status) { + if (_vfile_writer != nullptr) { + Status st = _vfile_writer->close(); + if (st != Status::OK()) { + LOG(WARNING) << fmt::format("_vfile_writer close failed, reason: {}", st.to_string()); + } + } + if (status != Status::OK()) { + auto path = fmt::format("{}/{}", _write_info.write_path, _file_name); + Status st = _file_writer_impl->fs()->delete_file(path); + if (st != Status::OK()) { + LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string()); + } + } + _state->hive_partition_updates().emplace_back(_build_partition_update()); + return Status::OK(); +} + +Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) { + Block output_block; + RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block)); + RETURN_IF_ERROR(_vfile_writer->write(output_block)); + _row_count += output_block.rows(); + _input_size_in_bytes += output_block.bytes(); + return Status::OK(); +} + +std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type( + const TypeDescriptor& type_descriptor) { + std::pair<Status, std::unique_ptr<orc::Type>> result; + switch (type_descriptor.type) { + case TYPE_BOOLEAN: { + return orc::createPrimitiveType(orc::BOOLEAN); + } + case TYPE_TINYINT: { + return orc::createPrimitiveType(orc::BYTE); + } + case TYPE_SMALLINT: { + return orc::createPrimitiveType(orc::SHORT); + } + case TYPE_INT: { + return orc::createPrimitiveType(orc::INT); + } + case TYPE_BIGINT: { + 
return orc::createPrimitiveType(orc::LONG); + } + case TYPE_FLOAT: { + return orc::createPrimitiveType(orc::FLOAT); + } + case TYPE_DOUBLE: { + return orc::createPrimitiveType(orc::DOUBLE); + } + case TYPE_CHAR: { + return orc::createCharType(orc::CHAR, type_descriptor.len); + } + case TYPE_VARCHAR: { + return orc::createCharType(orc::VARCHAR, type_descriptor.len); + } + case TYPE_STRING: { + return orc::createPrimitiveType(orc::STRING); + } + case TYPE_BINARY: { + return orc::createPrimitiveType(orc::STRING); + } + case TYPE_DATEV2: { + return orc::createPrimitiveType(orc::DATE); + } + case TYPE_DATETIMEV2: { + return orc::createPrimitiveType(orc::TIMESTAMP); + } + case TYPE_DECIMAL32: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_DECIMAL64: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_DECIMAL128I: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_STRUCT: { + std::unique_ptr<orc::Type> struct_type = orc::createStructType(); + for (int j = 0; j < type_descriptor.children.size(); ++j) { + struct_type->addStructField(type_descriptor.field_names[j], + _build_orc_type(type_descriptor.children[j])); + } + return struct_type; + } + case TYPE_ARRAY: { + return orc::createListType(_build_orc_type(type_descriptor.children[0])); + } + case TYPE_MAP: { + return orc::createMapType(_build_orc_type(type_descriptor.children[0]), + _build_orc_type(type_descriptor.children[1])); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} to build orc type", + type_descriptor.debug_string()); + } + } +} + +Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block, + const vectorized::IColumn::Filter* filter, + doris::vectorized::Block* output_block) { + Status status = Status::OK(); + if (input_block.rows() == 0) { + return status; + } + RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( + _vec_output_expr_ctxs, input_block, output_block)); + materialize_block_inplace(*output_block); + + if (filter == nullptr) { + return status; + } + + std::vector<uint32_t> columns_to_filter; + int column_to_keep = input_block.columns(); + columns_to_filter.resize(column_to_keep); + for (uint32_t i = 0; i < column_to_keep; ++i) { + columns_to_filter[i] = i; + } + + Block::filter_block_internal(output_block, columns_to_filter, *filter); + + return status; +} + +THivePartitionUpdate VHivePartitionWriter::_build_partition_update() { + THivePartitionUpdate hive_partition_update; + hive_partition_update.__set_name(_partition_name); + hive_partition_update.__set_update_mode(_update_mode); + THiveLocationParams location; + location.__set_write_path(_write_info.write_path); + location.__set_target_path(_write_info.target_path); + hive_partition_update.__set_location(location); + hive_partition_update.__set_file_names({_file_name}); + hive_partition_update.__set_row_count(_row_count); + hive_partition_update.__set_file_size(_input_size_in_bytes); + return hive_partition_update; +} + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h new file mode 100644 index 00000000000..afee81aa499 --- /dev/null +++ b/be/src/vec/sink/writer/vhive_partition_writer.h @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// 
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/DataSinks_types.h> + +#include "io/fs/file_writer.h" +#include "vec/columns/column.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vfile_format_transformer.h" + +namespace doris { + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { + +class Block; +class VFileFormatTransformer; + +class VHivePartitionWriter { +public: + struct WriteInfo { + std::string write_path; + std::string target_path; + TFileType::type file_type; + }; + + VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name, + TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs, + const std::vector<THiveColumn>& columns, WriteInfo write_info, + const std::string file_name, TFileFormatType::type file_format_type, + TFileCompressType::type hive_compress_type, + const std::map<std::string, std::string>& hadoop_conf); + + Status init_properties(ObjectPool* pool) { return Status::OK(); } + + Status open(RuntimeState* state, RuntimeProfile* profile); + + Status write(vectorized::Block& block, IColumn::Filter* filter = nullptr); + + Status close(Status); + + inline size_t written_len() { return _vfile_writer->written_len(); } + +private: + std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor); + + Status _projection_and_filter_block(doris::vectorized::Block& input_block, + const vectorized::IColumn::Filter* filter, + doris::vectorized::Block* output_block); + + THivePartitionUpdate _build_partition_update(); + + std::string _path; + + std::string _partition_name; + + TUpdateMode::type _update_mode; + + size_t _row_count = 0; + size_t _input_size_in_bytes = 0; + + const VExprContextSPtrs& _vec_output_expr_ctxs; + + const std::vector<THiveColumn>& _columns; + WriteInfo _write_info; + std::string _file_name; + TFileFormatType::type _file_format_type; + TFileCompressType::type _hive_compress_type; + const std::map<std::string, std::string>& _hadoop_conf; + + // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. + // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. + std::unique_ptr<doris::io::FileWriter> _file_writer_impl = nullptr; + // convert block to parquet/orc/csv format + std::unique_ptr<VFileFormatTransformer> _vfile_writer = nullptr; + + RuntimeState* _state; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp new file mode 100644 index 00000000000..1c123c92b3a --- /dev/null +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_table_writer.h" + +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/sink/writer/vhive_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { + +VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.hive_table_sink); +} + +Status VHiveTableWriter::init_properties(ObjectPool* pool) { + return Status::OK(); +} + +Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + _profile = profile; + + for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) { + if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) { + _partition_columns_input_index.emplace_back(i); + } + } + return Status::OK(); +} + +Status VHiveTableWriter::write(vectorized::Block& block) { + std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions; + + auto& hive_table_sink = _t_sink.hive_table_sink; + + if (_partition_columns_input_index.empty()) { + auto writer_iter = _partitions_to_writers.find(""); + if (writer_iter == _partitions_to_writers.end()) { + try { + std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1); + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _profile)); + RETURN_IF_ERROR(writer->write(block)); + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + } else { + std::shared_ptr<VHivePartitionWriter> writer; + if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { + static_cast<void>(writer_iter->second->close(Status::OK())); + _partitions_to_writers.erase(writer_iter); + try { + writer = _create_partition_writer(block, -1); + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _profile)); + RETURN_IF_ERROR(writer->write(block)); + } catch (doris::Exception& e) { + return e.to_status(); + } + } else { + writer = writer_iter->second; + } + RETURN_IF_ERROR(writer->write(block)); + return Status::OK(); + } + } + + for (int i = 0; i < block.rows(); ++i) { + std::vector<std::string> partition_values; + try { + partition_values = _create_partition_values(block, i); + } catch (doris::Exception& e) { + return e.to_status(); + } + std::string partition_name = VHiveUtils::make_partition_name( + hive_table_sink.columns, _partition_columns_input_index, partition_values); + + auto create_and_open_writer = + [&](const std::string& partition_name, int position, + std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status { + 
try { + auto writer = _create_partition_writer(block, position); + RETURN_IF_ERROR(writer->open(_state, _profile)); + IColumn::Filter filter(block.rows(), 0); + filter[position] = 1; + writer_positions.insert({writer, std::move(filter)}); + _partitions_to_writers.insert({partition_name, writer}); + writer_ptr = writer; + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + }; + + auto writer_iter = _partitions_to_writers.find(partition_name); + if (writer_iter == _partitions_to_writers.end()) { + std::shared_ptr<VHivePartitionWriter> writer; + RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); + } else { + std::shared_ptr<VHivePartitionWriter> writer; + if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { + static_cast<void>(writer_iter->second->close(Status::OK())); + writer_positions.erase(writer_iter->second); + _partitions_to_writers.erase(writer_iter); + RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); + } else { + writer = writer_iter->second; + } + auto writer_pos_iter = writer_positions.find(writer); + if (writer_pos_iter == writer_positions.end()) { + IColumn::Filter filter(block.rows(), 0); + filter[i] = 1; + writer_positions.insert({writer, std::move(filter)}); + } else { + writer_pos_iter->second[i] = 1; + } + } + } + + for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) { + RETURN_IF_ERROR(it->first->write(block, &it->second)); + } + return Status::OK(); +} + +Status VHiveTableWriter::close(Status status) { + for (const auto& pair : _partitions_to_writers) { + Status st = pair.second->close(status); + if (st != Status::OK()) { + LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string()); + continue; + } + } + _partitions_to_writers.clear(); + return Status::OK(); +} + +std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer( + vectorized::Block& block, int position) { + auto& hive_table_sink = _t_sink.hive_table_sink; + std::vector<std::string> partition_values; + std::string partition_name; + if (!_partition_columns_input_index.empty()) { + partition_values = _create_partition_values(block, position); + partition_name = VHiveUtils::make_partition_name( + hive_table_sink.columns, _partition_columns_input_index, partition_values); + } + const std::vector<THivePartition>& partitions = hive_table_sink.partitions; + const THiveLocationParams& write_location = hive_table_sink.location; + const THivePartition* existing_partition = nullptr; + bool existing_table = true; + for (const auto& partition : partitions) { + if (partition_values == partition.values) { + existing_partition = &partition; + break; + } + } + TUpdateMode::type update_mode; + VHivePartitionWriter::WriteInfo write_info; + TFileFormatType::type file_format_type; + TFileCompressType::type write_compress_type; + if (existing_partition == nullptr) { // new partition + if (existing_table == false) { // new table + update_mode = TUpdateMode::NEW; + if (_partition_columns_input_index.empty()) { // new unpartitioned table + write_info = {write_location.write_path, write_location.target_path, + write_location.file_type}; + } else { // a new partition in a new partitioned table + auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); + write_info = {std::move(write_path), std::move(target_path), + write_location.file_type}; + } + } else { // a new 
partition in an existing partitioned table, or an existing unpartitioned table + if (_partition_columns_input_index.empty()) { // an existing unpartitioned table + update_mode = + !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE; + write_info = {write_location.write_path, write_location.target_path, + write_location.file_type}; + } else { // a new partition in an existing partitioned table + update_mode = TUpdateMode::NEW; + auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); + write_info = {std::move(write_path), std::move(target_path), + write_location.file_type}; + } + // need to get schema from existing table ? + } + file_format_type = hive_table_sink.file_format; + write_compress_type = hive_table_sink.compression_type; + } else { // existing partition + if (!hive_table_sink.overwrite) { + update_mode = TUpdateMode::APPEND; + auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto target_path = fmt::format("{}", existing_partition->location.target_path); + write_info = {std::move(write_path), std::move(target_path), + existing_partition->location.file_type}; + file_format_type = existing_partition->file_format; + write_compress_type = hive_table_sink.compression_type; + } else { + update_mode = TUpdateMode::OVERWRITE; + auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); + write_info = {std::move(write_path), std::move(target_path), write_location.file_type}; + file_format_type = hive_table_sink.file_format; + write_compress_type = hive_table_sink.compression_type; + // need to get schema from existing table ? + } + } + + return std::make_shared<VHivePartitionWriter>( + _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs, + hive_table_sink.columns, std::move(write_info), + fmt::format("{}{}", _compute_file_name(), + _get_file_extension(file_format_type, write_compress_type)), + file_format_type, write_compress_type, hive_table_sink.hadoop_config); +} + +std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block, + int position) { + std::vector<std::string> partition_values; + for (int i = 0; i < _partition_columns_input_index.size(); ++i) { + int partition_column_idx = _partition_columns_input_index[i]; + vectorized::ColumnWithTypeAndName partition_column = + block.get_by_position(partition_column_idx); + std::string value = + _to_partition_value(_vec_output_expr_ctxs[partition_column_idx]->root()->type(), + partition_column, position); + + // Check if value contains only printable ASCII characters + bool isValid = true; + for (char c : value) { + if (c < 0x20 || c > 0x7E) { + isValid = false; + break; + } + } + + if (!isValid) { + // Encode value using Base16 encoding with space separator + std::stringstream encoded; + for (unsigned char c : value) { + encoded << std::hex << std::setw(2) << std::setfill('0') << (int)c; + encoded << " "; + } + throw doris::Exception( + doris::ErrorCode::INTERNAL_ERROR, + "Hive partition values can only contain printable ASCII characters (0x20 - " + "0x7E). 
Invalid value: {}", + encoded.str()); + } + + partition_values.emplace_back(value); + } + + return partition_values; +} + +std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_desc, + const ColumnWithTypeAndName& partition_column, + int position) { + ColumnPtr column; + if (auto* nullable_column = check_and_get_column<ColumnNullable>(*partition_column.column)) { + auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); + if (null_map_data[position]) { + return "__HIVE_DEFAULT_PARTITION__"; + } + column = nullable_column->get_nested_column_ptr(); + } else { + column = partition_column.column; + } + auto [item, size] = column->get_data_at(position); + switch (type_desc.type) { + case TYPE_BOOLEAN: { + vectorized::Field field = + vectorized::check_and_get_column<const ColumnUInt8>(*column)->operator[](position); + return std::to_string(field.get<bool>()); + } + case TYPE_TINYINT: { + return std::to_string(*reinterpret_cast<const Int8*>(item)); + } + case TYPE_SMALLINT: { + return std::to_string(*reinterpret_cast<const Int16*>(item)); + } + case TYPE_INT: { + return std::to_string(*reinterpret_cast<const Int32*>(item)); + } + case TYPE_BIGINT: { + return std::to_string(*reinterpret_cast<const Int64*>(item)); + } + case TYPE_FLOAT: { + return std::to_string(*reinterpret_cast<const Float32*>(item)); + } + case TYPE_DOUBLE: { + return std::to_string(*reinterpret_cast<const Float64*>(item)); + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + return std::string(item, size); + } + case TYPE_DATE: { + VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item); + + char buf[64]; + char* pos = value.to_string(buf); + return std::string(buf, pos - buf - 1); + } + case TYPE_DATETIME: { + VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item); + + char buf[64]; + char* pos = value.to_string(buf); + return std::string(buf, pos - buf - 1); + break; + } + case TYPE_DATEV2: { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)item); + + char buf[64]; + char* pos = value.to_string(buf); + return std::string(buf, pos - buf - 1); + } + case TYPE_DATETIMEV2: { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item); + + char buf[64]; + char* pos = value.to_string(buf, type_desc.scale); + return std::string(buf, pos - buf - 1); + } + case TYPE_DECIMALV2: { + Decimal128V2 value = *(Decimal128V2*)(item); + return value.to_string(type_desc.scale); + } + case TYPE_DECIMAL32: { + Decimal32 value = *(Decimal32*)(item); + return value.to_string(type_desc.scale); + } + case TYPE_DECIMAL64: { + Decimal64 value = *(Decimal64*)(item); + return value.to_string(type_desc.scale); + } + case TYPE_DECIMAL128I: { + Decimal128V3 value = *(Decimal128V3*)(item); + return value.to_string(type_desc.scale); + } + case TYPE_DECIMAL256: { + Decimal256 value = *(Decimal256*)(item); + return value.to_string(type_desc.scale); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type for partition {}", type_desc.debug_string()); + } + } +} + +std::string VHiveTableWriter::_get_file_extension(TFileFormatType::type file_format_type, + TFileCompressType::type write_compress_type) { + std::string compress_name; + switch (write_compress_type) { + case TFileCompressType::SNAPPYBLOCK: { + compress_name = ".snappy"; + break; + } + case TFileCompressType::ZLIB: { + compress_name 
= ".zlib"; + break; + } + case TFileCompressType::ZSTD: { + compress_name = ".zstd"; + break; + } + default: { + compress_name = ""; + break; + } + } + + std::string file_format_name; + switch (file_format_type) { + case TFileFormatType::FORMAT_PARQUET: { + file_format_name = ".parquet"; + break; + } + case TFileFormatType::FORMAT_ORC: { + file_format_name = ".orc"; + break; + } + default: { + file_format_name = ""; + break; + } + } + return fmt::format("{}{}", compress_name, file_format_name); +} + +std::string VHiveTableWriter::_compute_file_name() { + boost::uuids::uuid uuid = boost::uuids::random_generator()(); + + std::string uuid_str = boost::uuids::to_string(uuid); + + return fmt::format("{}_{}", print_id(_state->query_id()), uuid_str); +} + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h new file mode 100644 index 00000000000..a4681b32e3f --- /dev/null +++ b/be/src/vec/sink/writer/vhive_table_writer.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/DataSinks_types.h> + +#include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/writer/async_result_writer.h" + +namespace doris { + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; +struct TypeDescriptor; + +namespace vectorized { + +class Block; +class VHivePartitionWriter; +struct ColumnWithTypeAndName; + +class VHiveTableWriter final : public AsyncResultWriter { +public: + VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + + ~VHiveTableWriter() = default; + + Status init_properties(ObjectPool* pool); + + Status open(RuntimeState* state, RuntimeProfile* profile) override; + + Status write(vectorized::Block& block) override; + + Status close(Status) override; + +private: + std::shared_ptr<VHivePartitionWriter> _create_partition_writer(vectorized::Block& block, + int position); + + std::vector<std::string> _create_partition_values(vectorized::Block& block, int position); + + std::string _to_partition_value(const TypeDescriptor& type_desc, + const ColumnWithTypeAndName& partition_column, int position); + + std::string _get_file_extension(TFileFormatType::type file_format_type, + TFileCompressType::type write_compress_type); + + std::string _compute_file_name(); + + // Currently it is a copy, maybe it is better to use move semantics to eliminate it. 
+ TDataSink _t_sink; + RuntimeState* _state = nullptr; + RuntimeProfile* _profile = nullptr; + std::vector<int> _partition_columns_input_index; + std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/writer/vhive_utils.cpp b/be/src/vec/sink/writer/vhive_utils.cpp new file mode 100644 index 00000000000..9a97b893775 --- /dev/null +++ b/be/src/vec/sink/writer/vhive_utils.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_utils.h" + +#include <algorithm> +#include <regex> +#include <sstream> + +namespace doris { +namespace vectorized { + +const std::regex VHiveUtils::PATH_CHAR_TO_ESCAPE("[\\x00-\\x1F\"#%'*/:=?\\\\\\x7F\\{\\[\\]\\^]"); + +std::string VHiveUtils::make_partition_name(const std::vector<THiveColumn>& columns, + const std::vector<int>& partition_columns_input_index, + const std::vector<std::string>& values) { + std::stringstream partition_name_stream; + + for (size_t i = 0; i < partition_columns_input_index.size(); i++) { + if (i > 0) { + partition_name_stream << '/'; + } + std::string column = columns[partition_columns_input_index[i]].name; + std::string value = values[i]; + std::transform(column.begin(), column.end(), column.begin(), + [&](char c) { return std::tolower(c); }); + partition_name_stream << escape_path_name(column) << '=' << escape_path_name(value); + } + + return partition_name_stream.str(); +} + +std::string VHiveUtils::escape_path_name(const std::string& path) { + if (path.empty()) { + return "__HIVE_DEFAULT_PARTITION__"; + } + + std::smatch match; + if (!std::regex_search(path, match, PATH_CHAR_TO_ESCAPE)) { + return path; + } + + std::stringstream ss; + size_t from_index = 0; + auto begin = path.begin(); + auto end = path.end(); + while (std::regex_search(begin + from_index, end, match, PATH_CHAR_TO_ESCAPE)) { + size_t escape_at_index = match.position() + from_index; + if (escape_at_index > from_index) { + ss << path.substr(from_index, escape_at_index - from_index); + } + char c = path[escape_at_index]; + ss << '%' << std::hex << std::uppercase << static_cast<int>(c >> 4) + << static_cast<int>(c & 0xF); + from_index = escape_at_index + 1; + } + if (from_index < path.length()) { + ss << path.substr(from_index); + } + return ss.str(); +} +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/writer/vhive_utils.h b/be/src/vec/sink/writer/vhive_utils.h new file mode 100644 index 00000000000..49cd45dc3a8 --- /dev/null +++ b/be/src/vec/sink/writer/vhive_utils.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/DataSinks_types.h> + +#include <algorithm> +#include <iostream> +#include <regex> +#include <sstream> +#include <string> +#include <vector> + +namespace doris { +namespace vectorized { + +class VHiveUtils { +public: + VHiveUtils() = delete; + + static const std::regex PATH_CHAR_TO_ESCAPE; + + static std::string make_partition_name(const std::vector<THiveColumn>& columns, + const std::vector<int>& partition_columns_input_index, + const std::vector<std::string>& values); + + static std::string escape_path_name(const std::string& path); +}; +} // namespace vectorized +} // namespace doris diff --git a/be/test/util/indexed_priority_queue_test.cpp b/be/test/util/indexed_priority_queue_test.cpp new file mode 100644 index 00000000000..54686fc0194 --- /dev/null +++ b/be/test/util/indexed_priority_queue_test.cpp @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "util/indexed_priority_queue.hpp" + +#include <gtest/gtest.h> + +namespace doris { + +class IndexedPriorityQueueTest : public testing::Test { +public: + IndexedPriorityQueueTest() = default; + virtual ~IndexedPriorityQueueTest() = default; +}; + +TEST_F(IndexedPriorityQueueTest, test_high_to_low) { + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> pq; + + pq.add_or_update(3, 10); + pq.add_or_update(1, 5); + pq.add_or_update(4, 15); + pq.add_or_update(2, 8); + pq.add_or_update(5, 5); + pq.add_or_update(6, 5); + + std::vector<int> expected_elements = {4, 3, 2, 1, 5, 6}; + std::vector<int> actual_elements; + for (auto& elem : pq) { + actual_elements.push_back(elem); + } + EXPECT_EQ(expected_elements, actual_elements); + + int removed = 2; + pq.remove(removed); + + expected_elements = {4, 3, 1, 5, 6}; + actual_elements.clear(); + for (auto& elem : pq) { + actual_elements.push_back(elem); + } + EXPECT_EQ(expected_elements, actual_elements); + + pq.add_or_update(4, 1); + + expected_elements = {3, 1, 5, 6, 4}; + actual_elements.clear(); + for (auto& elem : pq) { + actual_elements.push_back(elem); + } + EXPECT_EQ(expected_elements, actual_elements); +} + +TEST_F(IndexedPriorityQueueTest, test_low_to_high) { + IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH> pq; + + pq.add_or_update(3, 10); + pq.add_or_update(1, 5); + pq.add_or_update(4, 15); + pq.add_or_update(2, 8); + pq.add_or_update(5, 5); + pq.add_or_update(6, 5); + + std::vector<int> expected_elements = {1, 5, 6, 2, 3, 4}; + std::vector<int> actual_elements; + for (auto& elem : pq) { + actual_elements.push_back(elem); + } + EXPECT_EQ(expected_elements, actual_elements); + + int removed = 2; + pq.remove(removed); + + expected_elements = {1, 5, 6, 3, 4}; + actual_elements.clear(); + for (auto& elem : pq) { + actual_elements.push_back(elem); + } + EXPECT_EQ(expected_elements, actual_elements); + + pq.add_or_update(4, 1); + + expected_elements = {4, 1, 5, 6, 3}; + actual_elements.clear(); + for (auto& elem : pq) { + actual_elements.push_back(elem); + } + EXPECT_EQ(expected_elements, actual_elements); +} + +} // namespace doris diff --git a/be/test/vec/exec/skewed_partition_rebalancer_test.cpp b/be/test/vec/exec/skewed_partition_rebalancer_test.cpp new file mode 100644 index 00000000000..f5ce4d2bb8e --- /dev/null +++ b/be/test/vec/exec/skewed_partition_rebalancer_test.cpp @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <gtest/gtest.h> + +#include <list> + +namespace doris::vectorized { + +class SkewedPartitionRebalancerTest : public testing::Test { +public: + SkewedPartitionRebalancerTest() = default; + virtual ~SkewedPartitionRebalancerTest() = default; + +private: + std::vector<std::list<int>> _get_partition_positions( + std::unique_ptr<SkewedPartitionRebalancer>& rebalancer, + std::vector<long>& partition_row_count, int partition_count, int max_position) { + std::vector<std::list<int>> partitionPositions(rebalancer->get_task_count()); + + for (int partition = 0; partition < rebalancer->get_task_count(); partition++) { + partitionPositions[partition] = std::list<int>(); + } + + for (int position = 0; position < max_position; position++) { + int partition = position % partition_count; + partition = rebalancer->get_task_id(partition, partition_row_count[partition]++); + partitionPositions[partition].push_back(position); + } + + return partitionPositions; + } + + static bool _vectors_equal(const std::vector<std::list<int>>& vec1, + const std::vector<std::list<int>>& vec2) { + if (vec1.size() != vec2.size()) { + return false; + } + for (size_t i = 0; i < vec1.size(); i++) { + if (vec1[i] != vec2[i]) { + return false; + } + } + return true; + } + + static bool _compare_vector_of_lists(const std::vector<std::list<int>>& expected, + const std::vector<std::list<int>>& actual) { + if (expected.size() != actual.size()) { + return false; + } + + for (size_t i = 0; i < expected.size(); ++i) { + if (expected[i] != actual[i]) { + return false; + } + } + + return true; + } +}; + +TEST_F(SkewedPartitionRebalancerTest, test_rebalance_with_skewness) { + const int partitionCount = 3; + const int taskCount = 3; + const int taskBucketCount = 3; + const long MEGABYTE = 1024 * 1024; + const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB + const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE; // 50MB + + std::unique_ptr<SkewedPartitionRebalancer> rebalancer( + new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount, + MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD, + MIN_DATA_PROCESSED_REBALANCE_THRESHOLD)); + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 1000); + rebalancer->add_partition_row_count(2, 1000); + rebalancer->add_data_processed(40 * MEGABYTE); + rebalancer->rebalance(); + + std::vector<long> partitionRowCount(partitionCount, 0); + + ASSERT_TRUE(_vectors_equal( + {{0, 3, 6, 9, 12, 15}, {1, 4, 7, 10, 13, 16}, {2, 5, 8, 11, 14}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17))); + EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, rebalancer->get_partition_assignments())); + + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 1000); + rebalancer->add_partition_row_count(2, 1000); + rebalancer->add_data_processed(20 * MEGABYTE); + + // Rebalancing will happen since we crossed the data processed limit. 
+ // Part0 -> Task1 (Bucket1), Part1 -> Task0 (Bucket1), Part2 -> Task0 (Bucket2) + rebalancer->rebalance(); + + ASSERT_TRUE(_vectors_equal( + {{0, 2, 4, 6, 8, 10, 12, 14, 16}, {1, 3, 7, 9, 13, 15}, {5, 11}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17))); + EXPECT_TRUE(_compare_vector_of_lists({{0, 1}, {1, 0}, {2, 0}}, + rebalancer->get_partition_assignments())); + + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 1000); + rebalancer->add_partition_row_count(2, 1000); + rebalancer->add_data_processed(200 * MEGABYTE); + + // Rebalancing will happen + // Part0 -> Task2 (Bucket1), Part1 -> Task2 (Bucket2), Part2 -> Task1 (Bucket2) + rebalancer->rebalance(); + + ASSERT_TRUE(_vectors_equal( + {{0, 2, 4, 9, 11, 13}, {1, 3, 5, 10, 12, 14}, {6, 7, 8, 15, 16}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17))); + EXPECT_TRUE(_compare_vector_of_lists({{0, 1, 2}, {1, 0, 2}, {2, 0, 1}}, + rebalancer->get_partition_assignments())); +} + +TEST_F(SkewedPartitionRebalancerTest, test_rebalance_without_skewness) { + const int partitionCount = 6; + const int taskCount = 3; + const int taskBucketCount = 2; + const long MEGABYTE = 1024 * 1024; + const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB + const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE; // 50MB + + std::unique_ptr<SkewedPartitionRebalancer> rebalancer( + new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount, + MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD, + MIN_DATA_PROCESSED_REBALANCE_THRESHOLD)); + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 700); + rebalancer->add_partition_row_count(2, 600); + rebalancer->add_partition_row_count(3, 1000); + rebalancer->add_partition_row_count(4, 700); + rebalancer->add_partition_row_count(5, 600); + + rebalancer->add_data_processed(500 * MEGABYTE); + // No rebalancing will happen since there is no skewness across task buckets + rebalancer->rebalance(); + + std::vector<long> partitionRowCount(partitionCount, 0); + + ASSERT_TRUE(_vectors_equal( + {{0, 3}, {1, 4}, {2, 5}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 6))); + EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}, {0}, {1}, {2}}, + rebalancer->get_partition_assignments())); +} + +TEST_F(SkewedPartitionRebalancerTest, + test_no_rebalance_when_data_written_is_less_than_the_rebalance_limit) { + const int partitionCount = 3; + const int taskCount = 3; + const int taskBucketCount = 3; + const long MEGABYTE = 1024 * 1024; + const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB + const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE; // 50MB + + std::unique_ptr<SkewedPartitionRebalancer> rebalancer( + new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount, + MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD, + MIN_DATA_PROCESSED_REBALANCE_THRESHOLD)); + + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 0); + rebalancer->add_partition_row_count(2, 0); + + rebalancer->add_data_processed(40 * MEGABYTE); + // No rebalancing will happen since we do not cross the max data processed limit of 50MB + rebalancer->rebalance(); + + std::vector<long> partitionRowCount(partitionCount, 0); + + ASSERT_TRUE(_vectors_equal( + {{0, 3}, {1, 4}, {2, 5}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 6))); + 
EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, rebalancer->get_partition_assignments())); +} + +TEST_F(SkewedPartitionRebalancerTest, + test_no_rebalance_when_data_written_by_the_partition_is_less_than_writer_sacling_min_data_processed) { + const int partitionCount = 3; + const int taskCount = 3; + const int taskBucketCount = 3; + const long MEGABYTE = 1024 * 1024; + const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE; // 50MB + const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE; // 50MB + + std::unique_ptr<SkewedPartitionRebalancer> rebalancer( + new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount, + MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD, + MIN_DATA_PROCESSED_REBALANCE_THRESHOLD)); + + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 600); + rebalancer->add_partition_row_count(2, 0); + + rebalancer->add_data_processed(60 * MEGABYTE); + // No rebalancing will happen since no partition has crossed the writerScalingMinDataProcessed limit of 50MB + rebalancer->rebalance(); + + std::vector<long> partitionRowCount(partitionCount, 0); + + ASSERT_TRUE(_vectors_equal( + {{0, 3}, {1, 4}, {2, 5}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 6))); + EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, rebalancer->get_partition_assignments())); +} + +TEST_F(SkewedPartitionRebalancerTest, + test_rebalance_partition_to_single_task_in_a_rebalancing_loop) { + const int partitionCount = 3; + const int taskCount = 3; + const int taskBucketCount = 3; + const long MEGABYTE = 1024 * 1024; + const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB + const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE; // 50MB + + std::unique_ptr<SkewedPartitionRebalancer> rebalancer( + new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount, + MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD, + MIN_DATA_PROCESSED_REBALANCE_THRESHOLD)); + + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 0); + rebalancer->add_partition_row_count(2, 0); + + rebalancer->add_data_processed(60 * MEGABYTE); + // rebalancing will only happen to a single task even though two tasks are available + rebalancer->rebalance(); + + std::vector<long> partitionRowCount(partitionCount, 0); + + ASSERT_TRUE(_vectors_equal( + {{0, 6, 12}, {1, 3, 4, 7, 9, 10, 13, 15, 16}, {2, 5, 8, 11, 14}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17))); + EXPECT_TRUE( + _compare_vector_of_lists({{0, 1}, {1}, {2}}, rebalancer->get_partition_assignments())); + + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 0); + rebalancer->add_partition_row_count(2, 0); + rebalancer->add_data_processed(60 * MEGABYTE); + rebalancer->rebalance(); + + ASSERT_TRUE(_vectors_equal( + {{0, 9}, {1, 3, 4, 7, 10, 12, 13, 16}, {2, 5, 6, 8, 11, 14, 15}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17))); + EXPECT_TRUE(_compare_vector_of_lists({{0, 1, 2}, {1}, {2}}, + rebalancer->get_partition_assignments())); +} + +TEST_F(SkewedPartitionRebalancerTest, test_consider_skewed_partition_only_within_a_cycle) { + const int partitionCount = 3; + const int taskCount = 3; + const int taskBucketCount = 1; + const long MEGABYTE = 1024 * 1024; + const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB + const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 
50 * MEGABYTE; // 50MB + + std::unique_ptr<SkewedPartitionRebalancer> rebalancer( + new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount, + MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD, + MIN_DATA_PROCESSED_REBALANCE_THRESHOLD)); + + rebalancer->add_partition_row_count(0, 1000); + rebalancer->add_partition_row_count(1, 800); + rebalancer->add_partition_row_count(2, 0); + + rebalancer->add_data_processed(60 * MEGABYTE); + // rebalancing will happen for partition 0 to task 2 since partition 0 is skewed. + rebalancer->rebalance(); + + std::vector<long> partitionRowCount(partitionCount, 0); + + ASSERT_TRUE(_vectors_equal( + {{0, 6, 12}, {1, 4, 7, 10, 13, 16}, {2, 3, 5, 8, 9, 11, 14, 15}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17))); + EXPECT_TRUE( + _compare_vector_of_lists({{0, 2}, {1}, {2}}, rebalancer->get_partition_assignments())); + + rebalancer->add_partition_row_count(0, 0); + rebalancer->add_partition_row_count(1, 800); + rebalancer->add_partition_row_count(2, 1000); + // rebalancing will happen for partition 2 to task 0 since partition 2 is skewed. Even though partition 1 has + // written more amount of data from start, it will not be considered since it is not the most skewed in + // this rebalancing cycle. + rebalancer->add_data_processed(60 * MEGABYTE); + rebalancer->rebalance(); + + ASSERT_TRUE(_vectors_equal( + {{0, 2, 6, 8, 12, 14}, {1, 4, 7, 10, 13, 16}, {3, 5, 9, 11, 15}}, + _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17))); + EXPECT_TRUE(_compare_vector_of_lists({{0, 2}, {1}, {2, 0}}, + rebalancer->get_partition_assignments())); +} + +} // namespace doris::vectorized diff --git a/be/test/vec/exec/vhive_utils_test.cpp b/be/test/vec/exec/vhive_utils_test.cpp new file mode 100644 index 00000000000..d14a004e50b --- /dev/null +++ b/be/test/vec/exec/vhive_utils_test.cpp @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/sink/writer/vhive_utils.h" + +#include <gtest/gtest.h> + +namespace doris::vectorized { + +class VHiveUtilsTest : public testing::Test { +public: + VHiveUtilsTest() = default; + virtual ~VHiveUtilsTest() = default; +}; + +TEST_F(VHiveUtilsTest, test_make_partition_name) { + { + std::vector<THiveColumn> columns; + THiveColumn column1; + column1.name = "abc"; + columns.emplace_back(std::move(column1)); + std::vector<int> partition_columns_input_index = {0}; + EXPECT_EQ("abc=xyz", + VHiveUtils::make_partition_name(columns, partition_columns_input_index, {"xyz"})); + } + + { + std::vector<THiveColumn> columns; + THiveColumn column1; + column1.name = "abc:qqq"; + columns.emplace_back(std::move(column1)); + std::vector<int> partition_columns_input_index = {0}; + EXPECT_EQ("abc%3Aqqq=xyz%2Fyyy%3Dzzz", + VHiveUtils::make_partition_name(columns, partition_columns_input_index, + {"xyz/yyy=zzz"})); + } + + { + std::vector<THiveColumn> columns; + THiveColumn column1; + column1.name = "abc"; + columns.emplace_back(std::move(column1)); + THiveColumn column2; + column2.name = "def"; + columns.emplace_back(std::move(column2)); + THiveColumn column3; + column3.name = "xyz"; + columns.emplace_back(std::move(column3)); + std::vector<int> partition_columns_input_index = {0, 1, 2}; + EXPECT_EQ("abc=qqq/def=rrr/xyz=sss", + VHiveUtils::make_partition_name(columns, partition_columns_input_index, + {"qqq", "rrr", "sss"})); + } +} + +} // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org