This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 64b52dea660fc5feba34b8650557f1323d55b28b
Author: fengguangyuan <qq272101...@gmail.com>
AuthorDate: Thu Jul 27 15:46:27 2023 +0800

    KYLIN-5771 Query cannot be interrupted during segment pruning
    
    Co-authored-by: Guangyuan Feng <guangyuan.f...@kyligence.io>
---
 .../kylin/query/util/QueryInterruptChecker.java    |  53 +++-
 .../apache/kylin/query/util/SlowQueryDetector.java |   2 +-
 .../query/routing/PartitionPruningRuleTest.java    | 313 +++++++++++++++++++++
 .../query/routing/SegmentPruningRuleTest.java      | 277 ++++++++++++++++++
 .../kylin/query/routing/PartitionPruningRule.java  |  15 +
 .../kylin/query/routing/SegmentPruningRule.java    |  33 ++-
 .../kylin/query/util/QueryContextCutter.java       |   5 +
 7 files changed, 689 insertions(+), 9 deletions(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
index b163cc5a31..df4eb94589 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
@@ -20,6 +20,7 @@ package org.apache.kylin.query.util;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.query.exception.UserStopQueryException;
 
@@ -32,15 +33,65 @@ public class QueryInterruptChecker {
         // This is Utils.
     }
 
+    /**
+     * Throws if the current thread has been interrupted, classifying the interruption either as a
+     * user stop or as a timeout.
+     * <p>
+     * If the thread is not interrupted this method returns normally. Otherwise the error is logged
+     * with the current query id. If the running-query entry of the current thread is marked as
+     * stopped by the user, a {@link UserStopQueryException} is thrown. In every other case, including
+     * when the thread has no running-query entry, the query is tagged as timed out and a
+     * {@link KylinTimeoutException} is thrown.
+     *
+     * @param errorMsgLog message written to the error log together with the query id
+     * @param stepInfo description of the processing step, appended to the timeout message
+     * @throws UserStopQueryException if the interrupted query was stopped by the user
+     * @throws KylinTimeoutException if the thread was interrupted for any other reason
+     * @deprecated Use {@link #checkQueryCanceledOrThreadInterrupted(String, String)} instead.
+     *             This method reports every interruption that is not a user stop as a timeout,
+     *             which is misleading in some scenarios.
+     */
+    @Deprecated
     public static void checkThreadInterrupted(String errorMsgLog, String stepInfo) {
         if (Thread.currentThread().isInterrupted()) {
             log.error("{} {}", QueryContext.current().getQueryId(), errorMsgLog);
-            if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) {
+            // Look the entry up only once. A containsKey/get pair could observe the entry being
+            // removed in between (if the map is modified concurrently) and still throw an NPE.
+            SlowQueryDetector.QueryEntry entry = SlowQueryDetector.getRunningQueries().get(Thread.currentThread());
+            if (entry != null && entry.isStopByUser()) {
                 throw new UserStopQueryException("");
             }
+
             QueryContext.current().getQueryTagInfo().setTimeout(true);
             throw new KylinTimeoutException("The query exceeds the set time limit of "
                     + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds() + "s. " + stepInfo);
         }
     }
+
+    /**
+     * Checks whether the query bound to the current thread has been canceled, or whether the current
+     * thread has been interrupted, and throws an exception describing the reason if so.
+     * <p>
+     * Within the global context, STOP is the same as CANCEL, so stopping a query is equivalent to
+     * canceling it. The cancel state is read from the {@link SlowQueryDetector.QueryEntry} registered
+     * for the current thread, if any. The possible outcomes are:
+     * <ul>
+     * <li>{@link UserStopQueryException}: the user stopped the query, its planner cancel flag is set
+     * and the thread is interrupted (see {@link SlowQueryDetector#stopQuery(String)}).</li>
+     * <li>{@link KylinTimeoutException}: the planner cancel flag is set and the thread is interrupted,
+     * but the user did not stop the query, i.e. the query ran out of time (see
+     * {@link SlowQueryDetector.QueryEntry#setInterruptIfTimeout()}). The query is also tagged as
+     * timed out.</li>
+     * <li>{@link UserStopQueryException} whose message mentions "inconsistent states": only part of the
+     * cancel signals is present, e.g. a flag is set but the thread is not interrupted.</li>
+     * <li>{@link InterruptedException}: the thread is interrupted, and either no query entry is
+     * registered for it or neither cancel flag is set.</li>
+     * </ul>
+     * Callers that cannot propagate {@link InterruptedException} may wrap it, e.g. in a
+     * {@link KylinRuntimeException}.
+     *
+     * @param cause the reason for canceling the current query or interrupting the working thread;
+     *              included in the exception message
+     * @param step the processing point; included in the exception message
+     * @throws InterruptedException if the current thread is interrupted without a matching cancel
+     *                              request
+     */
+    public static void checkQueryCanceledOrThreadInterrupted(String cause, String step) throws InterruptedException {
+        SlowQueryDetector.QueryEntry entry = SlowQueryDetector.getRunningQueries().get(Thread.currentThread());
+        if (entry != null) {
+            // Snapshot every signal once so that the decision and the reported states agree.
+            // The interrupt flag is read after the cancel flags. Cancelers presumably set the flags
+            // before interrupting (TODO confirm against SlowQueryDetector), so this order narrows
+            // the window in which a flag is visible but its interrupt is not yet.
+            boolean stopByUser = entry.isStopByUser();
+            boolean cancelRequested = entry.getPlannerCancelFlag().isCancelRequested();
+            boolean interrupted = Thread.currentThread().isInterrupted();
+
+            if (stopByUser && cancelRequested && interrupted) {
+                throw new UserStopQueryException(String.format("Manually stop the query %s. Caused: %s. Step: %s",
+                    entry.getQueryId(), cause, step));
+            }
+
+            if (cancelRequested && interrupted) {
+                QueryContext.current().getQueryTagInfo().setTimeout(true);
+                throw new KylinTimeoutException(String.format("Run out of time of the query %s. Caused: %s. Step: %s",
+                    entry.getQueryId(), cause, step));
+            }
+
+            if (stopByUser || cancelRequested) {
+                throw new UserStopQueryException(String.format(
+                    "You are trying to cancel the query %s with inconsistent states:"
+                        + " [isStopByUser=%s, isCancelRequested=%s]! Caused: %s. Step: %s",
+                    entry.getQueryId(), stopByUser, cancelRequested, cause, step));
+            }
+        }
+        if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedException(String.format("Interrupted on thread %s. Caused: %s. Step: %s",
+                Thread.currentThread().getName(), cause, step));
+        }
+    }
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
index a2ca994c5f..bc00ce3754 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
@@ -196,7 +196,7 @@ public class SlowQueryDetector extends Thread {
             return (System.currentTimeMillis() - startTime) / 1000;
         }
 
