This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cbbc15e3347 [fix](cloud) fix be core when using stream load record in cloud mode (#37903)
cbbc15e3347 is described below

commit cbbc15e33475c62d7b690999f2e973173cb01606
Author: hui lai <1353307...@qq.com>
AuthorDate: Sat Jul 20 20:59:41 2024 +0800

    [fix](cloud) fix be core when using stream load record in cloud mode (#37903)
---
 be/src/cloud/cloud_backend_service.cpp             |  8 +++
 be/src/cloud/cloud_backend_service.h               |  3 +
 be/src/cloud/cloud_storage_engine.cpp              |  4 ++
 be/src/http/action/http_stream.cpp                 |  8 ++-
 be/src/http/action/stream_load.cpp                 |  8 ++-
 be/src/olap/storage_engine.cpp                     | 41 ++++++------
 be/src/olap/storage_engine.h                       | 16 ++---
 be/src/service/backend_service.cpp                 | 48 ++++++++------
 be/src/service/backend_service.h                   |  4 ++
 .../org/apache/doris/load/StreamLoadRecordMgr.java | 28 ++++----
 .../pipeline/cloud_p0/conf/be_custom.conf          |  2 +
 regression-test/pipeline/p0/conf/be.conf           |  1 +
 .../stream_load/test_stream_load_record.groovy     | 76 ++++++++++++++++++++++
 13 files changed, 178 insertions(+), 69 deletions(-)

diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp
index f576b60045d..d91e9e416b8 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -29,6 +29,8 @@
 #include "common/status.h"
 #include "io/cache/block_file_cache_downloader.h"
 #include "io/cache/block_file_cache_factory.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_recorder.h"
 #include "util/brpc_client_cache.h" // BrpcClientCache
 #include "util/thrift_server.h"
 
@@ -186,4 +188,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
     response.status = t_status;
 }
 
+void CloudBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
+                                                 int64_t last_stream_record_time) {
+    BaseBackendService::get_stream_load_record(result, last_stream_record_time,
+                                               _engine.get_stream_load_recorder());
+}
+
 } // namespace doris
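
The crash this patch fixes came from reaching the recorder through
ExecEnv::GetInstance()->storage_engine().to_local(), which assumes the engine
is the local StorageEngine; in cloud mode the engine is a CloudStorageEngine,
so the downcast is invalid and the BE cores. The fix hoists the recorder into
the common BaseStorageEngine. A minimal self-contained sketch of the new
shape, using simplified stand-in types rather than the real Doris classes:

    // Sketch with simplified stand-ins for the Doris types: the recorder now
    // lives in the engine base class, so both the local and the cloud engine
    // can hand it out without the to_local() downcast that cored the cloud BE.
    #include <iostream>
    #include <memory>
    #include <string>

    struct StreamLoadRecorder {
        void put(const std::string& key) { std::cout << "record " << key << "\n"; }
    };

    class BaseStorageEngine {
    public:
        const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
            return _stream_load_recorder;
        }

    protected:
        std::shared_ptr<StreamLoadRecorder> _stream_load_recorder =
                std::make_shared<StreamLoadRecorder>();
    };

    class StorageEngine : public BaseStorageEngine {};       // local mode
    class CloudStorageEngine : public BaseStorageEngine {};  // cloud mode

    int main() {
        CloudStorageEngine engine;
        // Any engine can now serve the recorder; no local-only cast involved.
        engine.get_stream_load_recorder()->put("1721480381000_label_a");
    }
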
diff --git a/be/src/cloud/cloud_backend_service.h b/be/src/cloud/cloud_backend_service.h
index 88f0099fe73..358cb4d1f0b 100644
--- a/be/src/cloud/cloud_backend_service.h
+++ b/be/src/cloud/cloud_backend_service.h
@@ -53,6 +53,9 @@ public:
     void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
                                    const TCheckWarmUpCacheAsyncRequest& request) override;
 
+    void get_stream_load_record(TStreamLoadRecordResult& result,
+                                int64_t last_stream_record_time) override;
+
 private:
     CloudStorageEngine& _engine;
 };
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index b1b455d2007..de4bbac7b3e 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -196,6 +196,10 @@ Status CloudStorageEngine::open() {
 
     _tablet_hotspot = std::make_unique<TabletHotspot>();
 
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
+            "init StreamLoadRecorder failed");
+
     return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
             .set_max_threads(config::sync_load_for_tablets_thread)
             .set_min_threads(config::sync_load_for_tablets_thread)
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index a3439969e60..87cc2f694eb 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -30,6 +30,7 @@
 #include <rapidjson/prettywriter.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
+#include "cloud/cloud_storage_engine.h"
 #include "cloud/config.h"
 #include "common/config.h"
 #include "common/consts.h"
@@ -119,7 +120,7 @@ void HttpStreamAction::handle(HttpRequest* req) {
     // add new line at end
     str = str + '\n';
     HttpChannel::send_reply(req, str);
-    if (config::enable_stream_load_record && !config::is_cloud_mode()) {
+    if (config::enable_stream_load_record) {
         str = ctx->prepare_stream_load_record(str);
         _save_stream_load_record(ctx, str);
     }
@@ -364,8 +365,9 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
 
 void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
                                                 const std::string& str) {
-    auto stream_load_recorder =
-            ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder();
+    std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
+            ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
+
     if (stream_load_recorder != nullptr) {
         std::string key =
                 std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 93fde511898..2b6a0803e81 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -39,6 +39,7 @@
 #include <stdexcept>
 #include <utility>
 
+#include "cloud/cloud_storage_engine.h"
 #include "cloud/config.h"
 #include "common/config.h"
 #include "common/consts.h"
@@ -217,7 +218,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
         str = str + '\n';
         HttpChannel::send_reply(req, str);
 #ifndef BE_TEST
-        if (config::enable_stream_load_record && !config::is_cloud_mode()) {
+        if (config::enable_stream_load_record) {
             str = ctx->prepare_stream_load_record(str);
             _save_stream_load_record(ctx, str);
         }
@@ -705,8 +706,9 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa
 
 void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
                                                 const std::string& str) {
-    auto stream_load_recorder =
-            ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder();
+    std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
+            ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
+
     if (stream_load_recorder != nullptr) {
         std::string key =
                 std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 43093d3183e..f9fe26bb934 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -134,6 +134,25 @@ int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change(
                     config::memory_limitation_per_thread_for_schema_change_bytes);
 }
 
+Status BaseStorageEngine::init_stream_load_recorder(const std::string& stream_load_record_path) {
+    LOG(INFO) << "stream load record path: " << stream_load_record_path;
+    // init stream load record rocksdb
+    _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path);
+    if (_stream_load_recorder == nullptr) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
+                "new StreamLoadRecorder failed");
+    }
+    auto st = _stream_load_recorder->init();
+    if (!st.ok()) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::IOError("open StreamLoadRecorder rocksdb failed, path={}",
+                                stream_load_record_path),
+                "init StreamLoadRecorder failed");
+    }
+    return Status::OK();
+}
+
 static Status _validate_options(const EngineOptions& options) {
     if (options.store_paths.empty()) {
         return Status::InternalError("store paths is empty");
@@ -158,7 +177,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)),
           _txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
           _default_rowset_type(BETA_ROWSET),
-          _stream_load_recorder(nullptr),
           _create_tablet_idx_lru_cache(
                   new CreateTabletIdxCache(config::partition_disk_index_lru_size)),
           _snapshot_mgr(std::make_unique<SnapshotManager>(*this)) {
           _snapshot_mgr(std::make_unique<SnapshotManager>(*this)) {
@@ -274,31 +292,12 @@ Status StorageEngine::_init_store_map() {
         return Status::InternalError("init path failed, error={}", error_msg);
     }
 
-    RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_recorder(_options.store_paths[0].path),
+    RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(_options.store_paths[0].path),
                                    "init StreamLoadRecorder failed");
 
     return Status::OK();
 }
 
-Status StorageEngine::_init_stream_load_recorder(const std::string& stream_load_record_path) {
-    LOG(INFO) << "stream load record path: " << stream_load_record_path;
-    // init stream load record rocksdb
-    _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path);
-    if (_stream_load_recorder == nullptr) {
-        RETURN_NOT_OK_STATUS_WITH_WARN(
-                Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
-                "new StreamLoadRecorder failed");
-    }
-    auto st = _stream_load_recorder->init();
-    if (!st.ok()) {
-        RETURN_NOT_OK_STATUS_WITH_WARN(
-                Status::IOError("open StreamLoadRecorder rocksdb failed, path={}",
-                                stream_load_record_path),
-                "init StreamLoadRecorder failed");
-    }
-    return Status::OK();
-}
-
 void StorageEngine::_update_storage_medium_type_count() {
     set<TStorageMedium::type> available_storage_medium_types;
 
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6e8fb7bbb7f..b1f30e5db8c 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -133,6 +133,12 @@ public:
 
     int get_disk_num() { return _disk_num; }
 
+    Status init_stream_load_recorder(const std::string& stream_load_record_path);
+
+    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
+        return _stream_load_recorder;
+    }
+
 protected:
     void _evict_querying_rowset();
     void _evict_quring_rowset_thread_callback();
@@ -157,6 +163,8 @@ protected:
     int64_t _memory_limitation_bytes_for_schema_change;
 
     int _disk_num {-1};
+
+    std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
 };
 
 class StorageEngine final : public BaseStorageEngine {
@@ -246,10 +254,6 @@ public:
 
     bool should_fetch_from_peer(int64_t tablet_id);
 
-    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
-        return _stream_load_recorder;
-    }
-
     Status get_compaction_status_json(std::string* result);
 
     // check cumulative compaction config
@@ -349,8 +353,6 @@ private:
     void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
                                                CompactionType compaction_type);
 
-    Status _init_stream_load_recorder(const std::string& stream_load_record_path);
-
     Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
                                    bool force);
 
@@ -470,8 +472,6 @@ private:
     std::mutex _compaction_producer_sleep_mutex;
     std::condition_variable _compaction_producer_sleep_cv;
 
-    std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
-
     // we use unordered_map to store all cumulative compaction policy sharded ptr
     std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>
             _cumulative_compaction_policies;
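
As the header hunks above show, both the recorder member and its init routine
now live in BaseStorageEngine, so CloudStorageEngine::open() inherits them. A
compilable sketch of that init pattern, with a simplified Status type and a
simplified macro standing in for the real Doris RETURN_NOT_OK_STATUS_WITH_WARN:

    // Sketch of the init pattern hoisted into the engine base class:
    // allocate the RocksDB-backed recorder, then surface any init failure
    // as a warning-logged error status. Status and the macro are simplified
    // stand-ins, not the real Doris definitions.
    #include <iostream>
    #include <memory>
    #include <string>

    struct Status {
        std::string msg;
        bool ok() const { return msg.empty(); }
        static Status OK() { return {}; }
        static Status IOError(std::string m) { return {std::move(m)}; }
    };

    // Simplified stand-in for RETURN_NOT_OK_STATUS_WITH_WARN.
    #define RETURN_NOT_OK_WITH_WARN(stmt, warn)                \
        do {                                                   \
            Status _s = (stmt);                                \
            if (!_s.ok()) {                                    \
                std::cerr << (warn) << ": " << _s.msg << "\n"; \
                return _s;                                     \
            }                                                  \
        } while (0)

    struct StreamLoadRecorder {
        explicit StreamLoadRecorder(std::string path) : _path(std::move(path)) {}
        Status init() { return Status::OK(); }  // real code opens RocksDB here
        std::string _path;
    };

    Status init_stream_load_recorder(const std::string& path) {
        auto recorder = std::make_shared<StreamLoadRecorder>(path);
        RETURN_NOT_OK_WITH_WARN(recorder->init(), "init StreamLoadRecorder failed");
        return Status::OK();
    }

    int main() { return init_stream_load_recorder("/tmp/doris_slr").ok() ? 0 : 1; }
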
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 4effc225110..d686c12609a 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -923,27 +923,8 @@ void BaseBackendService::close_scanner(TScanCloseResult& result_, const TScanClo
 
 void BackendService::get_stream_load_record(TStreamLoadRecordResult& result,
                                             int64_t last_stream_record_time) {
-    auto stream_load_recorder = _engine.get_stream_load_recorder();
-    if (stream_load_recorder != nullptr) {
-        std::map<std::string, std::string> records;
-        auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
-                                                  config::stream_load_record_batch_size, &records);
-        if (st.ok()) {
-            LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: "
-                      << records.size()
-                      << ", last_stream_load_timestamp: " << last_stream_record_time;
-            std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
-            auto it = records.begin();
-            for (; it != records.end(); ++it) {
-                TStreamLoadRecord stream_load_item;
-                StreamLoadContext::parse_stream_load_record(it->second, stream_load_item);
-                stream_load_record_batch.emplace(it->first.c_str(), stream_load_item);
-            }
-            result.__set_stream_load_record(stream_load_record_batch);
-        }
-    } else {
-        LOG(WARNING) << "stream_load_recorder is null.";
-    }
+    BaseBackendService::get_stream_load_record(result, last_stream_record_time,
+                                               _engine.get_stream_load_recorder());
 }
 
 void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
@@ -1199,6 +1180,31 @@ void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
     LOG(ERROR) << "get_stream_load_record is not implemented";
 }
 
+void BaseBackendService::get_stream_load_record(
+        TStreamLoadRecordResult& result, int64_t last_stream_record_time,
+        std::shared_ptr<StreamLoadRecorder> stream_load_recorder) {
+    if (stream_load_recorder != nullptr) {
+        std::map<std::string, std::string> records;
+        auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
+                                                  config::stream_load_record_batch_size, &records);
+        if (st.ok()) {
+            LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: "
+                      << records.size()
+                      << ", last_stream_load_timestamp: " << last_stream_record_time;
+            std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
+            auto it = records.begin();
+            for (; it != records.end(); ++it) {
+                TStreamLoadRecord stream_load_item;
+                StreamLoadContext::parse_stream_load_record(it->second, stream_load_item);
+                stream_load_record_batch.emplace(it->first.c_str(), stream_load_item);
+            }
+            result.__set_stream_load_record(stream_load_record_batch);
+        }
+    } else {
+        LOG(WARNING) << "stream_load_recorder is null.";
+    }
+}
+
 void BaseBackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) {
     LOG(ERROR) << "get_disk_trash_used_capacity is not implemented";
 }
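
Both overrides now funnel into the shared base helper above, which reads a
batch of serialized records from RocksDB keyed by "<finish_millis>_<label>"
and parses each one into a thrift record for the FE. A stripped-down sketch of
that batch conversion, with plain structs standing in for the thrift types:

    // Sketch of the shared helper's batch conversion. TStreamLoadRecord and
    // parse_record() are simplified stand-ins for the Doris thrift struct and
    // StreamLoadContext::parse_stream_load_record().
    #include <iostream>
    #include <map>
    #include <string>

    struct TStreamLoadRecord {
        std::string label;
    };

    // Stand-in parser; the real code deserializes JSON persisted in RocksDB.
    static void parse_record(const std::string& json, TStreamLoadRecord& out) {
        out.label = json;
    }

    int main() {
        // Keys are "<finish_millis>_<label>", so a timestamp lower bound
        // selects only records newer than the FE's last pull.
        std::map<std::string, std::string> records = {
                {"1721480381000_label_a", "{...}"},
                {"1721480382000_label_b", "{...}"}};
        std::map<std::string, TStreamLoadRecord> batch;
        for (const auto& [key, value] : records) {
            TStreamLoadRecord item;
            parse_record(value, item);
            batch.emplace(key, item);
        }
        std::cout << "pulled " << batch.size() << " records\n";
    }
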
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 0ada1bf5393..f0e06094560 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -26,6 +26,7 @@
 #include "agent/agent_server.h"
 #include "agent/topic_subscriber.h"
 #include "common/status.h"
+#include "runtime/stream_load/stream_load_recorder.h"
 
 namespace doris {
 
@@ -165,6 +166,9 @@ public:
 protected:
     Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);
 
+    void get_stream_load_record(TStreamLoadRecordResult& result, int64_t last_stream_record_time,
+                                std::shared_ptr<StreamLoadRecorder> stream_load_recorder);
+
     ExecEnv* _exec_env = nullptr;
     std::unique_ptr<AgentServer> _agent_server;
     std::unique_ptr<ThreadPool> _ingest_binlog_workers;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
index f44e8b785f6..6c53f354af8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
@@ -150,16 +150,19 @@ public class StreamLoadRecordMgr extends MasterDaemon {
     }
 
     public List<StreamLoadItem> getStreamLoadRecords() {
+        LOG.info("test log: {}", streamLoadRecordHeap);
         return new ArrayList<>(streamLoadRecordHeap);
     }
 
     public List<List<Comparable>> getStreamLoadRecordByDb(
             long dbId, String label, boolean accurateMatch, StreamLoadState state) {
         LinkedList<List<Comparable>> streamLoadRecords = new LinkedList<List<Comparable>>();
+        LOG.info("test log: {}", dbId);
 
         readLock();
         try {
             if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) {
+                LOG.info("test log: {}", dbId);
                 return streamLoadRecords;
             }
 
@@ -202,6 +205,7 @@ public class StreamLoadRecordMgr extends MasterDaemon {
                 }
 
             }
+            LOG.info("test log: {}", streamLoadRecords);
             return streamLoadRecords;
         } finally {
             readUnlock();
@@ -263,19 +267,17 @@ public class StreamLoadRecordMgr extends MasterDaemon {
                             TimeUtils.getDatetimeMsFormatWithTimeZone());
                     String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(),
                             TimeUtils.getDatetimeMsFormatWithTimeZone());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("receive stream load record info from backend: {}."
-                                        + " label: {}, db: {}, tbl: {}, user: {}, user_ip: {},"
-                                        + " status: {}, message: {}, error_url: {},"
-                                        + " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {},"
-                                        + " load_bytes: {}, start_time: {}, finish_time: {}.",
-                                backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(),
-                                streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
-                                streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
-                                streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
-                                streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(),
-                                streamLoadItem.getLoadBytes(), startTime, finishTime);
-                    }
+                    LOG.info("receive stream load record info from backend: {}."
+                                    + " label: {}, db: {}, tbl: {}, user: {}, user_ip: {},"
+                                    + " status: {}, message: {}, error_url: {},"
+                                    + " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {},"
+                                    + " load_bytes: {}, start_time: {}, finish_time: {}.",
+                            backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(),
+                            streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
+                            streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
+                            streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
+                            streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(),
+                            streamLoadItem.getLoadBytes(), startTime, finishTime);
 
                     AuditEvent auditEvent =
                             new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index a2478b47269..9f2967b1972 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -30,4 +30,6 @@ file_cache_path = [{"path":"/data/doris_cloud/file_cache","total_size":104857600
 tmp_file_dirs = [{"path":"/data/doris_cloud/tmp","max_cache_bytes":104857600,"max_upload_bytes":104857600}]
 thrift_rpc_timeout_ms = 360000
 save_load_error_log_to_s3 = true
+enable_stream_load_record = true
+stream_load_record_batch_size = 500
 webserver_num_workers = 128
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index 32c7b81f934..a072ac7ad50 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -47,6 +47,7 @@ max_garbage_sweep_interval=180
 
 log_buffer_level = -1
 enable_stream_load_record = true
+stream_load_record_batch_size = 500
 storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1;/mnt/ssd01/cluster_storage/doris.SSD
 disable_auto_compaction=true
 priority_networks=172.19.0.0/24
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy
new file mode 100644
index 00000000000..96a4fff9c53
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_record", "p0") {
+    def tableName = "test_stream_load_record"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+            `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+            `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+            `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+            `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+            `v8` datetime REPLACE_IF_NOT_NULL NULL,
+            `v9` date REPLACE_IF_NOT_NULL NULL,
+            `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+            `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+            `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+        PARTITION partition_b VALUES [("100000"), ("1000000000")),
+        PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+        PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // test strict_mode success
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', '\t'
+        set 'columns', 'k1, k2, v2, v10, v11'
+        set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+        set 'strict_mode', 'true'
+
+        file 'test_strict_mode.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    def count = 0
+    while (true) {
+        sleep(1000)
+        def res = sql "show stream load"
+        log.info("Stream load result: ${res}", res)
+        if (res.size() > 0) {
+            break
+        }
+        if (count > 150) {
+            assertTrue(-1 > 0)
+        }
+        count++
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
