This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e2d7eca8875 [fix](auto-partition) fix auto partition load lost data in multi sender (#35287) e2d7eca8875 is described below commit e2d7eca88754a4c06f0c03bf5d4da68cdce0369d Author: Kaijie Chen <c...@apache.org> AuthorDate: Wed May 29 14:24:11 2024 +0800 [fix](auto-partition) fix auto partition load lost data in multi sender (#35287) Change the `use_cnt` mechanism for incremental (auto partition) channels and streams; it is now counted dynamically. Use `close_wait()` of regular partitions as a synchronization point to make sure all sinks are in the close phase before closing any incremental (auto partition) channels and streams. Add a dummy (fake) partition and tablet if there is no regular partition in the auto partition table. Replaces #34740 Co-authored-by: zhaochangle <zhaochan...@selectdb.com> --- be/src/cloud/cloud_tablets_channel.cpp | 6 +- be/src/exec/tablet_info.cpp | 17 +-- be/src/runtime/load_channel.cpp | 25 ++++- be/src/runtime/load_channel.h | 9 +- be/src/runtime/load_channel_mgr.cpp | 8 -- be/src/runtime/load_stream.cpp | 2 +- be/src/runtime/load_stream.h | 4 + be/src/runtime/tablets_channel.cpp | 43 ++++++- be/src/runtime/tablets_channel.h | 9 +- be/src/vec/sink/load_stream_map_pool.cpp | 11 +- be/src/vec/sink/load_stream_map_pool.h | 4 +- be/src/vec/sink/load_stream_stub.cpp | 13 ++- be/src/vec/sink/load_stream_stub.h | 10 +- be/src/vec/sink/writer/vtablet_writer.cpp | 123 +++++++++++++++------ be/src/vec/sink/writer/vtablet_writer.h | 67 +++++++---- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 60 ++++++---- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 + .../apache/doris/catalog/ListPartitionItem.java | 2 +- .../org/apache/doris/catalog/PartitionKey.java | 7 ++ .../apache/doris/catalog/RangePartitionItem.java | 6 +- .../apache/doris/datasource/InternalCatalog.java | 4 +- .../org/apache/doris/planner/OlapTableSink.java | 111 ++++++++++++++++++- 
.../apache/doris/service/FrontendServiceImpl.java | 14 +-- gensrc/proto/internal_service.proto | 2 + gensrc/thrift/Descriptors.thrift | 1 + .../sql/two_instance_correctness.out | 4 + .../test_auto_range_partition.groovy | 3 +- .../auto_partition/sql/multi_thread_load.groovy | 2 +- .../sql/two_instance_correctness.groovy | 45 ++++++++ 29 files changed, 467 insertions(+), 147 deletions(-) diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp index 046916aa9a0..e063ab68116 100644 --- a/be/src/cloud/cloud_tablets_channel.cpp +++ b/be/src/cloud/cloud_tablets_channel.cpp @@ -103,8 +103,6 @@ Status CloudTabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlo return _close_status; } - LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id - << ", backend id: " << req.backend_id(); for (auto pid : req.partition_ids()) { _partition_ids.emplace(pid); } @@ -113,6 +111,10 @@ Status CloudTabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlo _num_remaining_senders--; *finished = (_num_remaining_senders == 0); + LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id + << ", backend id: " << req.backend_id() + << " remaining sender: " << _num_remaining_senders; + if (!*finished) { return Status::OK(); } diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 62ff0b2fcce..e32e9c9efcf 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -388,18 +388,21 @@ Status VOlapTablePartitionParam::init() { // for both auto/non-auto partition table. _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED; - // initial partitions + // initial partitions. 
if meet dummy partitions only for open BE nodes, not generate key of them for finding for (const auto& t_part : _t_param.partitions) { VOlapTablePartition* part = nullptr; RETURN_IF_ERROR(generate_partition_from(t_part, part)); _partitions.emplace_back(part); - if (_is_in_partition) { - for (auto& in_key : part->in_keys) { - _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); + + if (!_t_param.partitions_is_fake) { + if (_is_in_partition) { + for (auto& in_key : part->in_keys) { + _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); + } + } else { + _partitions_map->emplace( + std::tuple {part->end_key.first, part->end_key.second, false}, part); } - } else { - _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false}, - part); } } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index b6f8a6eeff3..ab7d6559e2c 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -20,9 +20,9 @@ #include <gen_cpp/internal_service.pb.h> #include <glog/logging.h> -#include "bvar/bvar.h" #include "cloud/cloud_tablets_channel.h" #include "cloud/config.h" +#include "common/logging.h" #include "olap/storage_engine.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -35,11 +35,11 @@ namespace doris { bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt"); LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, - const std::string& sender_ip, int64_t backend_id, bool enable_profile) + std::string sender_ip, int64_t backend_id, bool enable_profile) : _load_id(load_id), _timeout_s(timeout_s), _is_high_priority(is_high_priority), - _sender_ip(sender_ip), + _sender_ip(std::move(sender_ip)), _backend_id(backend_id), _enable_profile(enable_profile) { std::shared_ptr<QueryContext> query_context = nullptr; @@ -176,6 +176,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& 
request, } // 3. handle eos + // if channel is incremental, maybe hang on close until all close request arrived. if (request.has_eos() && request.eos()) { st = _handle_eos(channel.get(), request, response); _report_profile(response); @@ -197,6 +198,23 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, auto index_id = request.index_id(); RETURN_IF_ERROR(channel->close(this, request, response, &finished)); + + // for init node, we close waiting(hang on) all close request and let them return together. + if (request.has_hang_wait() && request.hang_wait()) { + DCHECK(!channel->is_incremental_channel()); + VLOG_TRACE << "reciever close waiting!" << request.sender_id(); + int count = 0; + while (!channel->is_finished()) { + bthread_usleep(1000); + count++; + } + // now maybe finished or cancelled. + VLOG_TRACE << "reciever close wait finished!" << request.sender_id(); + if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec + return Status::InternalError("Tablets channel didn't wait all close"); + } + } + if (finished) { std::lock_guard<std::mutex> l(_lock); { @@ -206,6 +224,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, std::make_pair(channel->total_received_rows(), channel->num_rows_filtered()))); _tablets_channels.erase(index_id); } + VLOG_NOTICE << "load " << _load_id.to_string() << " closed tablets_channel " << index_id; _finished_channel_ids.emplace(index_id); } return Status::OK(); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 4a437e51907..98a8d7c9f81 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -17,10 +17,7 @@ #pragma once -#include <algorithm> #include <atomic> -#include <functional> -#include <map> #include <memory> #include <mutex> #include <ostream> @@ -28,15 +25,11 @@ #include <unordered_map> #include <unordered_set> #include <utility> -#include <vector> #include "common/status.h" -#include "olap/memtable_memory_limiter.h" 
-#include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/runtime_profile.h" #include "util/spinlock.h" -#include "util/thrift_util.h" #include "util/uid_util.h" namespace doris { @@ -52,7 +45,7 @@ class BaseTabletsChannel; class LoadChannel { public: LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, - const std::string& sender_ip, int64_t backend_id, bool enable_profile); + std::string sender_ip, int64_t backend_id, bool enable_profile); ~LoadChannel(); // open a new load channel if not exist diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 53063d90673..d31ce1d9a7e 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -24,25 +24,17 @@ // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <ctime> -#include <functional> -#include <map> #include <memory> #include <ostream> -#include <queue> #include <string> -#include <tuple> #include <vector> #include "common/config.h" #include "common/logging.h" #include "runtime/exec_env.h" #include "runtime/load_channel.h" -#include "runtime/memory/mem_tracker.h" #include "util/doris_metrics.h" -#include "util/mem_info.h" #include "util/metrics.h" -#include "util/perf_counters.h" -#include "util/pretty_printer.h" #include "util/thread.h" namespace doris { diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 30122e4a587..aa095432072 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -362,7 +362,7 @@ LoadStream::~LoadStream() { Status LoadStream::init(const POpenLoadStreamRequest* request) { _txn_id = request->txn_id(); _total_streams = request->total_streams(); - DCHECK(_total_streams > 0) << "total streams should be greator than 0"; + _is_incremental = (_total_streams == 0); _schema = std::make_shared<OlapTableSchemaParam>(); RETURN_IF_ERROR(_schema->init(request->schema())); diff --git 
a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index c61a2d163de..b2635698379 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -117,6 +117,9 @@ public: void add_source(int64_t src_id) { std::lock_guard lock_guard(_lock); _open_streams[src_id]++; + if (_is_incremental) { + _total_streams++; + } } Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit, @@ -167,6 +170,7 @@ private: RuntimeProfile::Counter* _close_wait_timer = nullptr; LoadStreamMgr* _load_stream_mgr = nullptr; QueryThreadContext _query_thread_context; + bool _is_incremental = false; }; using LoadStreamPtr = std::unique_ptr<LoadStream>; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index b3d2e1a6329..a1a8ec1a996 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -142,9 +142,29 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { RETURN_IF_ERROR(_schema->init(request.schema())); _tuple_desc = _schema->tuple_desc(); - _num_remaining_senders = request.num_senders(); - _next_seqs.resize(_num_remaining_senders, 0); - _closed_senders.Reset(_num_remaining_senders); + int max_sender = request.num_senders(); + /* + * a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none. + * there are two possibilities: + * 1. there's partitions originally broadcasted by FE. so all sender(instance) know it at start. and open() will be + * called directly, not by incremental_open(). and after _state changes to kOpened. _open_by_incremental will never + * be true. in this case, _num_remaining_senders will keep same with senders number. when all sender sent close rpc, + * the tablets channel will close. and if for auto partition table, these channel's closing will hang on reciever and + * return together to avoid close-then-incremental-open problem. + * 2. 
this tablets channel is opened by incremental_open of sender's sink node. so only this sender will know this partition + * (this TabletsChannel) at that time. and we are not sure how many sender will know in the end. it depends on data + * distribution. in this situation open() is called by incremental_open() at first time. so _open_by_incremental is true. + * then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic + * and also need same number of senders' close to close. but will not hang. + */ + if (_open_by_incremental) { + DCHECK(_num_remaining_senders == 0) << _num_remaining_senders; + } else { + _num_remaining_senders = max_sender; + } + // just use max_sender no matter incremental or not cuz we dont know how many senders will open. + _next_seqs.resize(max_sender, 0); + _closed_senders.Reset(max_sender); RETURN_IF_ERROR(_open_all_writers(request)); @@ -154,10 +174,19 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) { SCOPED_TIMER(_incremental_open_timer); - if (_state == kInitialized) { // haven't opened + + // current node first opened by incremental open + if (_state == kInitialized) { + _open_by_incremental = true; RETURN_IF_ERROR(open(params)); } + std::lock_guard<std::mutex> l(_lock); + + if (_open_by_incremental) { + _num_remaining_senders++; + } + std::vector<SlotDescriptor*>* index_slots = nullptr; int32_t schema_hash = 0; for (const auto& index : _schema->indexes()) { @@ -231,8 +260,7 @@ Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockReq *finished = (_num_remaining_senders == 0); return _close_status; } - LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id - << ", backend id: " << backend_id; + for (auto pid : partition_ids) { _partition_ids.emplace(pid); } @@ -240,6 +268,9 @@ Status TabletsChannel::close(LoadChannel* 
parent, const PTabletWriterAddBlockReq _num_remaining_senders--; *finished = (_num_remaining_senders == 0); + LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id + << ", backend id: " << backend_id << " remaining sender: " << _num_remaining_senders; + if (!*finished) { return Status::OK(); } diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 88061e30533..978bbb155dd 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -21,8 +21,6 @@ #include <atomic> #include <cstdint> -#include <functional> -#include <map> #include <mutex> #include <ostream> #include <shared_mutex> @@ -116,6 +114,11 @@ public: size_t num_rows_filtered() const { return _num_rows_filtered; } + // means this tablets in this BE is incremental opened partitions. + bool is_incremental_channel() const { return _open_by_incremental; } + + bool is_finished() const { return _state == kFinished; } + protected: Status _write_block_data(const PTabletWriterAddBlockRequest& request, int64_t cur_seq, std::unordered_map<int64_t, std::vector<uint32_t>>& tablet_to_rowidxs, @@ -158,8 +161,8 @@ protected: int64_t _txn_id = -1; int64_t _index_id = -1; std::shared_ptr<OlapTableSchemaParam> _schema; - TupleDescriptor* _tuple_desc = nullptr; + bool _open_by_incremental = false; // next sequence we expect int _num_remaining_senders = 0; diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index fdcfe190dbf..7a3072ade6e 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, DCHECK(num_use > 0) << "use num should be greater than 0"; } -std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) { +std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) { std::lock_guard<std::mutex> lock(_mutex); 
std::shared_ptr<Streams> streams = _streams_for_node[dst_id]; if (streams != nullptr) { @@ -44,7 +44,7 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) { streams = std::make_shared<Streams>(); for (int i = 0; i < _num_streams; i++) { streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index, - _enable_unique_mow_for_index)); + _enable_unique_mow_for_index, incremental)); } _streams_for_node[dst_id] = streams; return streams; @@ -101,10 +101,13 @@ bool LoadStreamMap::release() { return false; } -Status LoadStreamMap::close_load() { - return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status { +Status LoadStreamMap::close_load(bool incremental) { + return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { const auto& tablets = _tablets_to_commit[dst_id]; for (auto& stream : streams) { + if (stream->is_incremental() != incremental) { + continue; + } RETURN_IF_ERROR(stream->close_load(tablets)); } return Status::OK(); diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index aad12dba2aa..d0f72ab7e00 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -78,7 +78,7 @@ public: LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, LoadStreamMapPool* pool); - std::shared_ptr<Streams> get_or_create(int64_t dst_id); + std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = false); std::shared_ptr<Streams> at(int64_t dst_id); @@ -95,7 +95,7 @@ public: // send CLOSE_LOAD to all streams, return ERROR if any. // only call this method after release() returns true. 
- Status close_load(); + Status close_load(bool incremental); private: const UniqueId _load_id; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 92670c1c930..caebb381db6 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -127,11 +127,12 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map) + std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental) : _load_id(load_id), _src_id(src_id), _tablet_schema_for_index(schema_map), - _enable_unique_mow_for_index(mow_map) {}; + _enable_unique_mow_for_index(mow_map), + _is_incremental(incremental) {}; LoadStreamStub::~LoadStreamStub() { if (_is_init.load() && !_is_closed.load()) { @@ -168,7 +169,13 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, request.set_src_id(_src_id); request.set_txn_id(txn_id); request.set_enable_profile(enable_profile); - request.set_total_streams(total_streams); + if (_is_incremental) { + request.set_total_streams(0); + } else if (total_streams > 0) { + request.set_total_streams(total_streams); + } else { + return Status::InternalError("total_streams should be greator than 0"); + } request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); for (auto& tablet : tablets_for_schema) { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 1f0d2e459d3..1bf0fac4e38 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -111,12 +111,12 @@ public: // construct new stub LoadStreamStub(PUniqueId load_id, int64_t src_id, std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map); + std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false); 
LoadStreamStub(UniqueId load_id, int64_t src_id, std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map) - : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {}; + std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false) + : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {}; // for mock this class in UT #ifdef BE_TEST @@ -195,6 +195,8 @@ public: int64_t dst_id() const { return _dst_id; } + bool is_incremental() const { return _is_incremental; } + friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); std::string to_string(); @@ -255,6 +257,8 @@ protected: bthread::Mutex _failed_tablets_mutex; std::vector<int64_t> _success_tablets; std::unordered_map<int64_t, Status> _failed_tablets; + + bool _is_incremental = false; }; } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 621a9ad1131..7ae091eda7f 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -35,11 +35,9 @@ #include <sys/param.h> #include <algorithm> -#include <exception> #include <initializer_list> #include <memory> #include <mutex> -#include <ranges> #include <sstream> #include <string> #include <unordered_map> @@ -47,26 +45,22 @@ #include <vector> #include "cloud/config.h" +#include "olap/inverted_index_parser.h" #include "util/runtime_profile.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" -#include "vec/runtime/vdatetime_value.h" -#include "vec/sink/volap_table_sink.h" #include "vec/sink/vrow_distribution.h" #ifdef DEBUG #include <unordered_set> #endif -#include "bvar/bvar.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/logging.h" #include "common/object_pool.h" #include "common/signal_handler.h" #include "common/status.h" #include "exec/tablet_info.h" -#include "runtime/client_cache.h" -#include 
"runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -86,11 +80,8 @@ #include "util/uid_util.h" #include "vec/columns/column.h" #include "vec/columns/column_const.h" -#include "vec/columns/column_decimal.h" -#include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" -#include "vec/common/assert_cast.h" #include "vec/core/block.h" #include "vec/core/types.h" #include "vec/data_types/data_type_nullable.h" @@ -110,7 +101,8 @@ bvar::Adder<int64_t> g_sink_write_rows; bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_rows_per_second("sink_throughput_row", &g_sink_write_rows, 60); -Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) { +Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets, + bool incremental) { SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get()); for (const auto& tablet : tablets) { // First find the location BEs of this tablet @@ -128,8 +120,15 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart // NodeChannel is not added to the _parent->_pool. // Because the deconstruction of NodeChannel may take a long time to wait rpc finish. // but the ObjectPool will hold a spin lock to delete objects. - channel = std::make_shared<VNodeChannel>(_parent, this, replica_node_id); + channel = + std::make_shared<VNodeChannel>(_parent, this, replica_node_id, incremental); _node_channels.emplace(replica_node_id, channel); + // incremental opened new node. when close we have use two-stage close. 
+ if (incremental) { + _has_inc_node = true; + } + LOG(INFO) << "init new node for instance " << _parent->_sender_id + << ", incremantal:" << incremental; } else { channel = it->second; } @@ -359,22 +358,23 @@ Status VNodeChannel::init(RuntimeState* state) { // add block closure // Has to using value to capture _task_exec_ctx because tablet writer may destroyed during callback. _send_block_callback = WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared(); - _send_block_callback->addFailedHandler([&, task_exec_ctx = _task_exec_ctx](bool is_last_rpc) { - auto ctx_lock = task_exec_ctx.lock(); - if (ctx_lock == nullptr) { - return; - } - _add_block_failed_callback(is_last_rpc); - }); + _send_block_callback->addFailedHandler( + [&, task_exec_ctx = _task_exec_ctx](const WriteBlockCallbackContext& ctx) { + std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock(); + if (ctx_lock == nullptr) { + return; + } + _add_block_failed_callback(ctx); + }); _send_block_callback->addSuccessHandler( [&, task_exec_ctx = _task_exec_ctx](const PTabletWriterAddBlockResult& result, - bool is_last_rpc) { - auto ctx_lock = task_exec_ctx.lock(); + const WriteBlockCallbackContext& ctx) { + std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock(); if (ctx_lock == nullptr) { return; } - _add_block_success_callback(result, is_last_rpc); + _add_block_success_callback(result, ctx); }); _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id); @@ -681,6 +681,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } // eos request must be the last request-> it's a signal makeing callback function to set _add_batch_finished true. + // end_mark makes is_last_rpc true when rpc finished and call callbacks. 
_send_block_callback->end_mark(); _send_finished = true; CHECK(_pending_batches_num == 0) << _pending_batches_num; @@ -730,7 +731,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult& result, - bool is_last_rpc) { + const WriteBlockCallbackContext& ctx) { std::lock_guard<std::mutex> l(this->_closed_lock); if (this->_is_closed) { // if the node channel is closed, no need to call the following logic, @@ -748,7 +749,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(st.to_string()); - } else if (is_last_rpc) { + } else if (ctx._is_last_rpc) { for (const auto& tablet : result.tablet_vec()) { TTabletCommitInfo commit_info; commit_info.tabletId = tablet.tablet_id(); @@ -806,7 +807,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult } } -void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) { +void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& ctx) { std::lock_guard<std::mutex> l(this->_closed_lock); if (this->_is_closed) { // if the node channel is closed, no need to call `mark_as_failed`, @@ -823,7 +824,7 @@ void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) { Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); - } else if (is_last_rpc) { + } else if (ctx._is_last_rpc) { // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait // will be blocked. _add_batches_finished = true; @@ -896,12 +897,14 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } } - // waiting for finished, it may take a long time, so we couldn't set a timeout + // Waiting for finished until _add_batches_finished changed by rpc's finished callback. 
+ // it may take a long time, so we couldn't set a timeout // For pipeline engine, the close is called in async writer's process block method, // so that it will not block pipeline thread. while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { bthread_usleep(1000); } + VLOG_CRITICAL << _parent->_sender_id << " close wait finished"; _close_time_ms = UnixMillis() - _close_time_ms; if (_cancelled || state->is_cancelled()) { @@ -929,17 +932,18 @@ void VNodeChannel::_close_check() { CHECK(_cur_mutable_block == nullptr) << name(); } -void VNodeChannel::mark_close() { +void VNodeChannel::mark_close(bool hang_wait) { auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { return; } _cur_add_block_request->set_eos(true); + _cur_add_block_request->set_hang_wait(hang_wait); { std::lock_guard<std::mutex> l(_pending_batches_lock); if (!_cur_mutable_block) [[unlikely]] { - // add a dummy block + // never had a block arrived. add a dummy block _cur_mutable_block = vectorized::MutableBlock::create_unique(); } auto tmp_add_block_request = @@ -1179,7 +1183,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { return Status::InternalError("unknown destination tuple descriptor"); } - if (_vec_output_expr_ctxs.size() > 0 && + if (!_vec_output_expr_ctxs.empty() && _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) { LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, " << "output_tuple_slot_num " << _output_tuple_desc->slots().size() @@ -1290,7 +1294,7 @@ Status VTabletWriter::_incremental_open_node_channel( // update and reinit for existing channels. 
std::shared_ptr<IndexChannel> channel = _index_id_to_channel[index->index_id]; DCHECK(channel != nullptr); - RETURN_IF_ERROR(channel->init(_state, tablets)); // add tablets into it + RETURN_IF_ERROR(channel->init(_state, tablets, true)); // add tablets into it } fmt::memory_buffer buf; @@ -1385,14 +1389,63 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status _try_close = true; // will stop periodic thread if (status.ok()) { + // BE id -> add_batch method counter + std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map; + // only if status is ok can we call this _profile->total_time_counter(). // if status is not ok, this sink may not be prepared, so that _profile is null SCOPED_TIMER(_profile->total_time_counter()); - { - for (const auto& index_channel : _channels) { + for (const auto& index_channel : _channels) { + // two-step mark close. first we send close_origin to recievers to close all originly exist TabletsChannel. + // when they all closed, we are sure all Writer of instances called _do_try_close. that means no new channel + // will be opened. the refcount of recievers will be monotonically decreasing. then we are safe to close all + // our channels. 
+ if (index_channel->has_incremental_node_channel()) { + if (!status.ok()) { + break; + } + VLOG_TRACE << _sender_id << " first stage close start"; + index_channel->for_init_node_channel( + [&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) { + if (!status.ok() || ch->is_closed()) { + return; + } + ch->mark_close(true); + if (ch->is_cancelled()) { + status = cancel_channel_and_check_intolerable_failure( + status, ch->get_cancel_msg(), index_channel, ch); + } + }); if (!status.ok()) { break; } + index_channel->for_init_node_channel( + [this, &index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) { + if (!status.ok() || ch->is_closed()) { + return; + } + auto s = ch->close_wait(_state); + if (!s.ok()) { + status = cancel_channel_and_check_intolerable_failure( + status, s.to_string(), index_channel, ch); + } + }); + if (!status.ok()) { + break; + } + index_channel->for_inc_node_channel( + [&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) { + if (!status.ok() || ch->is_closed()) { + return; + } + // only first try close, all node channels will mark_close() + ch->mark_close(); + if (ch->is_cancelled()) { + status = cancel_channel_and_check_intolerable_failure( + status, ch->get_cancel_msg(), index_channel, ch); + } + }); + } else { // not has_incremental_node_channel index_channel->for_each_node_channel( [&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) { if (!status.ok() || ch->is_closed()) { @@ -1405,8 +1458,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status status, ch->get_cancel_msg(), index_channel, ch); } }); - } // end for index channels - } + } + } // end for index channels } if (!status.ok()) { diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 8075487abac..91aa9392e53 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -36,13 +36,11 @@ #include <cstddef> #include 
<cstdint> #include <functional> -#include <initializer_list> #include <map> #include <memory> #include <mutex> #include <ostream> #include <queue> -#include <set> #include <sstream> #include <string> #include <thread> @@ -55,23 +53,17 @@ #include "common/status.h" #include "exec/data_sink.h" #include "exec/tablet_info.h" -#include "gutil/ref_counted.h" -#include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/thread_context.h" -#include "runtime/types.h" -#include "util/countdown_latch.h" #include "util/ref_count_closure.h" #include "util/runtime_profile.h" #include "util/spinlock.h" #include "util/stopwatch.hpp" #include "vec/columns/column.h" -#include "vec/common/allocator.h" #include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" -#include "vec/runtime/vfile_format_transformer.h" #include "vec/sink/vrow_distribution.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -114,6 +106,10 @@ struct AddBatchCounter { } }; +struct WriteBlockCallbackContext { + std::atomic<bool> _is_last_rpc {false}; +}; + // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. // Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. 
@@ -127,8 +123,13 @@ public: WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} ~WriteBlockCallback() override = default; - void addFailedHandler(const std::function<void(bool)>& fn) { failed_handler = fn; } - void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { success_handler = fn; } + void addFailedHandler(const std::function<void(const WriteBlockCallbackContext&)>& fn) { + failed_handler = fn; + } + void addSuccessHandler( + const std::function<void(const T&, const WriteBlockCallbackContext&)>& fn) { + success_handler = fn; + } void join() override { // We rely on in_flight to assure one rpc is running, @@ -165,8 +166,8 @@ public: bool is_packet_in_flight() { return _packet_in_flight; } void end_mark() { - DCHECK(_is_last_rpc == false); - _is_last_rpc = true; + DCHECK(_ctx._is_last_rpc == false); + _ctx._is_last_rpc = true; } void call() override { @@ -175,9 +176,9 @@ public: LOG(WARNING) << "failed to send brpc batch, error=" << berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode()) << ", error_text=" << ::doris::DummyBrpcCallback<T>::cntl_->ErrorText(); - failed_handler(_is_last_rpc); + failed_handler(_ctx); } else { - success_handler(*(::doris::DummyBrpcCallback<T>::response_), _is_last_rpc); + success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx); } clear_in_flight(); } @@ -185,9 +186,9 @@ public: private: brpc::CallId cid; std::atomic<bool> _packet_in_flight {false}; - std::atomic<bool> _is_last_rpc {false}; - std::function<void(bool)> failed_handler; - std::function<void(const T&, bool)> success_handler; + WriteBlockCallbackContext _ctx; + std::function<void(const WriteBlockCallbackContext&)> failed_handler; + std::function<void(const T&, const WriteBlockCallbackContext&)> success_handler; }; class IndexChannel; @@ -258,7 +259,8 @@ public: // two ways to stop channel: // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. // 2. 
just cancel() - void mark_close(); + // hang_wait = true will make the receiver hang until all senders have called mark_close(). + void mark_close(bool hang_wait = false); bool is_closed() const { return _is_closed; } bool is_cancelled() const { return _cancelled; } @@ -320,8 +322,9 @@ protected: void _close_check(); void _cancel_with_msg(const std::string& msg); - void _add_block_success_callback(const PTabletWriterAddBlockResult& result, bool is_last_rpc); - void _add_block_failed_callback(bool is_last_rpc); + void _add_block_success_callback(const PTabletWriterAddBlockResult& result, + const WriteBlockCallbackContext& ctx); + void _add_block_failed_callback(const WriteBlockCallbackContext& ctx); VTabletWriter* _parent = nullptr; IndexChannel* _index_channel = nullptr; @@ -425,7 +428,8 @@ public: ~IndexChannel() = default; // allow to init multi times, for incremental open more tablets for one index(table) - Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets); + Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets, + bool incremental = false); void for_each_node_channel( const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { @@ -434,6 +438,26 @@ public: } } + void for_init_node_channel( + const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { + for (auto& it : _node_channels) { + if (!it.second->is_incremental()) { + func(it.second); + } + } + } + + void for_inc_node_channel( + const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { + for (auto& it : _node_channels) { + if (it.second->is_incremental()) { + func(it.second); + } + } + } + + bool has_incremental_node_channel() const { return _has_inc_node; } + void mark_as_failed(const VNodeChannel* node_channel, const std::string& err, int64_t tablet_id = -1); Status check_intolerable_failure(); @@ -492,6 +516,7 @@ private: std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels; // from tablet_id to
backend channel std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet; + bool _has_inc_node = false; // lock to protect _failed_channels and _failed_channels_msgs mutable doris::SpinLock _fail_lock; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 5e8b57ee029..3c9c581b49d 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -112,7 +112,7 @@ Status VTabletWriterV2::_incremental_open_streams( } } for (int64_t dst_id : new_backends) { - auto streams = _load_stream_map->get_or_create(dst_id); + auto streams = _load_stream_map->get_or_create(dst_id, true); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); } return Status::OK(); @@ -311,6 +311,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() { tablet.set_index_id(index.index_id); tablet.set_tablet_id(tablet_id); _tablets_for_node[node].emplace(tablet_id, tablet); + constexpr int64_t DUMMY_TABLET_ID = 0; + if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] { + // ignore fake tablet for auto partition + continue; + } if (known_indexes.contains(index.index_id)) [[likely]] { continue; } @@ -549,32 +554,26 @@ Status VTabletWriterV2::close(Status exec_status) { LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink << ", load_id=" << print_id(_load_id); - // send CLOSE_LOAD on all streams if this is the last sink + // send CLOSE_LOAD on all non-incremental streams if this is the last sink if (is_last_sink) { - RETURN_IF_ERROR(_load_stream_map->close_load()); + RETURN_IF_ERROR(_load_stream_map->close_load(false)); } - // close_wait on all streams, even if this is not the last sink. + // close_wait on all non-incremental streams, even if this is not the last sink. // because some per-instance data structures are now shared among all sinks // due to sharing delta writers and load stream stubs. 
- { - SCOPED_TIMER(_close_load_timer); - RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id, - const Streams& streams) -> Status { - for (auto& stream : streams) { - int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); - } - return Status::OK(); - })); + RETURN_IF_ERROR(_close_wait(false)); + + // send CLOSE_LOAD on all incremental streams if this is the last sink. + // this must happen after all non-incremental streams are closed, + // so we can ensure all sinks are in close phase before closing incremental streams. + if (is_last_sink) { + RETURN_IF_ERROR(_load_stream_map->close_load(true)); } + // close_wait on all incremental streams, even if this is not the last sink. 
+ RETURN_IF_ERROR(_close_wait(true)); + // calculate and submit commit info if (is_last_sink) { DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", { @@ -625,6 +624,27 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } +Status VTabletWriterV2::_close_wait(bool incremental) { + SCOPED_TIMER(_close_load_timer); + return _load_stream_map->for_each_st( + [this, incremental](int64_t dst_id, const Streams& streams) -> Status { + for (auto& stream : streams) { + if (stream->is_incremental() != incremental) { + continue; + } + int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" + << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); + } + RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); + } + return Status::OK(); + }); +} + void VTabletWriterV2::_calc_tablets_to_commit() { LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index e3d31fb32b9..5a9890cdb49 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -147,6 +147,8 @@ private: void _calc_tablets_to_commit(); + Status _close_wait(bool incremental); + Status _cancel(Status status); std::shared_ptr<MemTracker> _mem_tracker; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java index 1a4d188a0ca..dafdcdc49f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java @@ -35,7 +35,7 @@ import java.util.Set; import java.util.stream.Collectors; public class 
ListPartitionItem extends PartitionItem { - public static ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList()); + public static final ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList()); private final List<PartitionKey> partitionKeys; private boolean isDefaultPartition = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java index 9912a9daff5..bd6706d737a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java @@ -90,6 +90,13 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable { return partitionKey; } + public static PartitionKey createMaxPartitionKey() { + PartitionKey partitionKey = new PartitionKey(); + partitionKey.keys.add(MaxLiteral.MAX_VALUE); + // type not set + return partitionKey; + } + public static PartitionKey createPartitionKey(List<PartitionValue> keys, List<Column> columns) throws AnalysisException { PartitionKey partitionKey = new PartitionKey(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index 56214aaa0ea..bb7ddabbaa4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -30,10 +30,12 @@ import java.util.Optional; public class RangePartitionItem extends PartitionItem { private Range<PartitionKey> partitionKeyRange; - public static final Range<PartitionKey> DUMMY_ITEM; + public static final Range<PartitionKey> DUMMY_RANGE; + public static final RangePartitionItem DUMMY_ITEM; static { - DUMMY_ITEM = Range.closed(new PartitionKey(), new PartitionKey()); + DUMMY_RANGE = Range.closed(new PartitionKey(), new PartitionKey()); + DUMMY_ITEM = new 
RangePartitionItem(Range.closed(new PartitionKey(), PartitionKey.createMaxPartitionKey())); } public RangePartitionItem(Range<PartitionKey> range) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index d1d0b7d4eff..d562fd62a1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1739,12 +1739,12 @@ public class InternalCatalog implements CatalogIf<Database> { isTempPartition, partitionInfo.getIsMutable(partitionId)); } else if (partitionInfo.getType() == PartitionType.LIST) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_ITEM, partitionInfo.getItem(partitionId), dataProperty, + RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), isTempPartition, partitionInfo.getIsMutable(partitionId)); } else { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_ITEM, ListPartitionItem.DUMMY_ITEM, dataProperty, + RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), isTempPartition, partitionInfo.getIsMutable(partitionId)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 7cc316c350f..7e27dd8a606 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -349,18 +349,84 @@ public class OlapTableSink extends DataSink { return distColumns; } + private PartitionItem createDummyPartitionItem(PartitionType partType) 
throws UserException { + if (partType == PartitionType.LIST) { + return ListPartitionItem.DUMMY_ITEM; + } else if (partType == PartitionType.RANGE) { + return RangePartitionItem.DUMMY_ITEM; + } else { + throw new UserException("unsupported partition for OlapTable, partition=" + partType); + } + } + + private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table, Analyzer analyzer, + TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType) + throws UserException { + partitionParam.setEnableAutomaticPartition(true); + // these fake partitions are only used for tablet locations, not for partition lookup. + partitionParam.setPartitionsIsFake(true); + + // set columns + for (Column partCol : partitionInfo.getPartitionColumns()) { + partitionParam.addToPartitionColumns(partCol.getName()); + } + + int partColNum = partitionInfo.getPartitionColumns().size(); + + TOlapTablePartition fakePartition = new TOlapTablePartition(); + fakePartition.setId(0); + // set partition keys + setPartitionKeys(fakePartition, createDummyPartitionItem(partType), partColNum); + + for (Long indexId : table.getIndexIdToMeta().keySet()) { + fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, Arrays.asList(0L))); + fakePartition.setNumBuckets(1); + } + fakePartition.setIsMutable(true); + + DistributionInfo distInfo = table.getDefaultDistributionInfo(); + partitionParam.setDistributedColumns(getDistColumns(distInfo)); + partitionParam.addToPartitions(fakePartition); + + ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs(); + if (exprSource != null && analyzer != null) { + Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext()); + tupleDescriptor.setTable(table); + funcAnalyzer.registerTupleDescriptor(tupleDescriptor); + // we must clone the exprs, otherwise analyze() will modify the original exprs.
+ ArrayList<Expr> exprs = new ArrayList<Expr>(); + for (Expr e : exprSource) { + exprs.add(e.clone()); + } + for (Expr e : exprs) { + e.reset(); + e.analyze(funcAnalyzer); + } + partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs)); + } + + return partitionParam; + } + public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer) throws UserException { TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); + PartitionInfo partitionInfo = table.getPartitionInfo(); + boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); + PartitionType partType = table.getPartitionInfo().getType(); partitionParam.setDbId(dbId); partitionParam.setTableId(table.getId()); partitionParam.setVersion(0); + partitionParam.setPartitionType(partType.toThrift()); + + // create a dummy (shadow) partition for an empty auto-partition table; it is only used by this load. + if (enableAutomaticPartition && partitionIds.isEmpty()) { + return createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType); + } - PartitionType partType = table.getPartitionInfo().getType(); switch (partType) { case LIST: case RANGE: { - PartitionInfo partitionInfo = table.getPartitionInfo(); for (Column partCol : partitionInfo.getPartitionColumns()) { partitionParam.addToPartitionColumns(partCol.getName()); } @@ -405,7 +471,6 @@ public class OlapTableSink extends DataSink { } } } - boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); // for auto create partition by function expr, there is no any partition firstly, // But this is required in thrift struct.
if (enableAutomaticPartition && partitionIds.isEmpty()) { @@ -474,7 +539,6 @@ public class OlapTableSink extends DataSink { throw new UserException("unsupported partition for OlapTable, partition=" + partType); } } - partitionParam.setPartitionType(partType.toThrift()); return partitionParam; } @@ -515,7 +579,46 @@ public class OlapTableSink extends DataSink { } } + public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) throws UserException { + TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); + TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); + + final long fakeTabletId = 0; + SystemInfoService clusterInfo = Env.getCurrentSystemInfo(); + List<Long> aliveBe = clusterInfo.getAllBackendIds(true); + if (aliveBe.isEmpty()) { + throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster"); + } + for (int i = 0; i < table.getIndexNumber(); i++) { + // only one fake tablet here + if (singleReplicaLoad) { + Long[] nodes = aliveBe.toArray(new Long[0]); + List<Long> slaveBe = aliveBe; + + Random random = new SecureRandom(); + int masterNode = random.nextInt(nodes.length); + locationParam.addToTablets(new TTabletLocation(fakeTabletId, + Arrays.asList(nodes[masterNode]))); + + slaveBe.remove(masterNode); + slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId, + slaveBe)); + } else { + locationParam.addToTablets(new TTabletLocation(fakeTabletId, + Arrays.asList(aliveBe.get(0)))); // just one fake location is enough + + LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0)); + } + } + + return Arrays.asList(locationParam, slaveLocationParam); + } + public List<TOlapTableLocationParam> createLocation(OlapTable table) throws UserException { + if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) { + return createDummyLocation(table); + } + TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); 
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); // BE id -> path hash diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 52975d3aa2c..f1003e0a25f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3379,7 +3379,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (!Env.getCurrentEnv().isMaster()) { errorStatus.setStatusCode(TStatusCode.NOT_MASTER); errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG); - LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG); + LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG); return result; } @@ -3414,10 +3414,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { List<String> allReqPartNames; // all request partitions try { taskLock.lock(); - // we dont lock the table. other thread in this txn will be controled by - // taskLock. - // if we have already replaced. dont do it again, but acquire the recorded new - // partition directly. + // we dont lock the table. other thread in this txn will be controled by taskLock. + // if we have already replaced. dont do it again, but acquire the recorded new partition directly. // if not by this txn, just let it fail naturally is ok. List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); // here if replacedPartIds still have null. this will throw exception. @@ -3427,8 +3425,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced .mapToObj(partitionIds::get) .collect(Collectors.toList()); - // from here we ONLY deal the pending partitions. not include the dealed(by - // others). + // from here we ONLY deal the pending partitions. 
not include the dealed(by others). if (!pendingPartitionIds.isEmpty()) { // below two must have same order inner. List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds); @@ -3439,8 +3436,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { overwriteManager.registerTaskInGroup(taskGroupId, taskId); InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); - // now temp partitions are bumped up and use new names. we get their ids and - // record them. + // now temp partitions are bumped up and use new names. we get their ids and record them. List<Long> newPartitionIds = new ArrayList<Long>(); for (String newPartName : pendingPartitionNames) { newPartitionIds.add(olapTable.getPartition(newPartName).getId()); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 601d722d54c..3f8db4b22cc 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -159,6 +159,8 @@ message PTabletWriterAddBlockRequest { optional bool write_single_replica = 12 [default = false]; map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13; optional bool is_single_tablet_block = 14 [default = false]; + // for auto-partition first stage close, we should hang. 
+ optional bool hang_wait = 15 [default = false]; }; message PSlaveTabletNodes { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 9d147c84174..6c66c56e87c 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -212,6 +212,7 @@ struct TOlapTablePartitionParam { // insert overwrite partition(*) 11: optional bool enable_auto_detect_overwrite 12: optional i64 overwrite_group_id + 13: optional bool partitions_is_fake = false } struct TOlapTableIndex { diff --git a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out new file mode 100644 index 00000000000..4ee136aef2b --- /dev/null +++ b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +2 + diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy index bf9b24c2634..1102ba8f393 100644 --- a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy @@ -140,8 +140,7 @@ suite("test_auto_range_partition") { logger.info("${result2}") assertEquals(result2.size(), 2) - // partition expr extraction - + // insert into select have multi sender in load sql " drop table if exists isit " sql " drop table if exists isit_src " sql """ diff --git a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy index 4f9b7a365b4..8d43d90ff15 100644 --- a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy +++ 
b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy @@ -19,7 +19,7 @@ import groovy.io.FileType import java.nio.file.Files import java.nio.file.Paths -suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully +suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully``` // get doris-db from s3 def dirPath = context.file.parent def fatherPath = context.file.parentFile.parentFile.getPath() diff --git a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy new file mode 100644 index 00000000000..c9f2f04aab3 --- /dev/null +++ b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("two_instance_correctness") { + + // finish time of instances have diff + sql "DROP TABLE IF EXISTS two_bkt;" + sql """ + create table two_bkt( + k0 date not null + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 2 + properties("replication_num" = "1"); + """ + + sql """ insert into two_bkt values ("2012-12-11"); """ + sql """ insert into two_bkt select "2020-12-12" from numbers("number" = "20000"); """ + + sql " DROP TABLE IF EXISTS two_bkt_dest; " + sql """ + create table two_bkt_dest( + k0 date not null + ) + AUTO PARTITION BY RANGE (date_trunc(k0, 'day')) () + DISTRIBUTED BY HASH(`k0`) BUCKETS 10 + properties("replication_num" = "1"); + """ + sql " insert into two_bkt_dest select * from two_bkt; " + + qt_sql " select count(distinct k0) from two_bkt_dest; " +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org