This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1304185adb [Regression](Fix) fix the regression of pipeline and 
ConcurrentModificationException failed (#14849)
1304185adb is described below

commit 1304185adbcbfb1b988661edf3e5c900ae191626
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Tue Dec 6 15:34:32 2022 +0800

    [Regression](Fix) fix the regression of pipeline and 
ConcurrentModificationException failed (#14849)
    
    * [fix](ut) try to fix ConcurrentModifycationException bug
    
    * [Regression](Fix) fix the regression of pipeline and 
ConcurrentModificationException failed
    
    Co-authored-by: morningman <morning...@163.com>
---
 be/src/pipeline/exec/aggregation_source_operator.h                    | 4 ++++
 be/src/pipeline/exec/sort_source_operator.h                           | 1 +
 be/src/pipeline/exec/streaming_aggregation_source_operator.h          | 1 +
 be/src/pipeline/task_scheduler.cpp                                    | 2 --
 .../main/java/org/apache/doris/catalog/InternalSchemaInitializer.java | 2 +-
 fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java     | 3 +++
 .../src/test/java/org/apache/doris/utframe/TestWithFeService.java     | 1 +
 fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java   | 2 ++
 regression-test/pipeline/p0/conf/regression-conf.groovy               | 2 +-
 regression-test/suites/correctness/test_view_with_with_clause.groovy  | 4 ++--
 10 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index 87e8f2bb11..2176cc3dee 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -37,6 +37,10 @@ public:
 class AggSourceOperator final : public Operator<AggSourceOperatorBuilder> {
 public:
     AggSourceOperator(OperatorBuilderBase*, ExecNode*);
+    // if exec node split to: sink, source operator. the source operator
+    // should skip `alloc_resoucre()` function call, only sink operator
+    // call the function
+    Status open(RuntimeState*) override { return Status::OK(); }
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/sort_source_operator.h 
b/be/src/pipeline/exec/sort_source_operator.h
index e8e1afe6f1..913726d856 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -41,6 +41,7 @@ public:
 class SortSourceOperator final : public Operator<SortSourceOperatorBuilder> {
 public:
     SortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* 
sort_node);
+    Status open(RuntimeState*) override { return Status::OK(); }
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
index 2685322e2b..11ec3a6725 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
@@ -43,6 +43,7 @@ public:
     StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, 
std::shared_ptr<AggContext>);
     bool can_read() override;
     Status get_block(RuntimeState*, vectorized::Block*, SourceState& 
source_state) override;
+    Status open(RuntimeState*) override { return Status::OK(); }
 
 private:
     std::shared_ptr<AggContext> _agg_context;
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index f8d7e28f91..051775f094 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -186,7 +186,6 @@ Status TaskScheduler::start() {
             .build(&_fix_thread_pool);
     _markers.reserve(cores);
     for (size_t i = 0; i < cores; ++i) {
-        LOG(INFO) << "Start TaskScheduler thread " << i;
         _markers.push_back(std::make_unique<std::atomic<bool>>(true));
         RETURN_IF_ERROR(
                 
_fix_thread_pool->submit_func(std::bind(&TaskScheduler::_do_work, this, i)));
@@ -205,7 +204,6 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
 }
 
 void TaskScheduler::_do_work(size_t index) {
-    LOG(INFO) << "Start TaskScheduler worker " << index;
     auto queue = _task_queue;
     const auto& marker = _markers[index];
     while (*marker) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 8cafd7d777..a0c92349c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -56,7 +56,7 @@ public class InternalSchemaInitializer extends Thread {
     public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 1;
 
     public void run() {
-        if (FeConstants.runningUnitTest) {
+        if (FeConstants.disableInternalSchemaDb) {
             return;
         }
         while (!created()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 1f53f699e2..f7291fbd62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -46,6 +46,9 @@ public class FeConstants {
     // set to true to skip some step when running FE unit test
     public static boolean runningUnitTest = false;
 
+    // set to true to disable internal schema db
+    public static boolean disableInternalSchemaDb = false;
+
     // default scheduler interval is 10 seconds
     public static int default_scheduler_interval_millisecond = 10000;
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 5ecce69719..49b39f2a1f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -120,6 +120,7 @@ public abstract class TestWithFeService {
 
     @BeforeAll
     public final void beforeAll() throws Exception {
+        FeConstants.disableInternalSchemaDb = true;
         beforeCreatingConnectContext();
         connectContext = createDefaultCtx();
         beforeCluster();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index adc5a86a39..efe3c729cf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -201,6 +201,7 @@ public class UtFrameUtils {
 
     public static void createDorisCluster(String runningDir, int backendNum) 
throws EnvVarNotSetException, IOException,
             FeStartException, NotInitException, DdlException, 
InterruptedException {
+        FeConstants.disableInternalSchemaDb = true;
         int feRpcPort = startFEServer(runningDir);
         List<Backend> bes = Lists.newArrayList();
         for (int i = 0; i < backendNum; i++) {
@@ -235,6 +236,7 @@ public class UtFrameUtils {
         // set runningUnitTest to true, so that for ut,
         // the agent task will be sent to "127.0.0.1" to make cluster running 
well.
         FeConstants.runningUnitTest = true;
+        FeConstants.disableInternalSchemaDb = true;
         int feRpcPort = startFEServer(runningDir);
         for (int i = 0; i < backendNum; i++) {
             String host = "127.0.0." + (i + 1);
diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy 
b/regression-test/pipeline/p0/conf/regression-conf.groovy
index b8e643b115..d5aab7431f 100644
--- a/regression-test/pipeline/p0/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p0/conf/regression-conf.groovy
@@ -48,7 +48,7 @@ testDirectories = ""
 // this groups will not be executed
 excludeGroups = ""
 // this suites will not be executed
-excludeSuites = ""
+excludeSuites = "test_date_function"
 // this directories will not be executed
 excludeDirectories = ""
 
diff --git 
a/regression-test/suites/correctness/test_view_with_with_clause.groovy 
b/regression-test/suites/correctness/test_view_with_with_clause.groovy
index 2bb072aaee..92e71429d9 100644
--- a/regression-test/suites/correctness/test_view_with_with_clause.groovy
+++ b/regression-test/suites/correctness/test_view_with_with_clause.groovy
@@ -34,7 +34,7 @@ suite("test_view_with_with_clause") {
     sql """insert into test_view_with_with_clause values 
('2022-12-02','002','002001');"""
 
     sql """
-        create view viewtest_test_view_with_with_clause (b,cnt) as 
+        create view IF NOT EXISTS viewtest_test_view_with_with_clause (b,cnt) 
as 
             with aaa as (
                 select b,count(distinct c) cnt 
                 from  test_view_with_with_clause
@@ -48,4 +48,4 @@ suite("test_view_with_with_clause") {
     qt_sql """
         select * from viewtest_test_view_with_with_clause;
     """
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to