This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e2d7eca8875 [fix](auto-partition) fix auto partition load lost data in 
multi sender (#35287)
e2d7eca8875 is described below

commit e2d7eca88754a4c06f0c03bf5d4da68cdce0369d
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Wed May 29 14:24:11 2024 +0800

    [fix](auto-partition) fix auto partition load lost data in multi sender 
(#35287)
    
    Change the `use_cnt` mechanism for incremental (auto partition) channels
    and streams: the count is now maintained dynamically.
    Use `close_wait()` of regular partitions as a synchronization point to
    make sure all sinks are in the close phase before closing any incremental
    (auto partition) channels and streams.
    Add a dummy (fake) partition and tablet if there is no regular partition
    in the auto partition table.
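    
    A minimal sketch of the resulting close order, assuming Doris's Status
    and RETURN_IF_ERROR from common/status.h and a hypothetical Sink type
    (not the actual sink API):
    
        // Regular channels/streams close first; their close_wait() acts as
        // a barrier, so incremental ones close only after every sink has
        // entered the close phase.
        Status close_all_streams(Sink& sink) {
            RETURN_IF_ERROR(sink.close_load(/*incremental=*/false));
            RETURN_IF_ERROR(sink.close_wait(/*incremental=*/false)); // barrier
            RETURN_IF_ERROR(sink.close_load(/*incremental=*/true));
            return sink.close_wait(/*incremental=*/true);
        }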
    
    Replace #34740
    
    Co-authored-by: zhaochangle <zhaochan...@selectdb.com>
---
 be/src/cloud/cloud_tablets_channel.cpp             |   6 +-
 be/src/exec/tablet_info.cpp                        |  17 +--
 be/src/runtime/load_channel.cpp                    |  25 ++++-
 be/src/runtime/load_channel.h                      |   9 +-
 be/src/runtime/load_channel_mgr.cpp                |   8 --
 be/src/runtime/load_stream.cpp                     |   2 +-
 be/src/runtime/load_stream.h                       |   4 +
 be/src/runtime/tablets_channel.cpp                 |  43 ++++++-
 be/src/runtime/tablets_channel.h                   |   9 +-
 be/src/vec/sink/load_stream_map_pool.cpp           |  11 +-
 be/src/vec/sink/load_stream_map_pool.h             |   4 +-
 be/src/vec/sink/load_stream_stub.cpp               |  13 ++-
 be/src/vec/sink/load_stream_stub.h                 |  10 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          | 123 +++++++++++++++------
 be/src/vec/sink/writer/vtablet_writer.h            |  67 +++++++----
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  60 ++++++----
 be/src/vec/sink/writer/vtablet_writer_v2.h         |   2 +
 .../apache/doris/catalog/ListPartitionItem.java    |   2 +-
 .../org/apache/doris/catalog/PartitionKey.java     |   7 ++
 .../apache/doris/catalog/RangePartitionItem.java   |   6 +-
 .../apache/doris/datasource/InternalCatalog.java   |   4 +-
 .../org/apache/doris/planner/OlapTableSink.java    | 111 ++++++++++++++++++-
 .../apache/doris/service/FrontendServiceImpl.java  |  14 +--
 gensrc/proto/internal_service.proto                |   2 +
 gensrc/thrift/Descriptors.thrift                   |   1 +
 .../sql/two_instance_correctness.out               |   4 +
 .../test_auto_range_partition.groovy               |   3 +-
 .../auto_partition/sql/multi_thread_load.groovy    |   2 +-
 .../sql/two_instance_correctness.groovy            |  45 ++++++++
 29 files changed, 467 insertions(+), 147 deletions(-)

diff --git a/be/src/cloud/cloud_tablets_channel.cpp 
b/be/src/cloud/cloud_tablets_channel.cpp
index 046916aa9a0..e063ab68116 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -103,8 +103,6 @@ Status CloudTabletsChannel::close(LoadChannel* parent, 
const PTabletWriterAddBlo
         return _close_status;
     }
 
-    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << 
sender_id
-              << ", backend id: " << req.backend_id();
     for (auto pid : req.partition_ids()) {
         _partition_ids.emplace(pid);
     }
@@ -113,6 +111,10 @@ Status CloudTabletsChannel::close(LoadChannel* parent, 
const PTabletWriterAddBlo
     _num_remaining_senders--;
     *finished = (_num_remaining_senders == 0);
 
+    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << 
sender_id
+              << ", backend id: " << req.backend_id()
+              << " remaining sender: " << _num_remaining_senders;
+
     if (!*finished) {
         return Status::OK();
     }
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 62ff0b2fcce..e32e9c9efcf 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -388,18 +388,21 @@ Status VOlapTablePartitionParam::init() {
     // for both auto/non-auto partition table.
     _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
 
-    // initial partitions
+    // initialize partitions. if we only meet dummy partitions (used just to
+    // open BE nodes), don't generate lookup keys for them
     for (const auto& t_part : _t_param.partitions) {
         VOlapTablePartition* part = nullptr;
         RETURN_IF_ERROR(generate_partition_from(t_part, part));
         _partitions.emplace_back(part);
-        if (_is_in_partition) {
-            for (auto& in_key : part->in_keys) {
-                _partitions_map->emplace(std::tuple {in_key.first, 
in_key.second, false}, part);
+
+        if (!_t_param.partitions_is_fake) {
+            if (_is_in_partition) {
+                for (auto& in_key : part->in_keys) {
+                    _partitions_map->emplace(std::tuple {in_key.first, 
in_key.second, false}, part);
+                }
+            } else {
+                _partitions_map->emplace(
+                        std::tuple {part->end_key.first, part->end_key.second, 
false}, part);
             }
-        } else {
-            _partitions_map->emplace(std::tuple {part->end_key.first, 
part->end_key.second, false},
-                                     part);
         }
     }
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index b6f8a6eeff3..ab7d6559e2c 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -20,9 +20,9 @@
 #include <gen_cpp/internal_service.pb.h>
 #include <glog/logging.h>
 
-#include "bvar/bvar.h"
 #include "cloud/cloud_tablets_channel.h"
 #include "cloud/config.h"
+#include "common/logging.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -35,11 +35,11 @@ namespace doris {
 bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt");
 
 LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool 
is_high_priority,
-                         const std::string& sender_ip, int64_t backend_id, 
bool enable_profile)
+                         std::string sender_ip, int64_t backend_id, bool 
enable_profile)
         : _load_id(load_id),
           _timeout_s(timeout_s),
           _is_high_priority(is_high_priority),
-          _sender_ip(sender_ip),
+          _sender_ip(std::move(sender_ip)),
           _backend_id(backend_id),
           _enable_profile(enable_profile) {
     std::shared_ptr<QueryContext> query_context = nullptr;
@@ -176,6 +176,7 @@ Status LoadChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
     }
 
     // 3. handle eos
+    // if the channel is incremental, close may hang until all close requests
+    // have arrived.
     if (request.has_eos() && request.eos()) {
         st = _handle_eos(channel.get(), request, response);
         _report_profile(response);
@@ -197,6 +198,23 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* 
channel,
     auto index_id = request.index_id();
 
     RETURN_IF_ERROR(channel->close(this, request, response, &finished));
+
+    // for initially opened nodes, we hold (hang) all close requests and let
+    // them return together.
+    if (request.has_hang_wait() && request.hang_wait()) {
+        DCHECK(!channel->is_incremental_channel());
+        VLOG_TRACE << "reciever close waiting!" << request.sender_id();
+        int count = 0;
+        while (!channel->is_finished()) {
+            bthread_usleep(1000);
+            count++;
+        }
+        // by now the channel has either finished or been cancelled.
+        VLOG_TRACE << "receiver close wait finished!" << request.sender_id();
+        if (count >= 1000 * _timeout_s) { // maybe use config::streaming_load_rpc_max_alive_time_sec
+            return Status::InternalError("Tablets channel didn't wait for all close requests");
+        }
+    }
+
     if (finished) {
         std::lock_guard<std::mutex> l(_lock);
         {
@@ -206,6 +224,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
                     std::make_pair(channel->total_received_rows(), 
channel->num_rows_filtered())));
             _tablets_channels.erase(index_id);
         }
+        VLOG_NOTICE << "load " << _load_id.to_string() << " closed 
tablets_channel " << index_id;
         _finished_channel_ids.emplace(index_id);
     }
     return Status::OK();
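
The hang_wait handling above is effectively a close barrier on the receiver.
A self-contained sketch of the idea, with simplified hypothetical types (the
real loop polls BaseTabletsChannel::is_finished() via bthread_usleep):

    #include <atomic>
    #include <chrono>
    #include <thread>

    // Hypothetical close barrier: each sender that sets hang_wait blocks
    // here until every sender's close has been counted, so no sender can
    // finish early and trigger a close-then-incremental-open race.
    struct CloseBarrier {
        std::atomic<int> remaining;

        explicit CloseBarrier(int senders) : remaining(senders) {}

        // returns false on timeout
        bool close_and_wait(std::chrono::seconds timeout) {
            remaining.fetch_sub(1);
            auto deadline = std::chrono::steady_clock::now() + timeout;
            while (remaining.load() > 0) {
                if (std::chrono::steady_clock::now() >= deadline) {
                    return false;
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
            return true; // all senders reached close; return together
        }
    };
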
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 4a437e51907..98a8d7c9f81 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,10 +17,7 @@
 
 #pragma once
 
-#include <algorithm>
 #include <atomic>
-#include <functional>
-#include <map>
 #include <memory>
 #include <mutex>
 #include <ostream>
@@ -28,15 +25,11 @@
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
-#include <vector>
 
 #include "common/status.h"
-#include "olap/memtable_memory_limiter.h"
-#include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
-#include "util/thrift_util.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -52,7 +45,7 @@ class BaseTabletsChannel;
 class LoadChannel {
 public:
     LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool 
is_high_priority,
-                const std::string& sender_ip, int64_t backend_id, bool 
enable_profile);
+                std::string sender_ip, int64_t backend_id, bool 
enable_profile);
     ~LoadChannel();
 
     // open a new load channel if not exist
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 53063d90673..d31ce1d9a7e 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -24,25 +24,17 @@
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <ctime>
-#include <functional>
-#include <map>
 #include <memory>
 #include <ostream>
-#include <queue>
 #include <string>
-#include <tuple>
 #include <vector>
 
 #include "common/config.h"
 #include "common/logging.h"
 #include "runtime/exec_env.h"
 #include "runtime/load_channel.h"
-#include "runtime/memory/mem_tracker.h"
 #include "util/doris_metrics.h"
-#include "util/mem_info.h"
 #include "util/metrics.h"
-#include "util/perf_counters.h"
-#include "util/pretty_printer.h"
 #include "util/thread.h"
 
 namespace doris {
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 30122e4a587..aa095432072 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -362,7 +362,7 @@ LoadStream::~LoadStream() {
 Status LoadStream::init(const POpenLoadStreamRequest* request) {
     _txn_id = request->txn_id();
     _total_streams = request->total_streams();
-    DCHECK(_total_streams > 0) << "total streams should be greator than 0";
+    _is_incremental = (_total_streams == 0);
 
     _schema = std::make_shared<OlapTableSchemaParam>();
     RETURN_IF_ERROR(_schema->init(request->schema()));
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index c61a2d163de..b2635698379 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -117,6 +117,9 @@ public:
     void add_source(int64_t src_id) {
         std::lock_guard lock_guard(_lock);
         _open_streams[src_id]++;
+        if (_is_incremental) {
+            _total_streams++;
+        }
     }
 
     Status close(int64_t src_id, const std::vector<PTabletID>& 
tablets_to_commit,
@@ -167,6 +170,7 @@ private:
     RuntimeProfile::Counter* _close_wait_timer = nullptr;
     LoadStreamMgr* _load_stream_mgr = nullptr;
     QueryThreadContext _query_thread_context;
+    bool _is_incremental = false;
 };
 
 using LoadStreamPtr = std::unique_ptr<LoadStream>;
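
Combined with the LoadStream change above, the wire convention is that an
incremental load stream is opened with total_streams == 0 and the receiver
grows the expected total as sources attach. A minimal sketch with simplified
hypothetical types:

    // total_streams == 0 in the open request is the sentinel for an
    // incremental stream; the receiver then counts streams dynamically.
    struct OpenRequest {
        long txn_id;
        int total_streams;
    };

    struct ReceiverStream {
        int total_streams = 0;
        bool is_incremental = false;

        void init(const OpenRequest& req) {
            total_streams = req.total_streams;
            is_incremental = (total_streams == 0); // the sentinel
        }
        void add_source() {
            if (is_incremental) {
                total_streams++; // one more expected stream
            }
        }
    };
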
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index b3d2e1a6329..a1a8ec1a996 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -142,9 +142,29 @@ Status BaseTabletsChannel::open(const 
PTabletWriterOpenRequest& request) {
     RETURN_IF_ERROR(_schema->init(request.schema()));
     _tuple_desc = _schema->tuple_desc();
 
-    _num_remaining_senders = request.num_senders();
-    _next_seqs.resize(_num_remaining_senders, 0);
-    _closed_senders.Reset(_num_remaining_senders);
+    int max_sender = request.num_senders();
+    /*
+     * a tablets channel on the receiver corresponds to a group of
+     * VNodeChannels on the senders, one or none per instance.
+     * there are two possibilities:
+     *  1. the partitions were originally broadcast by the FE, so every
+     *     sender (instance) knows them from the start and open() is called
+     *     directly, not via incremental_open(). once _state changes to
+     *     kOpened, _open_by_incremental can never become true. in this case
+     *     _num_remaining_senders stays equal to the number of senders. when
+     *     every sender has sent its close rpc, the tablets channel closes.
+     *     for an auto partition table, the closing of these channels hangs
+     *     on the receiver and they all return together, to avoid the
+     *     close-then-incremental-open problem.
+     *  2. the tablets channel is opened by incremental_open() of a sender's
+     *     sink node, so at that moment only that sender knows this partition
+     *     (this TabletsChannel), and how many senders will eventually learn
+     *     of it depends on the data distribution. in this situation open()
+     *     is called by the first incremental_open(), so _open_by_incremental
+     *     is true. _num_remaining_senders is not set here but is incremented
+     *     on each incremental_open() call, so it is dynamic. the channel
+     *     still needs the same number of senders' closes to close, but it
+     *     will not hang.
+     */
+    if (_open_by_incremental) {
+        DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
+    } else {
+        _num_remaining_senders = max_sender;
+    }
+    // use max_sender whether incremental or not, because we don't know how
+    // many senders will open.
+    _next_seqs.resize(max_sender, 0);
+    _closed_senders.Reset(max_sender);
 
     RETURN_IF_ERROR(_open_all_writers(request));
 
@@ -154,10 +174,19 @@ Status BaseTabletsChannel::open(const 
PTabletWriterOpenRequest& request) {
 
 Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& 
params) {
     SCOPED_TIMER(_incremental_open_timer);
-    if (_state == kInitialized) { // haven't opened
+
+    // current node first opened by incremental open
+    if (_state == kInitialized) {
+        _open_by_incremental = true;
         RETURN_IF_ERROR(open(params));
     }
+
     std::lock_guard<std::mutex> l(_lock);
+
+    if (_open_by_incremental) {
+        _num_remaining_senders++;
+    }
+
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
     for (const auto& index : _schema->indexes()) {
@@ -231,8 +260,7 @@ Status TabletsChannel::close(LoadChannel* parent, const 
PTabletWriterAddBlockReq
         *finished = (_num_remaining_senders == 0);
         return _close_status;
     }
-    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << 
sender_id
-              << ", backend id: " << backend_id;
+
     for (auto pid : partition_ids) {
         _partition_ids.emplace(pid);
     }
@@ -240,6 +268,9 @@ Status TabletsChannel::close(LoadChannel* parent, const 
PTabletWriterAddBlockReq
     _num_remaining_senders--;
     *finished = (_num_remaining_senders == 0);
 
+    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << 
sender_id
+              << ", backend id: " << backend_id << " remaining sender: " << 
_num_remaining_senders;
+
     if (!*finished) {
         return Status::OK();
     }
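
A condensed sketch of the sender-count bookkeeping that open() and
incremental_open() implement above, with simplified hypothetical names:

    // Channels announced by the FE fix their sender count at open();
    // incrementally opened channels grow it on every incremental_open(),
    // and either kind finishes once that many closes have arrived.
    struct SenderCount {
        bool open_by_incremental = false;
        bool opened = false;
        int remaining = 0;

        void open(int num_senders) {
            if (!open_by_incremental) {
                remaining = num_senders; // fixed up front
            }
            opened = true;
        }
        void incremental_open(int num_senders) {
            if (!opened) {
                open_by_incremental = true;
                open(num_senders);
            }
            if (open_by_incremental) {
                remaining++; // dynamic use count
            }
        }
        bool close_one() { return --remaining == 0; } // true when finished
    };
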
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 88061e30533..978bbb155dd 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -21,8 +21,6 @@
 
 #include <atomic>
 #include <cstdint>
-#include <functional>
-#include <map>
 #include <mutex>
 #include <ostream>
 #include <shared_mutex>
@@ -116,6 +114,11 @@ public:
 
     size_t num_rows_filtered() const { return _num_rows_filtered; }
 
+    // whether the tablets on this BE belong to incrementally opened partitions.
+    bool is_incremental_channel() const { return _open_by_incremental; }
+
+    bool is_finished() const { return _state == kFinished; }
+
 protected:
     Status _write_block_data(const PTabletWriterAddBlockRequest& request, 
int64_t cur_seq,
                              std::unordered_map<int64_t, 
std::vector<uint32_t>>& tablet_to_rowidxs,
@@ -158,8 +161,8 @@ protected:
     int64_t _txn_id = -1;
     int64_t _index_id = -1;
     std::shared_ptr<OlapTableSchemaParam> _schema;
-
     TupleDescriptor* _tuple_desc = nullptr;
+    bool _open_by_incremental = false;
 
     // next sequence we expect
     int _num_remaining_senders = 0;
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp 
b/be/src/vec/sink/load_stream_map_pool.cpp
index fdcfe190dbf..7a3072ade6e 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t 
src_id, int num_streams,
     DCHECK(num_use > 0) << "use num should be greater than 0";
 }
 
-std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
+std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool 
incremental) {
     std::lock_guard<std::mutex> lock(_mutex);
     std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
     if (streams != nullptr) {
@@ -44,7 +44,7 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t 
dst_id) {
     streams = std::make_shared<Streams>();
     for (int i = 0; i < _num_streams; i++) {
         streams->emplace_back(new LoadStreamStub(_load_id, _src_id, 
_tablet_schema_for_index,
-                                                 
_enable_unique_mow_for_index));
+                                                 _enable_unique_mow_for_index, 
incremental));
     }
     _streams_for_node[dst_id] = streams;
     return streams;
@@ -101,10 +101,13 @@ bool LoadStreamMap::release() {
     return false;
 }
 
-Status LoadStreamMap::close_load() {
-    return for_each_st([this](int64_t dst_id, const Streams& streams) -> 
Status {
+Status LoadStreamMap::close_load(bool incremental) {
+    return for_each_st([this, incremental](int64_t dst_id, const Streams& 
streams) -> Status {
         const auto& tablets = _tablets_to_commit[dst_id];
         for (auto& stream : streams) {
+            if (stream->is_incremental() != incremental) {
+                continue;
+            }
             RETURN_IF_ERROR(stream->close_load(tablets));
         }
         return Status::OK();
diff --git a/be/src/vec/sink/load_stream_map_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
index aad12dba2aa..d0f72ab7e00 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -78,7 +78,7 @@ public:
     LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int 
num_use,
                   LoadStreamMapPool* pool);
 
-    std::shared_ptr<Streams> get_or_create(int64_t dst_id);
+    std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = 
false);
 
     std::shared_ptr<Streams> at(int64_t dst_id);
 
@@ -95,7 +95,7 @@ public:
 
     // send CLOSE_LOAD to all streams, return ERROR if any.
     // only call this method after release() returns true.
-    Status close_load();
+    Status close_load(bool incremental);
 
 private:
     const UniqueId _load_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 92670c1c930..caebb381db6 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -127,11 +127,12 @@ inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler
 
 LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
                                std::shared_ptr<IndexToTabletSchema> schema_map,
-                               std::shared_ptr<IndexToEnableMoW> mow_map)
+                               std::shared_ptr<IndexToEnableMoW> mow_map, bool 
incremental)
         : _load_id(load_id),
           _src_id(src_id),
           _tablet_schema_for_index(schema_map),
-          _enable_unique_mow_for_index(mow_map) {};
+          _enable_unique_mow_for_index(mow_map),
+          _is_incremental(incremental) {};
 
 LoadStreamStub::~LoadStreamStub() {
     if (_is_init.load() && !_is_closed.load()) {
@@ -168,7 +169,13 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     request.set_src_id(_src_id);
     request.set_txn_id(txn_id);
     request.set_enable_profile(enable_profile);
-    request.set_total_streams(total_streams);
+    if (_is_incremental) {
+        request.set_total_streams(0);
+    } else if (total_streams > 0) {
+        request.set_total_streams(total_streams);
+    } else {
+        return Status::InternalError("total_streams should be greator than 0");
+    }
     request.set_idle_timeout_ms(idle_timeout_ms);
     schema.to_protobuf(request.mutable_schema());
     for (auto& tablet : tablets_for_schema) {
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 1f0d2e459d3..1bf0fac4e38 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -111,12 +111,12 @@ public:
     // construct new stub
     LoadStreamStub(PUniqueId load_id, int64_t src_id,
                    std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map);
+                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental 
= false);
 
     LoadStreamStub(UniqueId load_id, int64_t src_id,
                    std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map)
-            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) 
{};
+                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental 
= false)
+            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, 
incremental) {};
 
 // for mock this class in UT
 #ifdef BE_TEST
@@ -195,6 +195,8 @@ public:
 
     int64_t dst_id() const { return _dst_id; }
 
+    bool is_incremental() const { return _is_incremental; }
+
     friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& 
stub);
 
     std::string to_string();
@@ -255,6 +257,8 @@ protected:
     bthread::Mutex _failed_tablets_mutex;
     std::vector<int64_t> _success_tablets;
     std::unordered_map<int64_t, Status> _failed_tablets;
+
+    bool _is_incremental = false;
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 621a9ad1131..7ae091eda7f 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -35,11 +35,9 @@
 #include <sys/param.h>
 
 #include <algorithm>
-#include <exception>
 #include <initializer_list>
 #include <memory>
 #include <mutex>
-#include <ranges>
 #include <sstream>
 #include <string>
 #include <unordered_map>
@@ -47,26 +45,22 @@
 #include <vector>
 
 #include "cloud/config.h"
+#include "olap/inverted_index_parser.h"
 #include "util/runtime_profile.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
-#include "vec/runtime/vdatetime_value.h"
-#include "vec/sink/volap_table_sink.h"
 #include "vec/sink/vrow_distribution.h"
 
 #ifdef DEBUG
 #include <unordered_set>
 #endif
 
-#include "bvar/bvar.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/logging.h"
 #include "common/object_pool.h"
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "exec/tablet_info.h"
-#include "runtime/client_cache.h"
-#include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
@@ -86,11 +80,8 @@
 #include "util/uid_util.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_const.h"
-#include "vec/columns/column_decimal.h"
-#include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
 #include "vec/columns/columns_number.h"
-#include "vec/common/assert_cast.h"
 #include "vec/core/block.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type_nullable.h"
@@ -110,7 +101,8 @@ bvar::Adder<int64_t> g_sink_write_rows;
 bvar::PerSecond<bvar::Adder<int64_t>> 
g_sink_write_rows_per_second("sink_throughput_row",
                                                                    
&g_sink_write_rows, 60);
 
-Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPartition>& tablets) {
+Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPartition>& tablets,
+                          bool incremental) {
     SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
     for (const auto& tablet : tablets) {
         // First find the location BEs of this tablet
@@ -128,8 +120,15 @@ Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPart
                 // NodeChannel is not added to the _parent->_pool.
                 // Because the deconstruction of NodeChannel may take a long 
time to wait rpc finish.
                 // but the ObjectPool will hold a spin lock to delete objects.
-                channel = std::make_shared<VNodeChannel>(_parent, this, 
replica_node_id);
+                channel =
+                        std::make_shared<VNodeChannel>(_parent, this, 
replica_node_id, incremental);
                 _node_channels.emplace(replica_node_id, channel);
+                // incrementally opened new node. when closing we have to use
+                // the two-stage close.
+                if (incremental) {
+                    _has_inc_node = true;
+                }
+                LOG(INFO) << "init new node for instance " << _parent->_sender_id
+                          << ", incremental: " << incremental;
             } else {
                 channel = it->second;
             }
@@ -359,22 +358,23 @@ Status VNodeChannel::init(RuntimeState* state) {
     // add block closure
     // Has to using value to capture _task_exec_ctx because tablet writer may 
destroyed during callback.
     _send_block_callback = 
WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared();
-    _send_block_callback->addFailedHandler([&, task_exec_ctx = 
_task_exec_ctx](bool is_last_rpc) {
-        auto ctx_lock = task_exec_ctx.lock();
-        if (ctx_lock == nullptr) {
-            return;
-        }
-        _add_block_failed_callback(is_last_rpc);
-    });
+    _send_block_callback->addFailedHandler(
+            [&, task_exec_ctx = _task_exec_ctx](const 
WriteBlockCallbackContext& ctx) {
+                std::shared_ptr<TaskExecutionContext> ctx_lock = 
task_exec_ctx.lock();
+                if (ctx_lock == nullptr) {
+                    return;
+                }
+                _add_block_failed_callback(ctx);
+            });
 
     _send_block_callback->addSuccessHandler(
             [&, task_exec_ctx = _task_exec_ctx](const 
PTabletWriterAddBlockResult& result,
-                                                bool is_last_rpc) {
-                auto ctx_lock = task_exec_ctx.lock();
+                                                const 
WriteBlockCallbackContext& ctx) {
+                std::shared_ptr<TaskExecutionContext> ctx_lock = 
task_exec_ctx.lock();
                 if (ctx_lock == nullptr) {
                     return;
                 }
-                _add_block_success_callback(result, is_last_rpc);
+                _add_block_success_callback(result, ctx);
             });
 
     _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, 
_node_id);
@@ -681,6 +681,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
         }
 
         // eos request must be the last request-> it's a signal makeing 
callback function to set _add_batch_finished true.
+        // end_mark makes is_last_rpc true when the rpc finishes and the callbacks run.
         _send_block_callback->end_mark();
         _send_finished = true;
         CHECK(_pending_batches_num == 0) << _pending_batches_num;
@@ -730,7 +731,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
 }
 
 void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult& result,
-                                               bool is_last_rpc) {
+                                               const 
WriteBlockCallbackContext& ctx) {
     std::lock_guard<std::mutex> l(this->_closed_lock);
     if (this->_is_closed) {
         // if the node channel is closed, no need to call the following logic,
@@ -748,7 +749,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
         Status st = _index_channel->check_intolerable_failure();
         if (!st.ok()) {
             _cancel_with_msg(st.to_string());
-        } else if (is_last_rpc) {
+        } else if (ctx._is_last_rpc) {
             for (const auto& tablet : result.tablet_vec()) {
                 TTabletCommitInfo commit_info;
                 commit_info.tabletId = tablet.tablet_id();
@@ -806,7 +807,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
     }
 }
 
-void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) {
+void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& 
ctx) {
     std::lock_guard<std::mutex> l(this->_closed_lock);
     if (this->_is_closed) {
         // if the node channel is closed, no need to call `mark_as_failed`,
@@ -823,7 +824,7 @@ void VNodeChannel::_add_block_failed_callback(bool 
is_last_rpc) {
     Status st = _index_channel->check_intolerable_failure();
     if (!st.ok()) {
         _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), 
st.to_string()));
-    } else if (is_last_rpc) {
+    } else if (ctx._is_last_rpc) {
         // if this is last rpc, will must set _add_batches_finished. 
otherwise, node channel's close_wait
         // will be blocked.
         _add_batches_finished = true;
@@ -896,12 +897,14 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
         }
     }
 
-    // waiting for finished, it may take a long time, so we couldn't set a 
timeout
+    // Wait until _add_batches_finished is changed by the rpc's finish callback.
+    // it may take a long time, so we can't set a timeout.
     // For pipeline engine, the close is called in async writer's process 
block method,
     // so that it will not block pipeline thread.
     while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
         bthread_usleep(1000);
     }
+    VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
     _close_time_ms = UnixMillis() - _close_time_ms;
 
     if (_cancelled || state->is_cancelled()) {
@@ -929,17 +932,18 @@ void VNodeChannel::_close_check() {
     CHECK(_cur_mutable_block == nullptr) << name();
 }
 
-void VNodeChannel::mark_close() {
+void VNodeChannel::mark_close(bool hang_wait) {
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
         return;
     }
 
     _cur_add_block_request->set_eos(true);
+    _cur_add_block_request->set_hang_wait(hang_wait);
     {
         std::lock_guard<std::mutex> l(_pending_batches_lock);
         if (!_cur_mutable_block) [[unlikely]] {
-            // add a dummy block
+            // no block has ever arrived; add a dummy block
             _cur_mutable_block = vectorized::MutableBlock::create_unique();
         }
         auto tmp_add_block_request =
@@ -1179,7 +1183,7 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
         return Status::InternalError("unknown destination tuple descriptor");
     }
 
-    if (_vec_output_expr_ctxs.size() > 0 &&
+    if (!_vec_output_expr_ctxs.empty() &&
         _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) {
         LOG(WARNING) << "output tuple slot num should be equal to num of 
output exprs, "
                      << "output_tuple_slot_num " << 
_output_tuple_desc->slots().size()
@@ -1290,7 +1294,7 @@ Status VTabletWriter::_incremental_open_node_channel(
         // update and reinit for existing channels.
         std::shared_ptr<IndexChannel> channel = 
_index_id_to_channel[index->index_id];
         DCHECK(channel != nullptr);
-        RETURN_IF_ERROR(channel->init(_state, tablets)); // add tablets into it
+        RETURN_IF_ERROR(channel->init(_state, tablets, true)); // add tablets 
into it
     }
 
     fmt::memory_buffer buf;
@@ -1385,14 +1389,63 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
 
     _try_close = true; // will stop periodic thread
     if (status.ok()) {
+        // BE id -> add_batch method counter
+        std::unordered_map<int64_t, AddBatchCounter> 
node_add_batch_counter_map;
+
         // only if status is ok can we call this 
_profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that 
_profile is null
         SCOPED_TIMER(_profile->total_time_counter());
-        {
-            for (const auto& index_channel : _channels) {
+        for (const auto& index_channel : _channels) {
+            // two-step mark close. first we ask the receivers to close all
+            // originally existing TabletsChannels. once they are all closed,
+            // we are sure every instance's Writer has called _do_try_close,
+            // which means no new channel will be opened and the receivers'
+            // refcounts decrease monotonically. then it is safe to close all
+            // of our channels.
+            if (index_channel->has_incremental_node_channel()) {
+                if (!status.ok()) {
+                    break;
+                }
+                VLOG_TRACE << _sender_id << " first stage close start";
+                index_channel->for_init_node_channel(
+                        [&index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            ch->mark_close(true);
+                            if (ch->is_cancelled()) {
+                                status = 
cancel_channel_and_check_intolerable_failure(
+                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                            }
+                        });
                 if (!status.ok()) {
                     break;
                 }
+                index_channel->for_init_node_channel(
+                        [this, &index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            auto s = ch->close_wait(_state);
+                            if (!s.ok()) {
+                                status = 
cancel_channel_and_check_intolerable_failure(
+                                        status, s.to_string(), index_channel, 
ch);
+                            }
+                        });
+                if (!status.ok()) {
+                    break;
+                }
+                index_channel->for_inc_node_channel(
+                        [&index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            // on the first try close, all incremental node channels mark_close()
+                            ch->mark_close();
+                            if (ch->is_cancelled()) {
+                                status = 
cancel_channel_and_check_intolerable_failure(
+                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                            }
+                        });
+            } else { // not has_incremental_node_channel
                 index_channel->for_each_node_channel(
                         [&index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
                             if (!status.ok() || ch->is_closed()) {
@@ -1405,8 +1458,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                                         status, ch->get_cancel_msg(), 
index_channel, ch);
                             }
                         });
-            } // end for index channels
-        }
+            }
+        } // end for index channels
     }
 
     if (!status.ok()) {
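
A sketch of the resulting two-stage close flow in _do_try_close, using
simplified hypothetical types (stage 1 holds the originally opened node
channels on the receiver via hang_wait; stage 2 closes the incremental ones
once no sink can still open new channels):

    #include <memory>
    #include <vector>

    struct NodeChannel {
        bool incremental = false;
        void mark_close(bool hang_wait = false) { /* send eos request */ }
        void close_wait() { /* block until the last rpc is acked */ }
    };

    void two_stage_close(std::vector<std::shared_ptr<NodeChannel>>& chans) {
        for (auto& ch : chans) {            // stage 1: original nodes
            if (!ch->incremental) ch->mark_close(/*hang_wait=*/true);
        }
        for (auto& ch : chans) {            // barrier across all sinks
            if (!ch->incremental) ch->close_wait();
        }
        for (auto& ch : chans) {            // stage 2: incremental nodes
            if (ch->incremental) ch->mark_close();
        }
    }
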
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 8075487abac..91aa9392e53 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -36,13 +36,11 @@
 #include <cstddef>
 #include <cstdint>
 #include <functional>
-#include <initializer_list>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <ostream>
 #include <queue>
-#include <set>
 #include <sstream>
 #include <string>
 #include <thread>
@@ -55,23 +53,17 @@
 #include "common/status.h"
 #include "exec/data_sink.h"
 #include "exec/tablet_info.h"
-#include "gutil/ref_counted.h"
-#include "runtime/decimalv2_value.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/thread_context.h"
-#include "runtime/types.h"
-#include "util/countdown_latch.h"
 #include "util/ref_count_closure.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
 #include "util/stopwatch.hpp"
 #include "vec/columns/column.h"
-#include "vec/common/allocator.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
-#include "vec/runtime/vfile_format_transformer.h"
 #include "vec/sink/vrow_distribution.h"
 #include "vec/sink/vtablet_block_convertor.h"
 #include "vec/sink/vtablet_finder.h"
@@ -114,6 +106,10 @@ struct AddBatchCounter {
     }
 };
 
+struct WriteBlockCallbackContext {
+    std::atomic<bool> _is_last_rpc {false};
+};
+
 // It's very error-prone to guarantee the handler capture vars' & this 
closure's destruct sequence.
 // So using create() to get the closure pointer is recommended. We can delete 
the closure ptr before the capture vars destruction.
 // Delete this point is safe, don't worry about RPC callback will run after 
WriteBlockCallback deleted.
@@ -127,8 +123,13 @@ public:
     WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
     ~WriteBlockCallback() override = default;
 
-    void addFailedHandler(const std::function<void(bool)>& fn) { 
failed_handler = fn; }
-    void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { 
success_handler = fn; }
+    void addFailedHandler(const std::function<void(const 
WriteBlockCallbackContext&)>& fn) {
+        failed_handler = fn;
+    }
+    void addSuccessHandler(
+            const std::function<void(const T&, const 
WriteBlockCallbackContext&)>& fn) {
+        success_handler = fn;
+    }
 
     void join() override {
         // We rely on in_flight to assure one rpc is running,
@@ -165,8 +166,8 @@ public:
     bool is_packet_in_flight() { return _packet_in_flight; }
 
     void end_mark() {
-        DCHECK(_is_last_rpc == false);
-        _is_last_rpc = true;
+        DCHECK(_ctx._is_last_rpc == false);
+        _ctx._is_last_rpc = true;
     }
 
     void call() override {
@@ -175,9 +176,9 @@ public:
             LOG(WARNING) << "failed to send brpc batch, error="
                          << 
berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
                          << ", error_text=" << 
::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
-            failed_handler(_is_last_rpc);
+            failed_handler(_ctx);
         } else {
-            success_handler(*(::doris::DummyBrpcCallback<T>::response_), 
_is_last_rpc);
+            success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx);
         }
         clear_in_flight();
     }
@@ -185,9 +186,9 @@ public:
 private:
     brpc::CallId cid;
     std::atomic<bool> _packet_in_flight {false};
-    std::atomic<bool> _is_last_rpc {false};
-    std::function<void(bool)> failed_handler;
-    std::function<void(const T&, bool)> success_handler;
+    WriteBlockCallbackContext _ctx;
+    std::function<void(const WriteBlockCallbackContext&)> failed_handler;
+    std::function<void(const T&, const WriteBlockCallbackContext&)> 
success_handler;
 };
 
 class IndexChannel;
@@ -258,7 +259,8 @@ public:
     // two ways to stop channel:
     // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
     // 2. just cancel()
-    void mark_close();
+    // hang_wait = true makes the receiver hang until all senders have marked close.
+    void mark_close(bool hang_wait = false);
 
     bool is_closed() const { return _is_closed; }
     bool is_cancelled() const { return _cancelled; }
@@ -320,8 +322,9 @@ protected:
     void _close_check();
     void _cancel_with_msg(const std::string& msg);
 
-    void _add_block_success_callback(const PTabletWriterAddBlockResult& 
result, bool is_last_rpc);
-    void _add_block_failed_callback(bool is_last_rpc);
+    void _add_block_success_callback(const PTabletWriterAddBlockResult& result,
+                                     const WriteBlockCallbackContext& ctx);
+    void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
 
     VTabletWriter* _parent = nullptr;
     IndexChannel* _index_channel = nullptr;
@@ -425,7 +428,8 @@ public:
     ~IndexChannel() = default;
 
     // allow to init multi times, for incremental open more tablets for one 
index(table)
-    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& 
tablets);
+    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& 
tablets,
+                bool incremental = false);
 
     void for_each_node_channel(
             const std::function<void(const std::shared_ptr<VNodeChannel>&)>& 
func) {
@@ -434,6 +438,26 @@ public:
         }
     }
 
+    void for_init_node_channel(
+            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& 
func) {
+        for (auto& it : _node_channels) {
+            if (!it.second->is_incremental()) {
+                func(it.second);
+            }
+        }
+    }
+
+    void for_inc_node_channel(
+            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& 
func) {
+        for (auto& it : _node_channels) {
+            if (it.second->is_incremental()) {
+                func(it.second);
+            }
+        }
+    }
+
+    bool has_incremental_node_channel() const { return _has_inc_node; }
+
     void mark_as_failed(const VNodeChannel* node_channel, const std::string& 
err,
                         int64_t tablet_id = -1);
     Status check_intolerable_failure();
@@ -492,6 +516,7 @@ private:
     std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> 
_channels_by_tablet;
+    bool _has_inc_node = false;
 
     // lock to protect _failed_channels and _failed_channels_msgs
     mutable doris::SpinLock _fail_lock;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 5e8b57ee029..3c9c581b49d 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -112,7 +112,7 @@ Status VTabletWriterV2::_incremental_open_streams(
         }
     }
     for (int64_t dst_id : new_backends) {
-        auto streams = _load_stream_map->get_or_create(dst_id);
+        auto streams = _load_stream_map->get_or_create(dst_id, true);
         RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
     }
     return Status::OK();
@@ -311,6 +311,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
                     tablet.set_index_id(index.index_id);
                     tablet.set_tablet_id(tablet_id);
                     _tablets_for_node[node].emplace(tablet_id, tablet);
+                    constexpr int64_t DUMMY_TABLET_ID = 0;
+                    if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] {
+                        // ignore fake tablet for auto partition
+                        continue;
+                    }
                     if (known_indexes.contains(index.index_id)) [[likely]] {
                         continue;
                     }
@@ -549,32 +554,26 @@ Status VTabletWriterV2::close(Status exec_status) {
         LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << 
is_last_sink
                   << ", load_id=" << print_id(_load_id);
 
-        // send CLOSE_LOAD on all streams if this is the last sink
+        // send CLOSE_LOAD on all non-incremental streams if this is the last 
sink
         if (is_last_sink) {
-            RETURN_IF_ERROR(_load_stream_map->close_load());
+            RETURN_IF_ERROR(_load_stream_map->close_load(false));
         }
 
-        // close_wait on all streams, even if this is not the last sink.
+        // close_wait on all non-incremental streams, even if this is not the 
last sink.
         // because some per-instance data structures are now shared among all 
sinks
         // due to sharing delta writers and load stream stubs.
-        {
-            SCOPED_TIMER(_close_load_timer);
-            RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t 
dst_id,
-                                                                 const 
Streams& streams) -> Status {
-                for (auto& stream : streams) {
-                    int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
-                                        _timeout_watch.elapsed_time() / 1000 / 
1000;
-                    if (remain_ms <= 0) {
-                        LOG(WARNING) << "load timed out before close waiting, 
load_id="
-                                     << print_id(_load_id);
-                        return Status::TimedOut("load timed out before close 
waiting");
-                    }
-                    RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
-                }
-                return Status::OK();
-            }));
+        RETURN_IF_ERROR(_close_wait(false));
+
+        // send CLOSE_LOAD on all incremental streams if this is the last sink.
+        // this must happen after all non-incremental streams are closed,
+        // so we can ensure all sinks are in the close phase before closing
+        // incremental streams.
+        if (is_last_sink) {
+            RETURN_IF_ERROR(_load_stream_map->close_load(true));
         }
 
+        // close_wait on all incremental streams, even if this is not the last 
sink.
+        RETURN_IF_ERROR(_close_wait(true));
+
         // calculate and submit commit info
         if (is_last_sink) {
             DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
@@ -625,6 +624,27 @@ Status VTabletWriterV2::close(Status exec_status) {
     return status;
 }
 
+Status VTabletWriterV2::_close_wait(bool incremental) {
+    SCOPED_TIMER(_close_load_timer);
+    return _load_stream_map->for_each_st(
+            [this, incremental](int64_t dst_id, const Streams& streams) -> 
Status {
+                for (auto& stream : streams) {
+                    if (stream->is_incremental() != incremental) {
+                        continue;
+                    }
+                    int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                                        _timeout_watch.elapsed_time() / 1000 / 
1000;
+                    if (remain_ms <= 0) {
+                        LOG(WARNING) << "load timed out before close waiting, 
load_id="
+                                     << print_id(_load_id);
+                        return Status::TimedOut("load timed out before close 
waiting");
+                    }
+                    RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+                }
+                return Status::OK();
+            });
+}
+
 void VTabletWriterV2::_calc_tablets_to_commit() {
     LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << 
", txn_id=" << _txn_id
               << ", sink_id=" << _sender_id;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index e3d31fb32b9..5a9890cdb49 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -147,6 +147,8 @@ private:
 
     void _calc_tablets_to_commit();
 
+    Status _close_wait(bool incremental);
+
     Status _cancel(Status status);
 
     std::shared_ptr<MemTracker> _mem_tracker;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
index 1a4d188a0ca..dafdcdc49f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
@@ -35,7 +35,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 public class ListPartitionItem extends PartitionItem {
-    public static ListPartitionItem DUMMY_ITEM = new 
ListPartitionItem(Lists.newArrayList());
+    public static final ListPartitionItem DUMMY_ITEM = new 
ListPartitionItem(Lists.newArrayList());
 
     private final List<PartitionKey> partitionKeys;
     private boolean isDefaultPartition = false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index 9912a9daff5..bd6706d737a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -90,6 +90,13 @@ public class PartitionKey implements 
Comparable<PartitionKey>, Writable {
         return partitionKey;
     }
 
+    public static PartitionKey createMaxPartitionKey() {
+        PartitionKey partitionKey = new PartitionKey();
+        partitionKey.keys.add(MaxLiteral.MAX_VALUE);
+        // type not set
+        return partitionKey;
+    }
+
     public static PartitionKey createPartitionKey(List<PartitionValue> keys, 
List<Column> columns)
             throws AnalysisException {
         PartitionKey partitionKey = new PartitionKey();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index 56214aaa0ea..bb7ddabbaa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -30,10 +30,12 @@ import java.util.Optional;
 
 public class RangePartitionItem extends PartitionItem {
     private Range<PartitionKey> partitionKeyRange;
-    public static final Range<PartitionKey> DUMMY_ITEM;
+    public static final Range<PartitionKey> DUMMY_RANGE;
+    public static final RangePartitionItem DUMMY_ITEM;
 
     static {
-        DUMMY_ITEM = Range.closed(new PartitionKey(), new PartitionKey());
+        DUMMY_RANGE = Range.closed(new PartitionKey(), new PartitionKey());
+        DUMMY_ITEM = new RangePartitionItem(Range.closed(new PartitionKey(), 
PartitionKey.createMaxPartitionKey()));
     }
 
     public RangePartitionItem(Range<PartitionKey> range) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index d1d0b7d4eff..d562fd62a1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1739,12 +1739,12 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             isTempPartition, 
partitionInfo.getIsMutable(partitionId));
                 } else if (partitionInfo.getType() == PartitionType.LIST) {
                     info = new PartitionPersistInfo(db.getId(), 
olapTable.getId(), partition,
-                            RangePartitionItem.DUMMY_ITEM, 
partitionInfo.getItem(partitionId), dataProperty,
+                            RangePartitionItem.DUMMY_RANGE, 
partitionInfo.getItem(partitionId), dataProperty,
                             partitionInfo.getReplicaAllocation(partitionId), 
partitionInfo.getIsInMemory(partitionId),
                             isTempPartition, 
partitionInfo.getIsMutable(partitionId));
                 } else {
                     info = new PartitionPersistInfo(db.getId(), 
olapTable.getId(), partition,
-                            RangePartitionItem.DUMMY_ITEM, 
ListPartitionItem.DUMMY_ITEM, dataProperty,
+                            RangePartitionItem.DUMMY_RANGE, 
ListPartitionItem.DUMMY_ITEM, dataProperty,
                             partitionInfo.getReplicaAllocation(partitionId), 
partitionInfo.getIsInMemory(partitionId),
                             isTempPartition, 
partitionInfo.getIsMutable(partitionId));
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 7cc316c350f..7e27dd8a606 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -349,18 +349,84 @@ public class OlapTableSink extends DataSink {
         return distColumns;
     }
 
+    private PartitionItem createDummyPartitionItem(PartitionType partType) 
throws UserException {
+        if (partType == PartitionType.LIST) {
+            return ListPartitionItem.DUMMY_ITEM;
+        } else if (partType == PartitionType.RANGE) {
+            return RangePartitionItem.DUMMY_ITEM;
+        } else {
+            throw new UserException("unsupported partition for OlapTable, 
partition=" + partType);
+        }
+    }
+
+    private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable 
table, Analyzer analyzer,
+            TOlapTablePartitionParam partitionParam, PartitionInfo 
partitionInfo, PartitionType partType)
+            throws UserException {
+        partitionParam.setEnableAutomaticPartition(true);
+        // these partitions are only used in locations, not for partition lookup.
+        partitionParam.setPartitionsIsFake(true);
+
+        // set columns
+        for (Column partCol : partitionInfo.getPartitionColumns()) {
+            partitionParam.addToPartitionColumns(partCol.getName());
+        }
+
+        int partColNum = partitionInfo.getPartitionColumns().size();
+
+        TOlapTablePartition fakePartition = new TOlapTablePartition();
+        fakePartition.setId(0);
+        // set partition keys
+        setPartitionKeys(fakePartition, createDummyPartitionItem(partType), 
partColNum);
+
+        for (Long indexId : table.getIndexIdToMeta().keySet()) {
+            fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, 
Arrays.asList(0L)));
+            fakePartition.setNumBuckets(1);
+        }
+        fakePartition.setIsMutable(true);
+
+        DistributionInfo distInfo = table.getDefaultDistributionInfo();
+        partitionParam.setDistributedColumns(getDistColumns(distInfo));
+        partitionParam.addToPartitions(fakePartition);
+
+        ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
+        if (exprSource != null && analyzer != null) {
+            Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), 
analyzer.getContext());
+            tupleDescriptor.setTable(table);
+            funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
+            // we must clone the exprs, otherwise analyzing would affect the
+            // original exprs.
+            ArrayList<Expr> exprs = new ArrayList<Expr>();
+            for (Expr e : exprSource) {
+                exprs.add(e.clone());
+            }
+            for (Expr e : exprs) {
+                e.reset();
+                e.analyze(funcAnalyzer);
+            }
+            
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
+        }
+
+        return partitionParam;
+    }
+
     public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer)
             throws UserException {
         TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam();
+        PartitionInfo partitionInfo = table.getPartitionInfo();
+        boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
+        PartitionType partType = table.getPartitionInfo().getType();
         partitionParam.setDbId(dbId);
         partitionParam.setTableId(table.getId());
         partitionParam.setVersion(0);
+        partitionParam.setPartitionType(partType.toThrift());
+
+        // create a shadow partition for the empty auto-partition table; it is only used by this load.
+        if (enableAutomaticPartition && partitionIds.isEmpty()) {
+            return createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType);
+        }
 
-        PartitionType partType = table.getPartitionInfo().getType();
         switch (partType) {
             case LIST:
             case RANGE: {
-                PartitionInfo partitionInfo = table.getPartitionInfo();
                 for (Column partCol : partitionInfo.getPartitionColumns()) {
                     partitionParam.addToPartitionColumns(partCol.getName());
                 }
@@ -405,7 +471,6 @@ public class OlapTableSink extends DataSink {
                         }
                     }
                 }
-                boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
                 // for auto create partition by function expr, there is no any partition firstly,
                 // But this is required in thrift struct.
                 if (enableAutomaticPartition && partitionIds.isEmpty()) {
@@ -474,7 +539,6 @@ public class OlapTableSink extends DataSink {
                 throw new UserException("unsupported partition for OlapTable, partition=" + partType);
             }
         }
-        partitionParam.setPartitionType(partType.toThrift());
         return partitionParam;
     }
 
@@ -515,7 +579,46 @@ public class OlapTableSink extends DataSink {
         }
     }
 
+    public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) throws UserException {
+        TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
+        TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
+
+        final long fakeTabletId = 0;
+        SystemInfoService clusterInfo = Env.getCurrentSystemInfo();
+        List<Long> aliveBe = clusterInfo.getAllBackendIds(true);
+        if (aliveBe.isEmpty()) {
+            throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster");
+        }
+        for (int i = 0; i < table.getIndexNumber(); i++) {
+            // only one fake tablet here
+            if (singleReplicaLoad) {
+                Long[] nodes = aliveBe.toArray(new Long[0]);
+                List<Long> slaveBe = aliveBe;
+
+                Random random = new SecureRandom();
+                int masterNode = random.nextInt(nodes.length);
+                locationParam.addToTablets(new TTabletLocation(fakeTabletId,
+                        Arrays.asList(nodes[masterNode])));
+
+                slaveBe.remove(masterNode);
+                slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId,
+                        slaveBe));
+            } else {
+                locationParam.addToTablets(new TTabletLocation(fakeTabletId,
+                        Arrays.asList(aliveBe.get(0)))); // just one fake location is enough
+
+                LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0));
+            }
+        }
+
+        return Arrays.asList(locationParam, slaveLocationParam);
+    }
+
     public List<TOlapTableLocationParam> createLocation(OlapTable table) throws UserException {
+        if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) {
+            return createDummyLocation(table);
+        }
+
         TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
         TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
         // BE id -> path hash
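
Note: as a hedged illustration of the two fallback paths added above -- the
class and method names below are invented for this sketch and are not part of
the patch; only the empty-auto-partition guard, the id-0 fake partition and
tablet, and the random BE pick of the single-replica branch come from the
hunks.

    import java.security.SecureRandom;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Sketch: an empty auto-partition table ships one fake partition (id 0)
    // holding one fake tablet (id 0), and that tablet is placed on an alive
    // BE (picked randomly in the single-replica branch).
    public class DummyPartitionSketch {
        static final long FAKE_ID = 0L;

        // Mirrors the guard: only auto-partition tables with no partitions
        // yet take the dummy path.
        static List<Long> partitionIdsForLoad(boolean autoPartition, List<Long> ids) {
            if (autoPartition && ids.isEmpty()) {
                return Collections.singletonList(FAKE_ID);
            }
            return ids;
        }

        // Mirrors createDummyLocation: one fake tablet on one alive backend.
        static long pickBackend(List<Long> aliveBackendIds) {
            if (aliveBackendIds.isEmpty()) {
                throw new IllegalStateException("no available BE in cluster");
            }
            return aliveBackendIds.get(new SecureRandom().nextInt(aliveBackendIds.size()));
        }

        public static void main(String[] args) {
            System.out.println(partitionIdsForLoad(true, new ArrayList<>())); // [0]
            System.out.println(pickBackend(List.of(10001L, 10002L, 10003L)));
        }
    }
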
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 52975d3aa2c..f1003e0a25f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3379,7 +3379,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (!Env.getCurrentEnv().isMaster()) {
             errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
             errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
-            LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG);
+            LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG);
             return result;
         }
 
@@ -3414,10 +3414,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         List<String> allReqPartNames; // all request partitions
         try {
             taskLock.lock();
-            // we dont lock the table. other thread in this txn will be controled by
-            // taskLock.
-            // if we have already replaced. dont do it again, but acquire the recorded new
-            // partition directly.
+            // we don't lock the table. other threads in this txn will be controlled by taskLock.
+            // if we have already replaced, don't do it again, but acquire the recorded new partition directly.
             // if not by this txn, just let it fail naturally is ok.
             List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
             // here if replacedPartIds still have null. this will throw exception.
@@ -3427,8 +3425,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
                     .mapToObj(partitionIds::get)
                     .collect(Collectors.toList());
-            // from here we ONLY deal the pending partitions. not include the dealed(by
-            // others).
+            // from here we ONLY deal with the pending partitions, not including those dealt with by others.
             if (!pendingPartitionIds.isEmpty()) {
                 // below two must have same order inner.
                 List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
@@ -3439,8 +3436,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 overwriteManager.registerTaskInGroup(taskGroupId, taskId);
                 InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
                 InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
-                // now temp partitions are bumped up and use new names. we get their ids and
-                // record them.
+                // now temp partitions are bumped up and use new names. we get their ids and record them.
                 List<Long> newPartitionIds = new ArrayList<Long>();
                 for (String newPartName : pendingPartitionNames) {
                     newPartitionIds.add(olapTable.getPartition(newPartName).getId());
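
Note: a minimal sketch of the idempotency filter above, assuming only the
"equal means not replaced" rule from the hunk; the class below is invented,
and it uses equals() rather than == for the Long comparison.

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    // Sketch: an id that tryReplacePartitionIds returned unchanged has not
    // been replaced yet, so it is still pending for this call.
    public class PendingPartitionSketch {
        static List<Long> pendingIds(List<Long> requested, List<Long> replaced) {
            return IntStream.range(0, requested.size())
                    .filter(i -> requested.get(i).equals(replaced.get(i))) // equal means not replaced
                    .mapToObj(requested::get)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            // 10001 was already replaced (came back as 20001); 10002 is still pending
            System.out.println(pendingIds(List.of(10001L, 10002L), List.of(20001L, 10002L)));
        }
    }
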
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 601d722d54c..3f8db4b22cc 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -159,6 +159,8 @@ message PTabletWriterAddBlockRequest {
     optional bool write_single_replica = 12 [default = false];
     map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13;
     optional bool is_single_tablet_block = 14 [default = false];
+    // for the first-stage close of an auto-partition load, we should hang and wait.
+    optional bool hang_wait = 15 [default = false];
 };
 
 message PSlaveTabletNodes {
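
Note: hang_wait is only a flag on the wire; the sketch below is an invented
illustration of the receiver-side behavior its comment hints at (a close
request carrying hang_wait=true blocks until every sender has closed), not BE
code from this patch.

    import java.util.concurrent.CountDownLatch;

    // Sketch: defer the reply to a hang_wait close request until the last
    // sender has reached its close phase, so nothing is torn down early.
    public class HangWaitSketch {
        private final CountDownLatch sendersNotClosed;

        HangWaitSketch(int senderCount) {
            sendersNotClosed = new CountDownLatch(senderCount);
        }

        // Called once per sender's close request.
        void onCloseRequest(boolean hangWait) throws InterruptedException {
            sendersNotClosed.countDown();
            if (hangWait) {
                sendersNotClosed.await(); // hang until the last sender closes
            }
        }

        public static void main(String[] args) throws InterruptedException {
            HangWaitSketch c = new HangWaitSketch(2);
            new Thread(() -> {
                try { c.onCloseRequest(false); } catch (InterruptedException ignored) { }
            }).start();
            c.onCloseRequest(true); // returns only once both senders have closed
        }
    }
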
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 9d147c84174..6c66c56e87c 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -212,6 +212,7 @@ struct TOlapTablePartitionParam {
     // insert overwrite partition(*)
     11: optional bool enable_auto_detect_overwrite
     12: optional i64 overwrite_group_id
+    13: optional bool partitions_is_fake = false
 }
 
 struct TOlapTableIndex {
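
Note: again a hedged sketch, this time of a consumer honoring the new
partitions_is_fake field; the lookup below is invented, but follows the
"only used in locations, not to find a row's partition" comment in
OlapTableSink above.

    import java.util.List;
    import java.util.Optional;

    // Sketch: when partitions_is_fake is set, the shipped partition list must
    // never answer a row-to-partition lookup; rows go through on-demand
    // (auto) partition creation instead.
    public class FakePartitionSketch {
        final boolean partitionsIsFake; // mirrors field 13 of TOlapTablePartitionParam
        final List<Long> partitionIds;

        FakePartitionSketch(boolean fake, List<Long> ids) {
            partitionsIsFake = fake;
            partitionIds = ids;
        }

        Optional<Long> findPartition(long rowKey) {
            if (partitionsIsFake) {
                return Optional.empty(); // force the auto-partition creation path
            }
            // placeholder lookup; real code matches rowKey against partition keys
            return partitionIds.stream().findFirst();
        }

        public static void main(String[] args) {
            System.out.println(new FakePartitionSketch(true, List.of(0L)).findPartition(42));
            System.out.println(new FakePartitionSketch(false, List.of(7L)).findPartition(42));
        }
    }
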
diff --git a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
new file mode 100644
index 00000000000..4ee136aef2b
--- /dev/null
+++ b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+2
+
diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
index bf9b24c2634..1102ba8f393 100644
--- a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
+++ b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
@@ -140,8 +140,7 @@ suite("test_auto_range_partition") {
     logger.info("${result2}")
     assertEquals(result2.size(), 2)
 
-     // partition expr extraction
-
+    // insert into select uses multiple senders in one load
     sql " drop table if exists isit "
     sql " drop table if exists isit_src "
     sql """
diff --git a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
index 4f9b7a365b4..8d43d90ff15 100644
--- a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
+++ b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
@@ -19,7 +19,7 @@ import groovy.io.FileType
 import java.nio.file.Files
 import java.nio.file.Paths
 
-suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use 
resource fully
+suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use 
resource fully```
     // get doris-db from s3
     def dirPath = context.file.parent
     def fatherPath = context.file.parentFile.parentFile.getPath()
diff --git a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
new file mode 100644
index 00000000000..c9f2f04aab3
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("two_instance_correctness") {
+
+    // the finish times of the two instances differ
+    sql "DROP TABLE IF EXISTS two_bkt;"
+    sql """
+        create table two_bkt(
+            k0 date not null
+        )
+        DISTRIBUTED BY HASH(`k0`) BUCKETS 2
+        properties("replication_num" = "1");
+    """
+
+    sql """ insert into two_bkt values ("2012-12-11"); """
+    sql """ insert into two_bkt select "2020-12-12" from numbers("number" = 
"20000"); """
+
+    sql " DROP TABLE IF EXISTS two_bkt_dest; "
+    sql """
+        create table two_bkt_dest(
+            k0 date not null
+        )
+        AUTO PARTITION BY RANGE (date_trunc(k0, 'day')) ()
+        DISTRIBUTED BY HASH(`k0`) BUCKETS 10
+        properties("replication_num" = "1");
+    """
+    sql " insert into two_bkt_dest select * from two_bkt; "
+
+    qt_sql " select count(distinct k0) from two_bkt_dest; "
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