-        private boolean setInterruptIfTimeout() {
+        public boolean setInterruptIfTimeout() {
             if (isAsyncQuery) {
                 return false;
             }
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/PartitionPruningRuleTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/PartitionPruningRuleTest.java
new file mode 100644
index 0000000000..0cb9e8f62f
--- /dev/null
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/PartitionPruningRuleTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.routing;
+
+import static org.awaitility.Awaitility.await;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinRuntimeException;
+import org.apache.kylin.common.exception.KylinTimeoutException;
+import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
+import org.apache.kylin.metadata.cube.model.LayoutPartition;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
+import org.apache.kylin.metadata.model.MultiPartitionDesc;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.util.SlowQueryDetector;
+import org.apache.kylin.util.OlapContextTestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Integration tests for {@link PartitionPruningRule} checking that pruning a segment with a very
+ * large number of multi-level partitions can be stopped by the user, canceled, interrupted or timed
+ * out from another thread.
+ */
+public class PartitionPruningRuleTest extends NLocalWithSparkSessionTest {
+
+    /**
+     * Builds a candidate on the {@code multi_level_partition} project. The candidate's model, and
+     * layout 1 of every segment, are extended with one million extra partitions (ids starting at
+     * 100). This is intended to keep {@code PartitionPruningRule.apply} busy long enough to be
+     * canceled from another thread.
+     */
+    private Candidate prepareCandidate() throws SqlParseException {
+        String project = "multi_level_partition";
+        String modelId = "747f864b-9721-4b97-acde-0aa8e8656cba";
+        NDataModelManager modelManager = NDataModelManager.getInstance(getTestConfig(), project);
+
+        String sql = "select cal_dt, sum(price) from test_kylin_fact" + " inner join test_account"
+                + " on test_kylin_fact.seller_id = test_account.account_id"
+                + " where cal_dt > '2012-01-01' and cal_dt < '2012-01-04' and lstg_site_id = 1 group by cal_dt";
+        OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(project, sql, true).get(0);
+
+        int newPartitionsNum = 1_000_000;
+
+        // register the extra partitions on the model
+        modelManager.updateDataModel(modelId, copied -> {
+            for (int i = 0; i < newPartitionsNum; i++) {
+                copied.getMultiPartitionDesc()
+                    .getPartitions()
+                    .add(new MultiPartitionDesc.PartitionInfo(i + 100, new String[] { String.valueOf(i + 100) }));
+            }
+        });
+
+        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), project);
+        NDataflow dataflowCopied = dataflowManager.getDataflow(modelId).copy();
+
+        // append the new partitions to layout 1 of every segment
+        NDataflowUpdate dataflowUpdate = new NDataflowUpdate(modelId);
+        List<NDataLayout> toUpdateLayouts = new ArrayList<>();
+        for (NDataSegment segment : dataflowCopied.getSegments()) {
+            NDataLayout layout = segment.getLayout(1);
+            toUpdateLayouts.add(layout);
+            for (int i = 0; i < newPartitionsNum; i++) {
+                LayoutPartition layoutPartition = new LayoutPartition(i + 100);
+                layoutPartition.setBucketId(i + 100);
+                layout.getMultiPartition().add(layoutPartition);
+            }
+        }
+        dataflowUpdate.setToAddOrUpdateLayouts(toUpdateLayouts.toArray(new NDataLayout[0]));
+        dataflowManager.updateDataflow(dataflowUpdate);
+
+        NDataflow dataflow = dataflowManager.getDataflow(modelId);
+        Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext);
+        olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap);
+
+        Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap);
+        candidate.getQueryableSeg()
+                .setBatchSegments(ImmutableList.of(dataflow.getSegment("8892fa3f-f607-4eec-8159-7c5ae2f16942")));
+        return candidate;
+    }
+
+    @Test
+    public void testCancelAndInterruptPruning() throws SqlParseException {
+        Candidate candidate = prepareCandidate();
+        testCancelQuery(candidate);
+        testCancelQuery(candidate, queryEntry -> queryEntry.getPlannerCancelFlag().requestCancel());
+        testCancelQuery(candidate, queryEntry -> queryEntry.setStopByUser(true));
+        testCancelAsyncQuery(candidate);
+        testInterrupt(candidate);
+        testTimeout(candidate);
+    }
+
+    /**
+     * Waits for the pruning thread to finish. If the test thread itself is interrupted while
+     * waiting, the interrupt status is restored and the test fails, instead of silently going on
+     * to assert on a run that has not finished.
+     */
+    private static void joinPruningThread(Thread t) {
+        try {
+            t.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for the pruning thread", e);
+        }
+    }
+
+    /**
+     * Stops a running synchronous query through {@link SlowQueryDetector#stopQuery(String)} and
+     * expects pruning to fail with {@link UserStopQueryException}.
+     */
+    private void testCancelQuery(Candidate candidate) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicBoolean res = new AtomicBoolean(false);
+        AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                slowQueryDetector.set(new SlowQueryDetector(100, 10_000));
+                slowQueryDetector.get().queryStart("pruning");
+                queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()));
+                PartitionPruningRule rule = new PartitionPruningRule();
+                rule.apply(candidate);
+                // only reached when pruning was NOT canceled; asserted to stay false below
+                res.set(true);
+            } catch (Exception e) {
+                exp.set(e);
+            } finally {
+                slowQueryDetector.get().queryEnd();
+            }
+        });
+        t.start();
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS)
+                .until(() -> queryEntry.get() != null);
+
+        slowQueryDetector.get().stopQuery("pruning");
+
+        Assert.assertFalse(queryEntry.get().isAsyncQuery());
+        Assert.assertTrue(queryEntry.get().isStopByUser());
+        Assert.assertTrue(queryEntry.get().getPlannerCancelFlag().isCancelRequested());
+
+        joinPruningThread(t);
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertFalse(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(),
+                exp.get() instanceof UserStopQueryException);
+    }
+
+    /**
+     * Sets only part of the cancel state of a running query through {@code updater} and expects
+     * pruning to fail with a {@link UserStopQueryException} reporting inconsistent states.
+     */
+    private void testCancelQuery(Candidate candidate, Consumer<SlowQueryDetector.QueryEntry> updater) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicBoolean res = new AtomicBoolean(false);
+        AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                slowQueryDetector.set(new SlowQueryDetector(100, 10_000));
+                slowQueryDetector.get().queryStart("pruning");
+                queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()));
+                PartitionPruningRule rule = new PartitionPruningRule();
+                rule.apply(candidate);
+                // only reached when pruning was NOT canceled; asserted to stay false below
+                res.set(true);
+            } catch (Exception e) {
+                exp.set(e);
+            } finally {
+                slowQueryDetector.get().queryEnd();
+            }
+        });
+        t.start();
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS)
+                .until(() -> queryEntry.get() != null);
+
+        Assert.assertTrue(t.isAlive());
+
+        updater.accept(queryEntry.get());
+
+        joinPruningThread(t);
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertFalse(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(),
+                exp.get() instanceof UserStopQueryException);
+        Assert.assertTrue(exp.get().getMessage().contains("inconsistent states"));
+    }
+
+    /**
+     * Stops a running asynchronous query by its query id and expects pruning to fail with
+     * {@link UserStopQueryException}.
+     */
+    private void testCancelAsyncQuery(Candidate candidate) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicBoolean res = new AtomicBoolean(false);
+        AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+                slowQueryDetector.set(new SlowQueryDetector(100, 10_000));
+                slowQueryDetector.get().queryStart("pruning");
+                queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()));
+                PartitionPruningRule rule = new PartitionPruningRule();
+                rule.apply(candidate);
+                res.set(true);
+            } catch (Exception e) {
+                exp.set(e);
+            } finally {
+                slowQueryDetector.get().queryEnd();
+            }
+        });
+        t.start();
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS)
+                .until(() -> queryEntry.get() != null);
+
+        String queryId = queryEntry.get().getQueryId();
+        slowQueryDetector.get().stopQuery(queryId);
+
+        Assert.assertTrue(queryEntry.get().isAsyncQuery());
+        Assert.assertTrue(queryEntry.get().isStopByUser());
+        Assert.assertTrue(queryEntry.get().getPlannerCancelFlag().isCancelRequested());
+
+        joinPruningThread(t);
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertFalse(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(),
+                exp.get() instanceof UserStopQueryException);
+    }
+
+    /**
+     * Interrupts a pruning thread that has no registered query entry and expects a
+     * {@link KylinRuntimeException} caused by an {@link InterruptedException}.
+     */
+    private void testInterrupt(Candidate candidate) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicBoolean res = new AtomicBoolean(false);
+
+        Thread t = new Thread(() -> {
+            try {
+                PartitionPruningRule rule = new PartitionPruningRule();
+                rule.apply(candidate);
+                res.set(true);
+            } catch (Exception e) {
+                exp.set(e);
+            }
+        });
+        t.start();
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(t::isAlive);
+
+        t.interrupt();
+
+        joinPruningThread(t);
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertFalse(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), exp.get() instanceof KylinRuntimeException);
+        Assert.assertTrue(exp.get().getCause() instanceof InterruptedException);
+    }
+
+    /**
+     * Triggers the timeout path through {@link SlowQueryDetector.QueryEntry#setInterruptIfTimeout()}
+     * on a detector created with {@code SlowQueryDetector(10, 100)}, and expects pruning to fail with
+     * {@link KylinTimeoutException}.
+     */
+    private void testTimeout(Candidate candidate) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicBoolean res = new AtomicBoolean(false);
+        AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
+                slowQueryDetector.set(new SlowQueryDetector(10, 100));
+                slowQueryDetector.get().queryStart("pruning");
+                queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()));
+                PartitionPruningRule rule = new PartitionPruningRule();
+                rule.apply(candidate);
+                res.set(true);
+            } catch (Exception e) {
+                exp.set(e);
+            } finally {
+                slowQueryDetector.get().queryEnd();
+            }
+        });
+        t.start();
+
+        // the 500 ms poll delay presumably lets the query run past the detector's timeout
+        await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(500, TimeUnit.MILLISECONDS)
+                .atMost(60, TimeUnit.SECONDS).until(() -> queryEntry.get() != null);
+
+        Assert.assertTrue(queryEntry.get().setInterruptIfTimeout());
+
+        joinPruningThread(t);
+
+        Assert.assertFalse(t.isAlive());
+        Assert.assertTrue(queryEntry.get().getPlannerCancelFlag().isCancelRequested());
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertFalse(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), exp.get() instanceof KylinTimeoutException);
+    }
+}
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/SegmentPruningRuleTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/SegmentPruningRuleTest.java
index 5ffafcb0a6..7468c9706a 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/SegmentPruningRuleTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/SegmentPruningRuleTest.java
@@ -18,13 +18,21 @@
 
 package org.apache.kylin.query.routing;
 
