This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ea39d50532a [feature](cloud) enable pipeline in cloud_p0 (#31825)
ea39d50532a is described below

commit ea39d50532aa98a3245c147187cdf269e00c1809
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Thu Mar 7 17:32:11 2024 +0800

    [feature](cloud) enable pipeline in cloud_p0 (#31825)
---
 be/src/exec/data_sink.cpp                                  | 14 ++++----------
 be/src/pipeline/pipeline_fragment_context.cpp              |  4 +++-
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp |  4 +++-
 .../pipeline/cloud_p0/conf/regression-conf-custom.groovy   |  4 ++--
 .../pipeline/cloud_p0/conf/session_variables.sql           |  3 +--
 5 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 62acd908783..79c2b361bb5 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -150,11 +150,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
     case TDataSinkType::OLAP_TABLE_SINK: {
         DCHECK(thrift_sink.__isset.olap_table_sink);
         if (state->query_options().enable_memtable_on_sink_node &&
-            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
-            if (config::is_cloud_mode()) {
-                return Status::InternalError(
-                        "memtable on sink node is not supported in cloud mode");
-            }
+            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+            !config::is_cloud_mode()) {
             sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs));
         } else {
             sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs));
@@ -295,11 +292,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
     case TDataSinkType::OLAP_TABLE_SINK: {
         DCHECK(thrift_sink.__isset.olap_table_sink);
         if (state->query_options().enable_memtable_on_sink_node &&
-            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
-            if (config::is_cloud_mode()) {
-                return Status::InternalError(
-                        "memtable on sink node is not supported in cloud mode");
-            }
+            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+            !config::is_cloud_mode()) {
             sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs));
         } else {
             sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs));
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index b4c5646402e..4c61978f70a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -35,6 +35,7 @@
 #include <typeinfo>
 #include <utility>
 
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
@@ -820,7 +821,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
     case TDataSinkType::OLAP_TABLE_SINK: {
         DCHECK(thrift_sink.__isset.olap_table_sink);
         if (state->query_options().enable_memtable_on_sink_node &&
-            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
+            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+            !config::is_cloud_mode()) {
             sink_ = std::make_shared<OlapTableSinkV2OperatorBuilder>(next_operator_builder_id(),
                                                                      _sink.get());
         } else {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index daae5feecfe..65c533ff7e3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -32,6 +32,7 @@
 #include <ostream>
 #include <utility>
 
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "exec/data_sink.h"
@@ -358,7 +359,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
     }
     case TDataSinkType::OLAP_TABLE_SINK: {
         if (state->query_options().enable_memtable_on_sink_node &&
-            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
+            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+            !config::is_cloud_mode()) {
             _sink.reset(new OlapTableSinkV2OperatorX(pool, next_sink_operator_id(), row_desc,
                                                      output_exprs));
         } else {
diff --git a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index 97a2f1515aa..469b0f490ff 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -17,5 +17,5 @@
 
 testGroups = "p0"
 //exclude groups and exclude suites is more prior than include groups and include suites.
-excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel"
-excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,point_query_p0,nereids_rules_p0/mv"
+excludeSuites = 
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel,test_stream_load_new_move_memtable,test_stream_load_move_memtable,test_materialized_view_move_memtable,test_disable_move_memtable,test_insert_move_memtable,set_and_unset_variable,test_pk_uk_case_cluster,test_point_query_cluster_key,test_compaction_uniq_cluster_keys_with_delete,test_compact
 [...]
+excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,unique_with_mow_p0/ssb_unique_load_zstd_c,nereids_rules_p0/mv"
diff --git a/regression-test/pipeline/cloud_p0/conf/session_variables.sql b/regression-test/pipeline/cloud_p0/conf/session_variables.sql
index 962464502ec..4cad626a31d 100644
--- a/regression-test/pipeline/cloud_p0/conf/session_variables.sql
+++ b/regression-test/pipeline/cloud_p0/conf/session_variables.sql
@@ -2,5 +2,4 @@
 set global insert_visible_timeout_ms=60000;
 set global enable_auto_analyze=false;
 set global enable_audit_plugin=true;
-set global enable_memtable_on_sink_node=false;
-set global enable_pipeline_x_engine=false;
+set global enable_memtable_on_sink_node=false;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to