This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new db17f5fe791 [improve](move-memtbale) enable move memtable in routine load (#28974) db17f5fe791 is described below commit db17f5fe791af648060cdcad673a30ce7283b39c Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Sat Jan 6 18:22:01 2024 +0800 [improve](move-memtbale) enable move memtable in routine load (#28974) --- be/src/io/fs/multi_table_pipe.cpp | 1 + .../routine_load/routine_load_task_executor.cpp | 3 +++ be/src/runtime/stream_load/stream_load_context.h | 2 ++ .../apache/doris/load/routineload/KafkaTaskInfo.java | 1 + .../apache/doris/load/routineload/RoutineLoadJob.java | 18 +++++++++++++++++- gensrc/thrift/BackendService.thrift | 1 + 6 files changed, 25 insertions(+), 1 deletion(-) diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index a11d6412df2..da46645fd4f 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -162,6 +162,7 @@ Status MultiTablePipe::request_and_exec_plans() { request.__set_loadId(_ctx->id.to_thrift()); request.fileType = TFileType::FILE_STREAM; request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node); // no need to register new_load_stream_mgr coz it is already done in routineload submit task // plan this load diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index d14b2999c14..dc6b855cd5a 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -213,6 +213,9 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { if (task.__isset.is_multi_table && task.is_multi_table) { ctx->is_multi_table = true; } + if (task.__isset.memtable_on_sink_node) { + ctx->memtable_on_sink_node = task.memtable_on_sink_node; + } // set execute plan params (only for non-single-stream-multi-table load) TStreamLoadPutResult put_result; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index e57996af9e1..3f1f6b92431 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -232,6 +232,8 @@ public: // for single-stream-multi-table, we have table list std::vector<std::string> table_list; + bool memtable_on_sink_node = false; + public: ExecEnv* exec_env() { return _exec_env; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 56e931993e8..2075e5548e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -108,6 +108,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { } else { tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN); } + tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode()); return tRoutineLoadTask; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 8bea553c792..1ce3c1e8c0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -202,6 +202,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected String sequenceCol; + protected boolean memtableOnSinkNode = false; + /** * RoutineLoad support json data. * Require Params: @@ -268,6 +270,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl public RoutineLoadJob(long id, LoadDataSourceType type) { this.id = id; this.dataSourceType = type; + if (ConnectContext.get() != null) { + this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; + } } public RoutineLoadJob(Long id, String name, @@ -283,6 +288,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (ConnectContext.get() != null) { SessionVariable var = ConnectContext.get().getSessionVariable(); sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); + this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; } else { sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); } @@ -304,6 +310,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (ConnectContext.get() != null) { SessionVariable var = ConnectContext.get().getSessionVariable(); sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); + this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; } else { sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); } @@ -686,6 +693,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return !Strings.isNullOrEmpty(sequenceCol); } + @Override + public boolean isMemtableOnSinkNode() { + return memtableOnSinkNode; + } + + public void setMemtableOnSinkNode(boolean memtableOnSinkNode) { + this.memtableOnSinkNode = memtableOnSinkNode; + } + public void setComment(String comment) { this.comment = comment; } @@ -874,11 +890,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } private void initPlanner() throws UserException { - Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); // for multi table load job, the table name is dynamic,we will set table when task scheduling. if (isMultiTable) { return; } + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); planner = new StreamLoadPlanner(db, (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this); } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 2c5b199fc17..dab0b860677 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -67,6 +67,7 @@ struct TRoutineLoadTask { 14: optional PlanNodes.TFileFormatType format 15: optional PaloInternalService.TPipelineFragmentParams pipeline_params 16: optional bool is_multi_table + 17: optional bool memtable_on_sink_node; } struct TKafkaMetaProxyRequest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org