+import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.rex.RexExecutorImpl;
+import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinRuntimeException;
+import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.common.util.TempMetadataBuilder;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
@@ -33,11 +41,17 @@ import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.engine.TypeSystem;
 import org.apache.kylin.query.engine.meta.SimpleDataContext;
+import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.kylin.util.OlapContextTestUtil;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparderEnv;
@@ -52,6 +66,8 @@ import org.springframework.test.util.ReflectionTestUtils;
 
 import lombok.val;
 
+import static org.awaitility.Awaitility.await;
+
 public class SegmentPruningRuleTest extends NLocalWithSparkSessionTest {
 
     @BeforeClass
@@ -215,6 +231,267 @@ public class SegmentPruningRuleTest extends NLocalWithSparkSessionTest {
                 Long.parseLong("1633928400000"), dateType);
     }
 
+    /**
+     * Appends 100,000 READY segments to dataflow {@code 3718b614-5191-2254-77e9-f4c5ca64e312}.
+     * Then checks that segment pruning, for a query with many OR-ed TIMESTAMP_10 ranges, can be
+     * stopped by the user, canceled, interrupted or timed out.
+     */
+    @Test
+    public void testCancelAndInterruptPruning() throws SqlParseException {
+        val dataflowId = "3718b614-5191-2254-77e9-f4c5ca64e312";
+        KylinConfig kylinConfig = getTestConfig();
+
+        // Many OR-ed ranges on TIMESTAMP_10, presumably so that pruning has many filters to evaluate.
+        // NOTE(review): AND binds tighter than OR, so "id = '121'" only constrains the first range.
+        String sql = "SELECT * FROM TEST_DB.DATE_TIMESTAMP_TABLE WHERE id = '121' AND (\n"
+                + "(TIMESTAMP_10 >= '2021-11-03')\n" + "AND (TIMESTAMP_10 <= '2021-11-04')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-03')\n" + "AND (TIMESTAMP_10 <= '2021-11-05')\n" + ")\n" + "OR (\n" + "(\n"
+                + "TIMESTAMP_10 >= '2021-11-03'\n" + ")\n" + "AND (TIMESTAMP_10 <= '2021-11-06')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-04')\n" + "AND (TIMESTAMP_10 <= '2021-11-04')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-04')\n" + "AND (TIMESTAMP_10 <= '2021-11-05')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-04')\n" + "AND (TIMESTAMP_10 <= '2021-11-06')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-07')\n" + "AND (TIMESTAMP_10 <= '2021-11-07')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-07')\n" + "AND (TIMESTAMP_10 <= '2021-11-08')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-07')\n" + "AND (TIMESTAMP_10 <= '2021-11-12')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-17')\n" + "AND (TIMESTAMP_10 <= '2021-11-17')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-17')\n" + "AND (TIMESTAMP_10 <= '2021-11-18')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-17')\n" + "AND (TIMESTAMP_10 <= '2021-11-19')\n" + ")\n" + "OR (\n"
+                + "(TIMESTAMP_10 >= '2021-11-17')\n" + "AND (TIMESTAMP_10 <= '2021-11-17')\n" + ")";
+
+        String project = getProject();
+        NDataflowManager dataflowManager = NDataflowManager.getInstance(kylinConfig, project);
+        List<OLAPContext> olapContexts = OlapContextTestUtil.getOlapContexts(getProject(), sql);
+        OLAPContext context = olapContexts.get(0);
+
+        // append new segments
+        // NOTE(review): Calendar.set(year, month, day) keeps the current time of day and uses the
+        // default time zone, so segment boundaries differ between runs. Call calendar.clear()
+        // before set(...) if deterministic boundaries are required.
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(2021, Calendar.DECEMBER, 6);
+
+        // contiguous one-day READY segments, starting at 2021-12-06
+        final int newSegmentCount = 100_000;
+        NDataSegment[] newSegments = new NDataSegment[newSegmentCount];
+        for (int i = 0; i < newSegmentCount; i++) {
+            long startTime = calendar.getTimeInMillis();
+            calendar.add(Calendar.DAY_OF_MONTH, 1);
+            long endTime = calendar.getTimeInMillis();
+            NDataSegment newSegment = new NDataSegment(null,
+                    new SegmentRange.TimePartitionedSegmentRange(startTime, endTime));
+            newSegment.setStatus(SegmentStatusEnum.READY);
+            newSegments[i] = newSegment;
+        }
+
+        NDataflowUpdate update = new NDataflowUpdate(dataflowId);
+        update.setToAddSegs(newSegments);
+        dataflowManager.updateDataflowWithoutIndex(update);
+
+        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);
+
+        // Install a Rex executor on the planner and bind the OLAP context to the dataflow's model.
+        // Presumably required so that pruning can evaluate the filter expressions; TODO confirm.
+        CalciteSchema rootSchema = new QueryExec(project, kylinConfig).getRootSchema();
+        SimpleDataContext dataContext = new SimpleDataContext(rootSchema.plus(), TypeSystem.javaTypeFactory(),
+                kylinConfig);
+        context.firstTableScan.getCluster().getPlanner().setExecutor(new RexExecutorImpl(dataContext));
+        Map<String, String> map = RealizationChooser.matchJoins(dataflow.getModel(), context, false, false);
+        context.fixModel(dataflow.getModel(), map);
+
+        // sanity check: the appended segments must all be queryable
+        Assert.assertTrue("Unexpected size " + dataflow.getQueryableSegments().size(),
+                dataflow.getQueryableSegments().size() > newSegmentCount);
+
+        // scenarios: user stop, cancel flag only, stop flag only, async stop, interrupt, timeout
+        testCancelQuery(dataflow, context);
+        testCancelQuery(dataflow, context, queryEntry -> queryEntry.getPlannerCancelFlag().requestCancel());
+        testCancelQuery(dataflow, context, queryEntry -> queryEntry.setStopByUser(true));
+        testCancelAsyncQuery(dataflow, context);
+        testInterrupt(dataflow, context);
+        testTimeout(dataflow, context);
+    }
+
+    /**
+     * Stops a running synchronous query through {@link SlowQueryDetector#stopQuery(String)} while
+     * it prunes segments, and expects pruning to fail with {@link UserStopQueryException} and
+     * produce no result.
+     */
+    private void testCancelQuery(NDataflow dataflow, OLAPContext context) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                slowQueryDetector.set(new SlowQueryDetector(100, 10_000));
+                slowQueryDetector.get().queryStart("pruning");
+                queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()));
+                res.set(new SegmentPruningRule().pruneSegments(dataflow, context));
+            } catch (Exception e) {
+                exp.set(e);
+            } finally {
+                slowQueryDetector.get().queryEnd();
+            }
+        });
+        t.start();
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS)
+                .until(() -> queryEntry.get() != null);
+
+        slowQueryDetector.get().stopQuery("pruning");
+
+        Assert.assertFalse(queryEntry.get().isAsyncQuery());
+        Assert.assertTrue(queryEntry.get().isStopByUser());
+        Assert.assertTrue(queryEntry.get().getPlannerCancelFlag().isCancelRequested());
+
+        try {
+            t.join();
+        } catch (InterruptedException e) {
+            // restore the interrupt status and fail rather than asserting on an unfinished run
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for the pruning thread", e);
+        }
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertNull(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(),
+                exp.get() instanceof UserStopQueryException);
+    }
+
+    /**
+     * Sets only part of the cancel state of a running query through {@code updater} while it
+     * prunes segments. Expects pruning to fail with a {@link UserStopQueryException} reporting
+     * inconsistent states, and to produce no result.
+     */
+    private void testCancelQuery(NDataflow dataflow, OLAPContext context,
+            Consumer<SlowQueryDetector.QueryEntry> updater) {
+
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                slowQueryDetector.set(new SlowQueryDetector(100, 10_000));
+                slowQueryDetector.get().queryStart("pruning");
+                queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()));
+                res.set(new SegmentPruningRule().pruneSegments(dataflow, context));
+            } catch (Exception e) {
+                exp.set(e);
+            } finally {
+                slowQueryDetector.get().queryEnd();
+            }
+        });
+        t.start();
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS)
+                .until(() -> queryEntry.get() != null);
+
+        Assert.assertTrue(t.isAlive());
+
+        updater.accept(queryEntry.get());
+
+        try {
+            t.join();
+        } catch (InterruptedException e) {
+            // restore the interrupt status and fail rather than asserting on an unfinished run
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for the pruning thread", e);
+        }
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertNull(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(),
+                exp.get() instanceof UserStopQueryException);
+        Assert.assertTrue(exp.get().getMessage().contains("inconsistent states"));
+    }
+
+    /**
+     * Stops a running asynchronous query by its query id while it prunes segments, and expects
+     * pruning to fail with {@link UserStopQueryException} and produce no result.
+     */
+    private void testCancelAsyncQuery(NDataflow dataflow, OLAPContext context) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+                slowQueryDetector.set(new SlowQueryDetector(100, 10_000));
+                slowQueryDetector.get().queryStart("pruning");
+                queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()));
+                res.set(new SegmentPruningRule().pruneSegments(dataflow, context));
+            } catch (Exception e) {
+                exp.set(e);
+            } finally {
+                slowQueryDetector.get().queryEnd();
+            }
+        });
+        t.start();
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS)
+                .until(() -> queryEntry.get() != null);
+
+        String queryId = queryEntry.get().getQueryId();
+        slowQueryDetector.get().stopQuery(queryId);
+
+        Assert.assertTrue(queryEntry.get().isAsyncQuery());
+        Assert.assertTrue(queryEntry.get().isStopByUser());
+        Assert.assertTrue(queryEntry.get().getPlannerCancelFlag().isCancelRequested());
+        try {
+            t.join();
+        } catch (InterruptedException e) {
+            // restore the interrupt status and fail rather than asserting on an unfinished run
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for the pruning thread", e);
+        }
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertNull(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(),
+                exp.get() instanceof UserStopQueryException);
+    }
+
+    /**
+     * Verifies that interrupting the thread that runs segment pruning aborts the pruning.
+     * <p>
+     * Expects pruning to end with a {@link KylinRuntimeException} whose cause is an
+     * {@link InterruptedException}, and no result.
+     *
+     * @param dataflow the dataflow whose segments are pruned
+     * @param context  the OLAP context passed to {@link SegmentPruningRule#pruneSegments}
+     */
+    private void testInterrupt(NDataflow dataflow, OLAPContext context) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                res.set(new SegmentPruningRule().pruneSegments(dataflow, context));
+            } catch (Exception e) {
+                exp.set(e);
+            }
+        });
+        t.start();
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(6, TimeUnit.SECONDS).until(t::isAlive);
+
+        // NOTE(review): relies on pruning still being in progress when the interrupt lands;
+        // if the worker finishes first, `res` is set and the assertions below fail.
+        t.interrupt();
+
+        try {
+            t.join();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag instead of swallowing it, and stop here: the worker
+            // may still be running, so the checks below would be meaningless.
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for the pruning thread", e);
+        }
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertNull(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), exp.get() instanceof KylinRuntimeException);
+        Assert.assertTrue(exp.get().getCause() instanceof InterruptedException);
+    }
+
+    /**
+     * Verifies that segment pruning of a non-async query is aborted once the query times out.
+     * <p>
+     * The worker thread registers itself with a {@link SlowQueryDetector}. After a short delay
+     * the test thread calls {@link SlowQueryDetector.QueryEntry#setInterruptIfTimeout()} and
+     * expects pruning to end with a {@link KylinTimeoutException}, no result, and the planner
+     * cancel flag set.
+     *
+     * @param dataflow the dataflow whose segments are pruned
+     * @param context  the OLAP context passed to {@link SegmentPruningRule#pruneSegments}
+     */
+    private void testTimeout(NDataflow dataflow, OLAPContext context) {
+        AtomicReference<Exception> exp = new AtomicReference<>(null);
+        AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null);
+        AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null);
+
+        Thread t = new Thread(() -> {
+            try {
+                QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
+                slowQueryDetector.set(new SlowQueryDetector(10, 100));
+                slowQueryDetector.get().queryStart("pruning");
+                // Publish this thread's running-query entry so the test thread can time it out.
+                queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()));
+                res.set(new SegmentPruningRule().pruneSegments(dataflow, context));
+            } catch (Exception e) {
+                exp.set(e);
+            } finally {
+                // Null-guard: if setup failed before the detector existed, an NPE here would
+                // hide the original failure recorded in `exp`.
+                SlowQueryDetector detector = slowQueryDetector.get();
+                if (detector != null) {
+                    detector.queryEnd();
+                }
+            }
+        });
+        t.start();
+
+        // pollDelay(100 ms) presumably lets the query exceed the detector's timeout before
+        // setInterruptIfTimeout() is asserted -- TODO confirm against SlowQueryDetector(10, 100).
+        await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(100, TimeUnit.MILLISECONDS)
+                .atMost(60, TimeUnit.SECONDS).until(() -> queryEntry.get() != null);
+
+        Assert.assertTrue(queryEntry.get().setInterruptIfTimeout());
+
+        try {
+            t.join();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag instead of swallowing it, and stop here: the worker
+            // may still be running, so the checks below would be meaningless.
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for the pruning thread", e);
+        }
+
+        // Separate assertions so a failure shows which condition did not hold.
+        Assert.assertFalse(t.isAlive());
+        Assert.assertTrue(queryEntry.get().getPlannerCancelFlag().isCancelRequested());
+
+        await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null);
+
+        Assert.assertNull(res.get());
+        Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), exp.get() instanceof KylinTimeoutException);
+    }
+
     @Test
     public void testSegmentPruningTimestampType() throws Exception {
         val dataflowId = "3718b614-5191-2254-77e9-f4c5ca64e312";
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/PartitionPruningRule.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/PartitionPruningRule.java
index 9e0eb263c8..c9814b1f34 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/routing/PartitionPruningRule.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/PartitionPruningRule.java
@@ -34,6 +34,8 @@ import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinRuntimeException;
+import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
@@ -47,8 +49,10 @@ import 
org.apache.kylin.metadata.model.MultiPartitionKeyMappingImpl;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.relnode.OLAPTableScan;
+import org.apache.kylin.query.util.QueryInterruptChecker;
 import org.apache.kylin.query.util.RexUtils;
 
 import lombok.extern.slf4j.Slf4j;
@@ -156,6 +160,10 @@ public class PartitionPruningRule extends PruningRule {
         List<TblColRef> partitionColRefs = 
model.getMultiPartitionDesc().getColumnRefs();
         for (MultiPartitionDesc.PartitionInfo partition : 
model.getMultiPartitionDesc().getPartitions()) {
             try {
+                QueryInterruptChecker.checkQueryCanceledOrThreadInterrupted(
+                    "Interrupted during pruning partitions!",
+                    "pruning partitions");
+
                 RexNode partitionRex = partitionToRexCall(partitionColRefs, 
partition.getValues(), rexBuilder,
                         olapContext.allTableScans);
                 RexNode mappingColRex = 
multiPartitionKeyMappingToRex(rexBuilder, partition.getValues(),
@@ -175,6 +183,13 @@ public class PartitionPruningRule extends PruningRule {
                     segPartitionMap.forEach((dataSegment, partitionIds) -> 
partitionIds.remove(partition.getId()));
                     continue;
                 }
+            } catch (InterruptedException ie) {
+                log.error(String.format("Interrupted on pruning partitions 
from %s!", partition.toString()), ie);
+                Thread.currentThread().interrupt();
+                throw new KylinRuntimeException(ie);
+            } catch (UserStopQueryException | KylinTimeoutException e) {
+                log.error(String.format("Stop pruning partitions from %s!", 
partition.toString()), e);
+                throw e;
             } catch (Exception ex) {
                 log.warn("Multi-partition pruning error: ", ex);
             }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/SegmentPruningRule.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/SegmentPruningRule.java
index 3464271cc2..5618417e9d 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/routing/SegmentPruningRule.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/SegmentPruningRule.java
@@ -47,6 +47,8 @@ import org.apache.calcite.util.NlsString;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinRuntimeException;
+import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.guava30.shaded.common.collect.ImmutableSet;
@@ -66,7 +68,9 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.realization.CapabilityResult;
 import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.util.QueryInterruptChecker;
 import org.apache.kylin.query.util.RexUtils;
 
 import lombok.val;
@@ -187,29 +191,44 @@ public class SegmentPruningRule extends PruningRule {
         Segments<NDataSegment> selectedSegments = new Segments<>();
         PartitionDesc partitionCol = getPartitionDesc(dataflow, olapContext);
         RexBuilder rexBuilder = 
olapContext.firstTableScan.getCluster().getRexBuilder();
+
         for (NDataSegment dataSegment : dataflow.getQueryableSegments()) {
             try {
+                QueryInterruptChecker.checkQueryCanceledOrThreadInterrupted(
+                    "Interrupted during pruning segments by partition filter!",
+                    "pruning segments by partition filter");
+
+                // To improve this simplification after fixed 
https://olapio.atlassian.net/browse/KE-42295
+                // evaluate segment range [startTime, endTime)
                 val segmentRanges = transformSegment2RexCall(dataSegment, 
partitionCol.getPartitionDateFormat(),
-                        rexBuilder, partitionColInputRef, 
partitionCol.getPartitionDateColumnRef().getType(),
-                        dataflow.isStreaming());
+                    rexBuilder, partitionColInputRef, 
partitionCol.getPartitionDateColumnRef().getType(),
+                    dataflow.isStreaming());
                 // compare with segment start
                 val segmentStartPredicate = RelOptPredicateList.of(rexBuilder,
-                        Lists.newArrayList(segmentRanges.getFirst()));
+                    Lists.newArrayList(segmentRanges.getFirst()));
                 var simplifiedWithPredicate = 
rexSimplify.withPredicates(segmentStartPredicate)
-                        .simplify(simplifiedSqlFilter);
+                    .simplify(simplifiedSqlFilter);
                 if (simplifiedWithPredicate.isAlwaysFalse()) {
                     continue;
                 }
                 // compare with segment end
                 val segmentEndPredicate = RelOptPredicateList.of(rexBuilder,
-                        Lists.newArrayList(segmentRanges.getSecond()));
+                    Lists.newArrayList(segmentRanges.getSecond()));
                 simplifiedWithPredicate = 
rexSimplify.withPredicates(segmentEndPredicate)
-                        .simplify(simplifiedWithPredicate);
+                    .simplify(simplifiedWithPredicate);
                 if (!simplifiedWithPredicate.isAlwaysFalse()) {
                     selectedSegments.add(dataSegment);
                 }
+
+            } catch (InterruptedException ie) {
+                log.error(String.format("Interrupted on pruning segments from 
%s!", dataSegment.toString()), ie);
+                Thread.currentThread().interrupt();
+                throw new KylinRuntimeException(ie);
+            } catch (UserStopQueryException | KylinTimeoutException e) {
+                log.error(String.format("Stop pruning segments from %s!", 
dataSegment.toString()), e);
+                throw e;
             } catch (Exception ex) {
-                log.warn("Segment pruning error: ", ex);
+                log.warn(String.format("To skip the exception on pruning 
segment %s!", dataSegment.toString()), ex);
                 selectedSegments.add(dataSegment);
             }
         }
diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
index fc6734dfcc..b6620a0091 100644
--- a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
+++ b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
@@ -25,11 +25,14 @@ import java.util.Locale;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinRuntimeException;
+import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.realization.NoRealizationFoundException;
 import 
org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
+import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.query.relnode.ContextUtil;
 import org.apache.kylin.query.relnode.KapRel;
 import org.apache.kylin.query.relnode.OLAPContext;
@@ -86,6 +89,8 @@ public class QueryContextCutter {
                     throw e;
                 }
                 reCutStrategy.tryCutToSmallerContexts(root, e);
+            } catch (UserStopQueryException | KylinTimeoutException | 
KylinRuntimeException e) {
+                throw e;
             } finally {
                 // auto-modeling should invoke unfixModel() because it may 
select some realizations.
                 if (isReCutBanned) {


Reply via email to