This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ea39d50532a [feature](cloud) enable pipeline in cloud_p0 (#31825) ea39d50532a is described below commit ea39d50532aa98a3245c147187cdf269e00c1809 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Thu Mar 7 17:32:11 2024 +0800 [feature](cloud) enable pipeline in cloud_p0 (#31825) --- be/src/exec/data_sink.cpp | 14 ++++---------- be/src/pipeline/pipeline_fragment_context.cpp | 4 +++- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 4 +++- .../pipeline/cloud_p0/conf/regression-conf-custom.groovy | 4 ++-- .../pipeline/cloud_p0/conf/session_variables.sql | 3 +-- 5 files changed, 13 insertions(+), 16 deletions(-) diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 62acd908783..79c2b361bb5 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -150,11 +150,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink case TDataSinkType::OLAP_TABLE_SINK: { DCHECK(thrift_sink.__isset.olap_table_sink); if (state->query_options().enable_memtable_on_sink_node && - !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { - if (config::is_cloud_mode()) { - return Status::InternalError( - "memtable on sink node is not supported in cloud mode"); - } + !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) && + !config::is_cloud_mode()) { sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs)); } else { sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs)); @@ -295,11 +292,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink case TDataSinkType::OLAP_TABLE_SINK: { DCHECK(thrift_sink.__isset.olap_table_sink); if (state->query_options().enable_memtable_on_sink_node && - !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { - if (config::is_cloud_mode()) { - return Status::InternalError( - "memtable on sink node is not supported in cloud mode"); - } + !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) && + !config::is_cloud_mode()) { sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs)); } else { sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs)); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index b4c5646402e..4c61978f70a 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -35,6 +35,7 @@ #include <typeinfo> #include <utility> +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" @@ -820,7 +821,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr case TDataSinkType::OLAP_TABLE_SINK: { DCHECK(thrift_sink.__isset.olap_table_sink); if (state->query_options().enable_memtable_on_sink_node && - !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { + !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) && + !config::is_cloud_mode()) { sink_ = std::make_shared<OlapTableSinkV2OperatorBuilder>(next_operator_builder_id(), _sink.get()); } else { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index daae5feecfe..65c533ff7e3 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -32,6 +32,7 @@ #include <ostream> #include <utility> +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "exec/data_sink.h" @@ -358,7 +359,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData } case TDataSinkType::OLAP_TABLE_SINK: { if (state->query_options().enable_memtable_on_sink_node && - !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { + !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) && + !config::is_cloud_mode()) { _sink.reset(new OlapTableSinkV2OperatorX(pool, next_sink_operator_id(), row_desc, output_exprs)); } else { diff --git a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy index 97a2f1515aa..469b0f490ff 100644 --- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy +++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy @@ -17,5 +17,5 @@ testGroups = "p0" //exclude groups and exclude suites is more prior than include groups and include suites. -excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel" -excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,point_query_p0,nereids_rules_p0/mv" +excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel,test_stream_load_new_move_memtable,test_stream_load_move_memtable,test_materialized_view_move_memtable,test_disable_move_memtable,test_insert_move_memtable,set_and_unset_variable,test_pk_uk_case_cluster,test_point_query_cluster_key,test_compaction_uniq_cluster_keys_with_delete,test_compact [...] +excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,unique_with_mow_p0/ssb_unique_load_zstd_c,nereids_rules_p0/mv" diff --git a/regression-test/pipeline/cloud_p0/conf/session_variables.sql b/regression-test/pipeline/cloud_p0/conf/session_variables.sql index 962464502ec..4cad626a31d 100644 --- a/regression-test/pipeline/cloud_p0/conf/session_variables.sql +++ b/regression-test/pipeline/cloud_p0/conf/session_variables.sql @@ -2,5 +2,4 @@ set global insert_visible_timeout_ms=60000; set global enable_auto_analyze=false; set global enable_audit_plugin=true; -set global enable_memtable_on_sink_node=false; -set global enable_pipeline_x_engine=false; +set global enable_memtable_on_sink_node=false; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org