This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new adfbe69415e branch-3.0: [Fix](auto-increment) Fix duplicate auto-increment column value problem #43774 (#43983)
adfbe69415e is described below

commit adfbe69415ee8ae5bc6c53fcf82aee86479bd081
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 15 10:39:14 2024 +0800

    branch-3.0: [Fix](auto-increment) Fix duplicate auto-increment column value problem #43774 (#43983)
    
    Cherry-picked from #43774
    
    Co-authored-by: bobhan1 <bao...@selectdb.com>
---
 be/src/vec/sink/autoinc_buffer.cpp                 | 33 +++++++++---
 .../test_auto_inc_fetch_fail.out                   | 10 ++++
 .../test_auto_inc_fetch_fail.groovy                | 63 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp
index 80ce9d494d5..8754c01f806 100644
--- a/be/src/vec/sink/autoinc_buffer.cpp
+++ b/be/src/vec/sink/autoinc_buffer.cpp
@@ -26,6 +26,7 @@
 #include "common/status.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "util/debug_points.h"
 #include "util/runtime_profile.h"
 #include "util/thrift_rpc_helper.h"
 
@@ -44,10 +45,18 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t 
batch_size) {
 }
 
 Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
+    LOG_INFO(
+            "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch 
auto-increment values from fe, "
+            "db_id={}, table_id={}, column_id={}, length={}",
+            _db_id, _table_id, _column_id, length);
     constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
     _rpc_status = Status::OK();
     TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
     for (uint32_t retry_times = 0; retry_times < 
FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
+        DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
+            _rpc_status = Status::InternalError<false>("injected error");
+            break;
+        });
         TAutoIncrementRangeRequest request;
         TAutoIncrementRangeResult result;
         request.__set_db_id(_db_id);
@@ -67,8 +76,9 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t 
length) {
 
         if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
             LOG_WARNING(
-                    "Failed to fetch auto-incremnt range, requested to 
non-master FE@{}:{}, change "
-                    "to request to FE@{}:{}. retry_time={}, db_id={}, 
table_id={}, column_id={}",
+                    "Failed to fetch auto-increment range, requested to 
non-master FE@{}:{}, "
+                    "change to request to FE@{}:{}. retry_time={}, db_id={}, 
table_id={}, "
+                    "column_id={}",
                     master_addr.hostname, master_addr.port, 
result.master_address.hostname,
                     result.master_address.port, retry_times, _db_id, 
_table_id, _column_id);
             master_addr = result.master_address;
@@ -78,7 +88,7 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t 
length) {
 
         if (!_rpc_status.ok()) {
             LOG_WARNING(
-                    "Failed to fetch auto-incremnt range, encounter rpc 
failure. "
+                    "Failed to fetch auto-increment range, encounter rpc 
failure. "
                     "errmsg={}, retry_time={}, db_id={}, table_id={}, 
column_id={}",
                     _rpc_status.to_string(), retry_times, _db_id, _table_id, 
_column_id);
             std::this_thread::sleep_for(std::chrono::milliseconds(10));
@@ -86,7 +96,7 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t 
length) {
         }
         if (result.length != length) [[unlikely]] {
             auto msg = fmt::format(
-                    "Failed to fetch auto-incremnt range, request length={}, 
but get "
+                    "Failed to fetch auto-increment range, request length={}, 
but get "
                     "result.length={}, retry_time={}, db_id={}, table_id={}, 
column_id={}",
                     length, result.length, retry_times, _db_id, _table_id, 
_column_id);
             LOG(WARNING) << msg;
@@ -96,14 +106,14 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t 
length) {
         }
 
         LOG_INFO(
-                "get auto-incremnt range from FE@{}:{}, start={}, length={}, 
elapsed={}ms, "
+                "get auto-increment range from FE@{}:{}, start={}, length={}, 
elapsed={}ms, "
                 "retry_time={}, db_id={}, table_id={}, column_id={}",
                 master_addr.hostname, master_addr.port, result.start, 
result.length,
                 get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, 
_table_id, _column_id);
         return result.start;
     }
     CHECK(!_rpc_status.ok());
-    return _rpc_status;
+    return ResultError(_rpc_status);
 }
 
 void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
@@ -153,10 +163,19 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t 
length) {
     RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
         auto&& res = _fetch_ids_from_fe(length);
         if (!res.has_value()) [[unlikely]] {
+            auto&& err = res.error();
+            LOG_WARNING(
+                    "[AutoIncIDBuffer::_launch_async_fetch_task] failed to 
fetch auto-increment "
+                    "values from fe, db_id={}, table_id={}, column_id={}, 
status={}",
+                    _db_id, _table_id, _column_id, err);
             _is_fetching = false;
             return;
         }
         int64_t start = res.value();
+        LOG_INFO(
+                "[AutoIncIDBuffer::_launch_async_fetch_task] successfully 
fetch auto-increment "
+                "values from fe, db_id={}, table_id={}, column_id={}, 
start={}, length={}",
+                _db_id, _table_id, _column_id, start, length);
         {
             std::lock_guard<std::mutex> lock {_latch};
             _buffers.emplace_back(start, length);
@@ -167,4 +186,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t 
length) {
     return Status::OK();
 }
 
-} // namespace doris::vectorized
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out
new file mode 100644
index 00000000000..453e378f9c4
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+0
+
+-- !sql --
+4
+
+-- !sql --
+0
+
diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy
new file mode 100644
index 00000000000..e9bb6ae9a3c
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+suite("test_auto_inc_fetch_fail", "nonConcurrent") {
+
+    try {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        def table1 = "test_auto_inc_fetch_fail"
+        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k`  int,
+                `c1` int,
+                `c2` int,
+                `c3` int,
+                `id` BIGINT NOT NULL AUTO_INCREMENT(10000),
+                ) UNIQUE KEY(k)
+            DISTRIBUTED BY HASH(k) BUCKETS 1
+            PROPERTIES ("replication_num" = "1"); """
+
+        
GetDebugPoint().enableDebugPointForAllBEs("AutoIncIDBuffer::_fetch_ids_from_fe.failed")
+
+        try {
+            sql """insert into ${table1}(k,c1,c2,c3) 
values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """
+        } catch (Exception e) {
+            logger.info("error : ${e}")
+        }
+        qt_sql "select count(*) from ${table1};"
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        Thread.sleep(1000)
+        
+        sql """insert into ${table1}(k,c1,c2,c3) 
values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """
+        qt_sql "select count(*) from ${table1};"
+        qt_sql "select count(*) from ${table1} where id < 10000;"
+
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        throw e
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to