This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1304185adb [Regression](Fix) fix the regression of pipeline and ConcurrentModificationException failed (#14849) 1304185adb is described below commit 1304185adbcbfb1b988661edf3e5c900ae191626 Author: HappenLee <happen...@hotmail.com> AuthorDate: Tue Dec 6 15:34:32 2022 +0800 [Regression](Fix) fix the regression of pipeline and ConcurrentModificationException failed (#14849) * [fix](ut) try to fix ConcurrentModificationException bug * [Regression](Fix) fix the regression of pipeline and ConcurrentModificationException failed Co-authored-by: morningman <morning...@163.com> --- be/src/pipeline/exec/aggregation_source_operator.h | 4 ++++ be/src/pipeline/exec/sort_source_operator.h | 1 + be/src/pipeline/exec/streaming_aggregation_source_operator.h | 1 + be/src/pipeline/task_scheduler.cpp | 2 -- .../main/java/org/apache/doris/catalog/InternalSchemaInitializer.java | 2 +- fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java | 3 +++ .../src/test/java/org/apache/doris/utframe/TestWithFeService.java | 1 + fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java | 2 ++ regression-test/pipeline/p0/conf/regression-conf.groovy | 2 +- regression-test/suites/correctness/test_view_with_with_clause.groovy | 4 ++-- 10 files changed, 16 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 87e8f2bb11..2176cc3dee 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -37,6 +37,10 @@ public: class AggSourceOperator final : public Operator<AggSourceOperatorBuilder> { public: AggSourceOperator(OperatorBuilderBase*, ExecNode*); + // if exec node split to: sink, source operator. 
the source operator + // should skip `alloc_resoucre()` function call, only sink operator + // call the function + Status open(RuntimeState*) override { return Status::OK(); } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index e8e1afe6f1..913726d856 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -41,6 +41,7 @@ public: class SortSourceOperator final : public Operator<SortSourceOperatorBuilder> { public: SortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node); + Status open(RuntimeState*) override { return Status::OK(); } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h b/be/src/pipeline/exec/streaming_aggregation_source_operator.h index 2685322e2b..11ec3a6725 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h @@ -43,6 +43,7 @@ public: StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<AggContext>); bool can_read() override; Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override; + Status open(RuntimeState*) override { return Status::OK(); } private: std::shared_ptr<AggContext> _agg_context; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index f8d7e28f91..051775f094 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -186,7 +186,6 @@ Status TaskScheduler::start() { .build(&_fix_thread_pool); _markers.reserve(cores); for (size_t i = 0; i < cores; ++i) { - LOG(INFO) << "Start TaskScheduler thread " << i; _markers.push_back(std::make_unique<std::atomic<bool>>(true)); RETURN_IF_ERROR( _fix_thread_pool->submit_func(std::bind(&TaskScheduler::_do_work, this, i))); @@ -205,7 +204,6 @@ Status TaskScheduler::schedule_task(PipelineTask* 
task) { } void TaskScheduler::_do_work(size_t index) { - LOG(INFO) << "Start TaskScheduler worker " << index; auto queue = _task_queue; const auto& marker = _markers[index]; while (*marker) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 8cafd7d777..a0c92349c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -56,7 +56,7 @@ public class InternalSchemaInitializer extends Thread { public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 1; public void run() { - if (FeConstants.runningUnitTest) { + if (FeConstants.disableInternalSchemaDb) { return; } while (!created()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 1f53f699e2..f7291fbd62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -46,6 +46,9 @@ public class FeConstants { // set to true to skip some step when running FE unit test public static boolean runningUnitTest = false; + // set to true to disable internal schema db + public static boolean disableInternalSchemaDb = false; + // default scheduler interval is 10 seconds public static int default_scheduler_interval_millisecond = 10000; diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 5ecce69719..49b39f2a1f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -120,6 +120,7 @@ public abstract class TestWithFeService { @BeforeAll public final void 
beforeAll() throws Exception { + FeConstants.disableInternalSchemaDb = true; beforeCreatingConnectContext(); connectContext = createDefaultCtx(); beforeCluster(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index adc5a86a39..efe3c729cf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -201,6 +201,7 @@ public class UtFrameUtils { public static void createDorisCluster(String runningDir, int backendNum) throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException, InterruptedException { + FeConstants.disableInternalSchemaDb = true; int feRpcPort = startFEServer(runningDir); List<Backend> bes = Lists.newArrayList(); for (int i = 0; i < backendNum; i++) { @@ -235,6 +236,7 @@ public class UtFrameUtils { // set runningUnitTest to true, so that for ut, // the agent task will be sent to "127.0.0.1" to make cluster running well. FeConstants.runningUnitTest = true; + FeConstants.disableInternalSchemaDb = true; int feRpcPort = startFEServer(runningDir); for (int i = 0; i < backendNum; i++) { String host = "127.0.0." 
+ (i + 1); diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy index b8e643b115..d5aab7431f 100644 --- a/regression-test/pipeline/p0/conf/regression-conf.groovy +++ b/regression-test/pipeline/p0/conf/regression-conf.groovy @@ -48,7 +48,7 @@ testDirectories = "" // this groups will not be executed excludeGroups = "" // this suites will not be executed -excludeSuites = "" +excludeSuites = "test_date_function" // this directories will not be executed excludeDirectories = "" diff --git a/regression-test/suites/correctness/test_view_with_with_clause.groovy b/regression-test/suites/correctness/test_view_with_with_clause.groovy index 2bb072aaee..92e71429d9 100644 --- a/regression-test/suites/correctness/test_view_with_with_clause.groovy +++ b/regression-test/suites/correctness/test_view_with_with_clause.groovy @@ -34,7 +34,7 @@ suite("test_view_with_with_clause") { sql """insert into test_view_with_with_clause values ('2022-12-02','002','002001');""" sql """ - create view viewtest_test_view_with_with_clause (b,cnt) as + create view IF NOT EXISTS viewtest_test_view_with_with_clause (b,cnt) as with aaa as ( select b,count(distinct c) cnt from test_view_with_with_clause @@ -48,4 +48,4 @@ suite("test_view_with_with_clause") { qt_sql """ select * from viewtest_test_view_with_with_clause; """ -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org