This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch stream-load-vec
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/stream-load-vec by this push:
     new 687dc8a643 [Bug] Fix some bug in vec stream load (#9094)
687dc8a643 is described below

commit 687dc8a643c4b6e3d5a2a8e5f8e60848de094a68
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Apr 21 10:59:59 2022 +0800

    [Bug] Fix some bug in vec stream load (#9094)
    
    1. mem leak in agg data in memtable
    2. core dump in replace function
    3. core dump in DCHECK in tablet sink
    
    Co-authored-by: lihaopeng <lihaop...@baidu.com>
---
 be/src/exec/tablet_sink.h                          |  3 -
 be/src/olap/memtable.cpp                           | 10 +--
 be/src/olap/memtable.h                             |  8 +-
 .../aggregate_function_reader.cpp                  | 41 +++++----
 .../aggregate_function_reader.h                    |  7 +-
 .../aggregate_function_simple_factory.cpp          |  4 +-
 .../aggregate_function_window.cpp                  | 96 ++--------------------
 .../aggregate_function_window.h                    | 69 +++++++++++++---
 be/src/vec/olap/block_reader.cpp                   |  2 +-
 be/src/vec/sink/vtablet_sink.cpp                   | 23 ++++--
 10 files changed, 120 insertions(+), 143 deletions(-)

diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 900f60a6a8..a2edb73b19 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -273,9 +273,6 @@ protected:
     // add batches finished means the last rpc has be response, used to check 
whether this channel can be closed
     std::atomic<bool> _add_batches_finished {false}; // reuse for vectorized
 
-    // TODO(cmy): should be removed
-    std::atomic<bool> _last_patch_processed_finished {true}; // reuse for 
vectorized
-
     bool _eos_is_produced {false}; // only for restricting producer behaviors
 
     std::unique_ptr<RowDescriptor> _row_desc;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 66d6a04fc2..2628b34007 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -28,8 +28,10 @@
 #include "util/doris_metrics.h"
 #include "vec/core/field.h"
 #include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
 
 namespace doris {
+
 MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* 
tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, 
TupleDescriptor* tuple_desc,
                    KeysType keys_type, RowsetWriter* rowset_writer,
@@ -77,13 +79,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* 
block)
                         ->column(cid)
                         .aggregation();
         std::string agg_name =
-                TabletColumn::get_string_by_aggregation_type(agg_method);
-        if (agg_name=="REPLACE"){
-            agg_name = "last_value";
-        }else{
-            agg_name += "_reader";
-        }
-        
+                TabletColumn::get_string_by_aggregation_type(agg_method) + 
vectorized::AGG_LOAD_SUFFIX;
         std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
                         [](unsigned char c) { return std::tolower(c); });
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 02f7699719..c5f519eeeb 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -28,6 +28,7 @@
 #include "vec/core/block.h"
 #include "vec/common/string_ref.h"
 #include "vec/aggregate_functions/aggregate_function.h"
+
 namespace doris {
 
 struct ContiguousRow;
@@ -37,8 +38,6 @@ class SlotDescriptor;
 class TabletSchema;
 class Tuple;
 class TupleDescriptor;
-class RowInBlock;
-class RowInBlockComparator;
 class MemTable {
 public:
    
@@ -103,8 +102,11 @@ private:
             NullState null_state = is_null ? NullState::IS_NULL : 
NullState::NOT_NULL;
             return RowCursorCell(ref.data, null_state);
         }
+
         ~RowInBlock() {
-            std::vector<vectorized::AggregateDataPtr>().swap(_agg_places);
+            for (auto agg_place : _agg_places) {
+                delete [] agg_place;
+            }
         }
     };
     class RowInBlockComparator {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
index ce78397794..f90515fd5e 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
@@ -20,29 +20,38 @@
 namespace doris::vectorized {
 
 // auto spread at nullable condition, null value do not participate aggregate
-void register_aggregate_function_reader(AggregateFunctionSimpleFactory& 
factory) {
+void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& 
factory) {
     // add a suffix to the function name here to distinguish special functions 
of agg reader
-    auto register_function_reader = [&](const std::string& name,
-                                        const AggregateFunctionCreator& 
creator) {
-        factory.register_function(name + agg_reader_suffix, creator, false);
+    auto register_function = [&](const std::string& name,
+                                 const AggregateFunctionCreator& creator) {
+        factory.register_function(name + AGG_READER_SUFFIX, creator, false);
+        factory.register_function(name + AGG_LOAD_SUFFIX, creator, false);
     };
 
-    register_function_reader("sum", create_aggregate_function_sum_reader);
-    register_function_reader("max", create_aggregate_function_max);
-    register_function_reader("min", create_aggregate_function_min);
-    register_function_reader("replace_if_not_null", 
create_aggregate_function_replace_if_not_null);
-    register_function_reader("bitmap_union", 
create_aggregate_function_bitmap_union);
-    register_function_reader("hll_union", 
create_aggregate_function_HLL_union<false>);
+    register_function("sum", create_aggregate_function_sum_reader);
+    register_function("max", create_aggregate_function_max);
+    register_function("min", create_aggregate_function_min);
+    register_function("bitmap_union", create_aggregate_function_bitmap_union);
+    register_function("hll_union", create_aggregate_function_HLL_union<false>);
 }
 
-void 
register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory& 
factory) {
-    auto register_function_reader = [&](const std::string& name,
-                                        const AggregateFunctionCreator& 
creator, bool nullable) {
-        factory.register_function(name + agg_reader_suffix, creator, nullable);
+// only the replace function does a different agg operation in load/reader.
+// because Doris can ensure that the data is globally ordered in reader, but 
cannot in load
+// 1. reader, get the first value of input data.
+// 2. load, get the last value of input data.
+void 
register_aggregate_function_replace_reader_load(AggregateFunctionSimpleFactory& 
factory) {
+    auto register_function = [&](const std::string& name, const std::string& 
suffix,
+                                 const AggregateFunctionCreator& creator, bool 
nullable) {
+        factory.register_function(name + suffix, creator, nullable);
     };
 
-    register_function_reader("replace", create_aggregate_function_replace, 
false);
-    register_function_reader("replace", 
create_aggregate_function_replace_nullable, true);
+    register_function("replace", AGG_READER_SUFFIX, 
create_aggregate_function_first<false, true>, false);
+    register_function("replace", AGG_READER_SUFFIX, 
create_aggregate_function_first<true, true>, true);
+    register_function("replace", AGG_LOAD_SUFFIX, 
create_aggregate_function_last<false, true>, false);
+    register_function("replace", AGG_LOAD_SUFFIX, 
create_aggregate_function_last<true, true>, true);
+
+    register_function("replace_if_not_null", AGG_READER_SUFFIX, 
create_aggregate_function_first<false, true>, false);
+    register_function("replace_if_not_null", AGG_LOAD_SUFFIX, 
create_aggregate_function_last<false, true>, false);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.h 
b/be/src/vec/aggregate_functions/aggregate_function_reader.h
index f44be5ee57..86fea6f079 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader.h
@@ -26,10 +26,11 @@
 
 namespace doris::vectorized {
 
-static const std::string agg_reader_suffix = "_reader";
+static auto constexpr AGG_READER_SUFFIX = "_reader";
+static auto constexpr AGG_LOAD_SUFFIX = "_load";
 
-void register_aggregate_function_reader(AggregateFunctionSimpleFactory& 
factory);
+void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& 
factory);
 
-void 
register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory& 
factory);
+void 
register_aggregate_function_replace_reader_load(AggregateFunctionSimpleFactory& 
factory);
 
 } // namespace doris::vectorized
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
index fcf333c1bd..799f3cbb6a 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
@@ -56,7 +56,7 @@ AggregateFunctionSimpleFactory& 
AggregateFunctionSimpleFactory::instance() {
         register_aggregate_function_uniq(instance);
         register_aggregate_function_bitmap(instance);
         register_aggregate_function_combinator_distinct(instance);
-        register_aggregate_function_reader(instance); // register aggregate 
function for agg reader
+        register_aggregate_function_reader_load(instance); // register 
aggregate functions for agg reader and load
         register_aggregate_function_window_rank(instance);
         register_aggregate_function_stddev_variance_pop(instance);
         register_aggregate_function_topn(instance);
@@ -70,7 +70,7 @@ AggregateFunctionSimpleFactory& 
AggregateFunctionSimpleFactory::instance() {
         register_aggregate_function_combinator_null(instance);
 
         register_aggregate_function_stddev_variance_samp(instance);
-        register_aggregate_function_reader_no_spread(instance);
+        register_aggregate_function_replace_reader_load(instance);
         register_aggregate_function_window_lead_lag(instance);
         register_aggregate_function_HLL_union_agg(instance);
         register_aggregate_function_percentile_approx(instance);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
index d4b7f99121..53a4c4931c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
@@ -23,7 +23,7 @@
 #include "common/logging.h"
 #include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/aggregate_functions/factory_helpers.h"
-#include "vec/aggregate_functions/helpers.h"
+
 namespace doris::vectorized {
 
 AggregateFunctionPtr create_aggregate_function_dense_rank(const std::string& 
name,
@@ -53,44 +53,6 @@ AggregateFunctionPtr 
create_aggregate_function_row_number(const std::string& nam
     return std::make_shared<WindowFunctionRowNumber>(argument_types);
 }
 
-template <template <typename> class AggregateFunctionTemplate, template 
<typename> class Data,
-          bool is_nullable, bool is_copy = false>
-static IAggregateFunction* create_function_single_value(const String& name,
-                                                        const DataTypes& 
argument_types,
-                                                        const Array& 
parameters) {
-    using StoreType = CopiedValue;
-
-    assert_arity_at_most<3>(name, argument_types);
-
-    auto type = argument_types[0].get();
-    if (type->is_nullable()) {
-        type = assert_cast<const 
DataTypeNullable*>(type)->get_nested_type().get();
-    }
-    WhichDataType which(*type);
-
-#define DISPATCH(TYPE)                        \
-    if (which.idx == TypeIndex::TYPE)         \
-        return new AggregateFunctionTemplate< \
-                Data<LeadAndLagData<TYPE, is_nullable, false, 
StoreType>>>(argument_types);
-    FOR_NUMERIC_TYPES(DISPATCH)
-#undef DISPATCH
-
-    if (which.is_decimal()) {
-        return new AggregateFunctionTemplate<
-                Data<LeadAndLagData<Int128, is_nullable, false, 
StoreType>>>(argument_types);
-    }
-    if (which.is_date_or_datetime()) {
-        return new AggregateFunctionTemplate<
-                Data<LeadAndLagData<Int64, is_nullable, false, 
StoreType>>>(argument_types);
-    }
-    if (which.is_string_or_fixed_string()) {
-        return new AggregateFunctionTemplate<
-                Data<LeadAndLagData<StringRef, is_nullable, true, 
StoreType>>>(argument_types);
-    }
-    DCHECK(false) << "with unknowed type, failed in  
create_aggregate_function_leadlag";
-    return nullptr;
-}
-
 template <bool is_nullable>
 AggregateFunctionPtr create_aggregate_function_lag(const std::string& name,
                                                    const DataTypes& 
argument_types,
@@ -111,53 +73,6 @@ AggregateFunctionPtr create_aggregate_function_lead(const 
std::string& name,
                     name, argument_types, parameters));
 }
 
-template <bool is_nullable>
-AggregateFunctionPtr create_aggregate_function_first(const std::string& name,
-                                                     const DataTypes& 
argument_types,
-                                                     const Array& parameters,
-                                                     const bool 
result_is_nullable) {
-    return AggregateFunctionPtr(
-            create_function_single_value<WindowFunctionData, 
WindowFunctionFirstData, is_nullable>(
-                    name, argument_types, parameters));
-}
-
-template <bool is_nullable>
-AggregateFunctionPtr create_aggregate_function_last(const std::string& name,
-                                                    const DataTypes& 
argument_types,
-                                                    const Array& parameters,
-                                                    const bool 
result_is_nullable) {
-    return AggregateFunctionPtr(
-            create_function_single_value<WindowFunctionData, 
WindowFunctionLastData, is_nullable>(
-                    name, argument_types, parameters));
-}
-
-AggregateFunctionPtr create_aggregate_function_replace_if_not_null(const 
std::string& name,
-                                                                   const 
DataTypes& argument_types,
-                                                                   const 
Array& parameters,
-                                                                   const bool 
result_is_nullable) {
-    return AggregateFunctionPtr(
-            create_function_single_value<WindowFunctionData, 
WindowFunctionFirstData, false, true>(
-                    name, argument_types, parameters));
-}
-
-AggregateFunctionPtr create_aggregate_function_replace(const std::string& name,
-                                                       const DataTypes& 
argument_types,
-                                                       const Array& parameters,
-                                                       const bool 
result_is_nullable) {
-    return AggregateFunctionPtr(
-            create_function_single_value<WindowFunctionData, 
WindowFunctionFirstData, false, true>(
-                    name, argument_types, parameters));
-}
-
-AggregateFunctionPtr create_aggregate_function_replace_nullable(const 
std::string& name,
-                                                                const 
DataTypes& argument_types,
-                                                                const Array& 
parameters,
-                                                                const bool 
result_is_nullable) {
-    return AggregateFunctionPtr(
-            create_function_single_value<WindowFunctionData, 
WindowFunctionFirstData, true, true>(
-                    name, argument_types, parameters));
-}
-
 void register_aggregate_function_window_rank(AggregateFunctionSimpleFactory& 
factory) {
     factory.register_function("dense_rank", 
create_aggregate_function_dense_rank);
     factory.register_function("rank", create_aggregate_function_rank);
@@ -169,9 +84,10 @@ void 
register_aggregate_function_window_lead_lag(AggregateFunctionSimpleFactory&
     factory.register_function("lead", create_aggregate_function_lead<true>, 
true);
     factory.register_function("lag", create_aggregate_function_lag<false>);
     factory.register_function("lag", create_aggregate_function_lag<true>, 
true);
-    factory.register_function("first_value", 
create_aggregate_function_first<false>);
-    factory.register_function("first_value", 
create_aggregate_function_first<true>, true);
-    factory.register_function("last_value", 
create_aggregate_function_last<false>);
-    factory.register_function("last_value", 
create_aggregate_function_last<true>, true);
+    factory.register_function("first_value", 
create_aggregate_function_first<false, false>);
+    factory.register_function("first_value", 
create_aggregate_function_first<true, false>, true);
+    factory.register_function("last_value", 
create_aggregate_function_last<false, false>);
+    factory.register_function("last_value", 
create_aggregate_function_last<true, false>, true);
 }
+
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h 
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 133efe7ea0..c438cd3582 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -21,11 +21,13 @@
 #pragma once
 
 #include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/helpers.h"
 #include "vec/columns/column_vector.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/data_types/data_type_string.h"
 #include "vec/io/io_helper.h"
+#include "factory_helpers.h"
 
 namespace doris::vectorized {
 
@@ -405,19 +407,62 @@ private:
     DataTypePtr _argument_type;
 };
 
-AggregateFunctionPtr create_aggregate_function_replace_if_not_null(const 
std::string& name,
-                                                                   const 
DataTypes& argument_types,
-                                                                   const 
Array& parameters,
-                                                                   const bool 
result_is_nullable);
+template <template <typename> class AggregateFunctionTemplate, template 
<typename> class Data,
+          bool is_nullable, bool is_copy = false>
+static IAggregateFunction* create_function_single_value(const String& name,
+                                                        const DataTypes& 
argument_types,
+                                                        const Array& 
parameters) {
+    using StoreType = std::conditional_t<is_copy, CopiedValue, Value>;
 
-AggregateFunctionPtr create_aggregate_function_replace(const std::string& name,
-                                                       const DataTypes& 
argument_types,
-                                                       const Array& parameters,
-                                                       const bool 
result_is_nullable);
+    assert_arity_at_most<3>(name, argument_types);
 
-AggregateFunctionPtr create_aggregate_function_replace_nullable(const 
std::string& name,
-                                                                const 
DataTypes& argument_types,
-                                                                const Array& 
parameters,
-                                                                const bool 
result_is_nullable);
+    auto type = argument_types[0].get();
+    if (type->is_nullable()) {
+        type = assert_cast<const 
DataTypeNullable*>(type)->get_nested_type().get();
+    }
+    WhichDataType which(*type);
+
+#define DISPATCH(TYPE)                        \
+    if (which.idx == TypeIndex::TYPE)         \
+        return new AggregateFunctionTemplate< \
+                Data<LeadAndLagData<TYPE, is_nullable, false, 
StoreType>>>(argument_types);
+    FOR_NUMERIC_TYPES(DISPATCH)
+#undef DISPATCH
+
+    if (which.is_decimal()) {
+        return new AggregateFunctionTemplate<
+                Data<LeadAndLagData<Int128, is_nullable, false, 
StoreType>>>(argument_types);
+    }
+    if (which.is_date_or_datetime()) {
+        return new AggregateFunctionTemplate<
+                Data<LeadAndLagData<Int64, is_nullable, false, 
StoreType>>>(argument_types);
+    }
+    if (which.is_string_or_fixed_string()) {
+        return new AggregateFunctionTemplate<
+                Data<LeadAndLagData<StringRef, is_nullable, true, 
StoreType>>>(argument_types);
+    }
+    DCHECK(false) << "with unknowed type, failed in  
create_aggregate_function_leadlag";
+    return nullptr;
+}
+
+template <bool is_nullable, bool is_copy>
+AggregateFunctionPtr create_aggregate_function_first(const std::string& name,
+                                                     const DataTypes& 
argument_types,
+                                                     const Array& parameters,
+                                                     bool result_is_nullable) {
+    return AggregateFunctionPtr(
+            create_function_single_value<WindowFunctionData, 
WindowFunctionFirstData, is_nullable, is_copy>(
+                    name, argument_types, parameters));
+}
+
+template <bool is_nullable, bool is_copy>
+AggregateFunctionPtr create_aggregate_function_last(const std::string& name,
+                                                    const DataTypes& 
argument_types,
+                                                    const Array& parameters,
+                                                    bool result_is_nullable) {
+    return AggregateFunctionPtr(
+            create_function_single_value<WindowFunctionData, 
WindowFunctionLastData, is_nullable, is_copy>(
+                    name, argument_types, parameters));
+}
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 1959fc70fa..a24fb4ee77 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -91,7 +91,7 @@ void BlockReader::_init_agg_state(const ReaderParams& 
read_params) {
                         
.column(read_params.origin_return_columns->at(_return_columns_loc[idx]))
                         .aggregation();
         std::string agg_name =
-                TabletColumn::get_string_by_aggregation_type(agg_method) + 
agg_reader_suffix;
+                TabletColumn::get_string_by_aggregation_type(agg_method) + 
AGG_READER_SUFFIX;
         std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
                        [](unsigned char c) { return std::tolower(c); });
 
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 54a361044e..997b619bd5 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -178,18 +178,28 @@ Status VNodeChannel::add_row(BlockRow& block_row, int64_t 
tablet_id) {
 
 int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
                                             std::unique_ptr<ThreadPoolToken>& 
thread_pool_token) {
-    auto st = none_of({_cancelled, _send_finished});
+        auto st = none_of({_cancelled, _send_finished});
     if (!st.ok()) {
         return 0;
     }
-    bool is_finished = true;
-    if (!_add_block_closure->is_packet_in_flight() && _pending_batches_num > 0 
&&
-        _last_patch_processed_finished.compare_exchange_strong(is_finished, 
false)) {
+
+    if (!_add_block_closure->try_set_in_flight()) {
+        return _send_finished ? 0 : 1;
+    }
+
+    // We are sure that try_send_block is not running
+    if (_pending_batches_num > 0) {
         auto s = thread_pool_token->submit_func(
-            std::bind(&VNodeChannel::try_send_block, this, state));
+                std::bind(&VNodeChannel::try_send_block, this, state));
         if (!s.ok()) {
             _cancel_with_msg("submit send_batch task to send_batch_thread_pool 
failed");
+            // clear in flight
+            _add_block_closure->clear_in_flight();
         }
+        // in_flight is cleared in closure::Run
+    } else {
+        // clear in flight
+        _add_block_closure->clear_in_flight();
     }
     return _send_finished ? 0 : 1;
 }
@@ -221,6 +231,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
                                     &compressed_bytes, &_column_values_buffer);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), 
st.get_error_msg()));
+            _add_block_closure->clear_in_flight();
             return;
         }
         if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
@@ -234,6 +245,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
     if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
         if (remain_ms <= 0 && !request.eos()) {
             cancel(fmt::format("{}, err: timeout", channel_info()));
+            _add_block_closure->clear_in_flight();
             return;
         } else {
             remain_ms = config::min_load_rpc_timeout_ms;
@@ -266,7 +278,6 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
                                    _add_block_closure);
 
     _next_packet_seq++;
-    _last_patch_processed_finished = true;
 }
 
 void VNodeChannel::_close_check() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to