This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new db17f5fe791 [improve](move-memtable) enable move memtable in routine load (#28974)
db17f5fe791 is described below

commit db17f5fe791af648060cdcad673a30ce7283b39c
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Sat Jan 6 18:22:01 2024 +0800

    [improve](move-memtable) enable move memtable in routine load (#28974)
---
 be/src/io/fs/multi_table_pipe.cpp                      |  1 +
 .../routine_load/routine_load_task_executor.cpp        |  3 +++
 be/src/runtime/stream_load/stream_load_context.h       |  2 ++
 .../apache/doris/load/routineload/KafkaTaskInfo.java   |  1 +
 .../apache/doris/load/routineload/RoutineLoadJob.java  | 18 +++++++++++++++++-
 gensrc/thrift/BackendService.thrift                    |  1 +
 6 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp 
b/be/src/io/fs/multi_table_pipe.cpp
index a11d6412df2..da46645fd4f 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -162,6 +162,7 @@ Status MultiTablePipe::request_and_exec_plans() {
     request.__set_loadId(_ctx->id.to_thrift());
     request.fileType = TFileType::FILE_STREAM;
     request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
+    request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
     // no need to register new_load_stream_mgr coz it is already done in 
routineload submit task
 
     // plan this load
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d14b2999c14..dc6b855cd5a 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -213,6 +213,9 @@ Status RoutineLoadTaskExecutor::submit_task(const 
TRoutineLoadTask& task) {
     if (task.__isset.is_multi_table && task.is_multi_table) {
         ctx->is_multi_table = true;
     }
+    if (task.__isset.memtable_on_sink_node) {
+        ctx->memtable_on_sink_node = task.memtable_on_sink_node;
+    }
 
     // set execute plan params (only for non-single-stream-multi-table load)
     TStreamLoadPutResult put_result;
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index e57996af9e1..3f1f6b92431 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -232,6 +232,8 @@ public:
     // for single-stream-multi-table, we have table list
     std::vector<std::string> table_list;
 
+    bool memtable_on_sink_node = false;
+
 public:
     ExecEnv* exec_env() { return _exec_env; }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 56e931993e8..2075e5548e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -108,6 +108,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         } else {
             tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN);
         }
+        
tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode());
         return tRoutineLoadTask;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8bea553c792..1ce3c1e8c0c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -202,6 +202,8 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
 
     protected String sequenceCol;
 
+    protected boolean memtableOnSinkNode = false;
+
     /**
      * RoutineLoad support json data.
      * Require Params:
@@ -268,6 +270,9 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     public RoutineLoadJob(long id, LoadDataSourceType type) {
         this.id = id;
         this.dataSourceType = type;
+        if (ConnectContext.get() != null) {
+            this.memtableOnSinkNode = 
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+        }
     }
 
     public RoutineLoadJob(Long id, String name,
@@ -283,6 +288,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         if (ConnectContext.get() != null) {
             SessionVariable var = ConnectContext.get().getSessionVariable();
             sessionVariables.put(SessionVariable.SQL_MODE, 
Long.toString(var.getSqlMode()));
+            this.memtableOnSinkNode = 
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
         } else {
             sessionVariables.put(SessionVariable.SQL_MODE, 
String.valueOf(SqlModeHelper.MODE_DEFAULT));
         }
@@ -304,6 +310,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         if (ConnectContext.get() != null) {
             SessionVariable var = ConnectContext.get().getSessionVariable();
             sessionVariables.put(SessionVariable.SQL_MODE, 
Long.toString(var.getSqlMode()));
+            this.memtableOnSinkNode = 
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
         } else {
             sessionVariables.put(SessionVariable.SQL_MODE, 
String.valueOf(SqlModeHelper.MODE_DEFAULT));
         }
@@ -686,6 +693,15 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         return !Strings.isNullOrEmpty(sequenceCol);
     }
 
+    @Override
+    public boolean isMemtableOnSinkNode() {
+        return memtableOnSinkNode;
+    }
+
+    public void setMemtableOnSinkNode(boolean memtableOnSinkNode) {
+        this.memtableOnSinkNode = memtableOnSinkNode;
+    }
+
     public void setComment(String comment) {
         this.comment = comment;
     }
@@ -874,11 +890,11 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     }
 
     private void initPlanner() throws UserException {
-        Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         // for multi table load job, the table name is dynamic,we will set 
table when task scheduling.
         if (isMultiTable) {
             return;
         }
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         planner = new StreamLoadPlanner(db,
                 (OlapTable) db.getTableOrMetaException(this.tableId, 
Table.TableType.OLAP), this);
     }
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 2c5b199fc17..dab0b860677 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -67,6 +67,7 @@ struct TRoutineLoadTask {
     14: optional PlanNodes.TFileFormatType format
     15: optional PaloInternalService.TPipelineFragmentParams pipeline_params
     16: optional bool is_multi_table
+    17: optional bool memtable_on_sink_node;
 }
 
 struct TKafkaMetaProxyRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to