This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 33376b71f00ecca0a1797324c054d0ff32a44c31
Author: wxy <dut.xian...@gmail.com>
AuthorDate: Wed Jan 4 14:08:25 2023 +0800

    [Feature](export) Support cancel export statement (#15128)
    
    
    
    Co-authored-by: wangxian...@360shuke.com <wangxian...@360shuke.com>
---
 fe/fe-core/src/main/cup/sql_parser.cup             |   4 +
 .../{CancelLoadStmt.java => CancelExportStmt.java} |  40 ++---
 .../org/apache/doris/analysis/CancelLoadStmt.java  |   3 +
 .../main/java/org/apache/doris/load/ExportJob.java |  14 +-
 .../main/java/org/apache/doris/load/ExportMgr.java |  61 ++++++-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 +
 .../org/apache/doris/task/ExportExportingTask.java |  26 ++-
 .../doris/analysis/CancelExportStmtTest.java       | 190 +++++++++++++++++++++
 8 files changed, 301 insertions(+), 40 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 84853c31c9..83d105c734 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -3965,6 +3965,10 @@ cancel_param ::=
     {:
         RESULT = new CancelLoadStmt(db, parser.where);
     :}
+    | KW_EXPORT opt_db:db opt_wild_where
+    {:
+        RESULT = new CancelExportStmt(db, parser.where);
+    :}
     | KW_ALTER KW_TABLE opt_alter_type:type KW_FROM table_name:table 
cancel_rollup_job_id_list:list
     {:
         RESULT = new CancelAlterTableStmt(type, table, list);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
similarity index 78%
copy from fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
copy to fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
index c08f6370a4..71998ba226 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
@@ -21,7 +21,7 @@ import org.apache.doris.analysis.BinaryPredicate.Operator;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.load.ExportJob;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
@@ -31,11 +31,11 @@ import java.util.Set;
 
 
 /**
- * CANCEL LOAD statement used to cancel load job.
+ * CANCEL EXPORT statement used to cancel export job.
  * syntax:
- *     CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
+ *     CANCEL EXPORT [FROM db] WHERE [LABEL = "export_label" | LABEL like 
"label_pattern" | STATE = "PENDING/EXPORTING"]
  **/
-public class CancelLoadStmt extends DdlStmt {
+public class CancelExportStmt extends DdlStmt {
 
     private static final Set<String> SUPPORT_COLUMNS = 
Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
 
@@ -53,7 +53,7 @@ public class CancelLoadStmt extends DdlStmt {
 
     private Expr whereClause;
 
-    public CancelLoadStmt(String dbName, Expr whereClause) {
+    public CancelExportStmt(String dbName, Expr whereClause) {
         this.dbName = dbName;
         this.whereClause = whereClause;
         this.SUPPORT_COLUMNS.add("label");
@@ -63,34 +63,33 @@ public class CancelLoadStmt extends DdlStmt {
     private void checkColumn(Expr expr, boolean like) throws AnalysisException 
{
         String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
         if (!SUPPORT_COLUMNS.contains(inputCol)) {
-            throw new AnalysisException("Current not support " + inputCol);
+            throw new AnalysisException("Current only support label and state, 
invalid column: " + inputCol);
         }
         if (!(expr.getChild(1) instanceof StringLiteral)) {
-            throw new AnalysisException("Value must is string");
+            throw new AnalysisException("Value must be a string");
         }
 
         String inputValue = expr.getChild(1).getStringValue();
         if (Strings.isNullOrEmpty(inputValue)) {
-            throw new AnalysisException("Value can't is null");
-        }
-        if (like && !inputValue.contains("%")) {
-            inputValue = "%" + inputValue + "%";
+            throw new AnalysisException("Value can't be null");
         }
+
         if (inputCol.equalsIgnoreCase("label")) {
             label = inputValue;
         }
+
         if (inputCol.equalsIgnoreCase("state")) {
             if (like) {
                 throw new AnalysisException("Only label can use like");
             }
             state = inputValue;
             try {
-                JobState jobState = JobState.valueOf(state);
-                if (jobState != JobState.PENDING && jobState != JobState.ETL 
&& jobState != JobState.LOADING) {
-                    throw new AnalysisException("invalid state: " + state);
+                ExportJob.JobState jobState = 
ExportJob.JobState.valueOf(state);
+                if (jobState != ExportJob.JobState.PENDING && jobState != 
ExportJob.JobState.EXPORTING) {
+                    throw new AnalysisException("Only support 
PENDING/EXPORTING, invalid state: " + state);
                 }
             } catch (IllegalArgumentException e) {
-                throw new AnalysisException("invalid state: " + state);
+                throw new AnalysisException("Only support PENDING/EXPORTING, 
invalid state: " + state);
             }
         }
     }
@@ -118,15 +117,18 @@ public class CancelLoadStmt extends DdlStmt {
 
     private void compoundCheck(Expr expr) throws AnalysisException {
         if (expr == null) {
-            throw new AnalysisException("Where clause can't is null");
+            throw new AnalysisException("Where clause can't be null");
         }
         if (expr instanceof CompoundPredicate) {
             // current only support label and state
             CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+            if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) {
+                throw new AnalysisException("Current not support NOT 
operator");
+            }
             for (int i = 0; i < 2; i++) {
                 Expr child = compoundPredicate.getChild(i);
                 if (child instanceof CompoundPredicate) {
-                    throw new AnalysisException("Current only support label 
and state");
+                    throw new AnalysisException("Current not support nested 
clause");
                 }
                 likeCheck(child);
                 binaryCheck(child);
@@ -147,8 +149,6 @@ public class CancelLoadStmt extends DdlStmt {
             dbName = ClusterNamespace.getFullName(getClusterName(), dbName);
         }
 
-        // check auth after we get real load job
-        // analyze expr
         likeCheck(whereClause);
         binaryCheck(whereClause);
         compoundCheck(whereClause);
@@ -157,7 +157,7 @@ public class CancelLoadStmt extends DdlStmt {
     @Override
     public String toSql() {
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("CANCEL LOAD ");
+        stringBuilder.append("CANCEL EXPORT ");
         if (!Strings.isNullOrEmpty(dbName)) {
             stringBuilder.append("FROM ").append(dbName);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
index c08f6370a4..2ca9970677 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
@@ -123,6 +123,9 @@ public class CancelLoadStmt extends DdlStmt {
         if (expr instanceof CompoundPredicate) {
             // current only support label and state
             CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+            if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) {
+                throw new AnalysisException("Current not support NOT 
operator");
+            }
             for (int i = 0; i < 2; i++) {
                 Expr child = compoundPredicate.getChild(i);
                 if (child instanceof CompoundPredicate) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 00057dfd67..7f681548df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -521,7 +521,7 @@ public class ExportJob implements Writable {
         return whereExpr;
     }
 
-    public JobState getState() {
+    public synchronized JobState getState() {
         return state;
     }
 
@@ -651,11 +651,12 @@ public class ExportJob implements Writable {
     }
 
     public synchronized void cancel(ExportFailMsg.CancelType type, String msg) 
{
-        releaseSnapshotPaths();
         if (msg != null) {
             failMsg = new ExportFailMsg(type, msg);
         }
-        updateState(ExportJob.JobState.CANCELLED, false);
+        if (updateState(ExportJob.JobState.CANCELLED, false)) {
+            releaseSnapshotPaths();
+        }
     }
 
     public synchronized boolean updateState(ExportJob.JobState newState) {
@@ -663,6 +664,9 @@ public class ExportJob implements Writable {
     }
 
     public synchronized boolean updateState(ExportJob.JobState newState, 
boolean isReplay) {
+        if (isFinalState()) {
+            return false;
+        }
         state = newState;
         switch (newState) {
             case PENDING:
@@ -686,6 +690,10 @@ public class ExportJob implements Writable {
         return true;
     }
 
+    public synchronized boolean isFinalState() {
+        return this.state == ExportJob.JobState.CANCELLED || this.state == 
ExportJob.JobState.FINISHED;
+    }
+
     public Status releaseSnapshotPaths() {
         List<Pair<TNetworkAddress, String>> snapshotPaths = getSnapshotPaths();
         LOG.debug("snapshotPaths:{}", snapshotPaths);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index b332f7ed25..cdee254f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.load;
 
+import org.apache.doris.analysis.CancelExportStmt;
+import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.analysis.ExportStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Database;
@@ -24,6 +26,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.PatternMatcher;
@@ -33,10 +36,12 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -48,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 public class ExportMgr {
@@ -99,11 +105,62 @@ public class ExportMgr {
         LOG.info("add export job. {}", job);
     }
 
+    public void cancelExportJob(CancelExportStmt stmt) throws DdlException, 
AnalysisException {
+        // List of export jobs waiting to be cancelled
+        List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt);
+        if (matchExportJobs.isEmpty()) {
+            throw new DdlException("Export job(s) do not exist");
+        }
+        matchExportJobs = matchExportJobs.stream()
+                .filter(job -> 
!job.isFinalState()).collect(Collectors.toList());
+        if (matchExportJobs.isEmpty()) {
+            throw new DdlException("All export job(s) are at final state 
(CANCELLED/FINISHED)");
+        }
+        for (ExportJob exportJob : matchExportJobs) {
+            exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user 
cancel");
+        }
+    }
+
     public void unprotectAddJob(ExportJob job) {
         idToJob.put(job.getId(), job);
         labelToJobId.putIfAbsent(job.getLabel(), job.getId());
     }
 
+    private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws 
AnalysisException {
+        Predicate<ExportJob> jobFilter = buildCancelJobFilter(stmt);
+        readLock();
+        try {
+            return 
getJobs().stream().filter(jobFilter).collect(Collectors.toList());
+        } finally {
+            readUnlock();
+        }
+    }
+
+    @VisibleForTesting
+    public static Predicate<ExportJob> buildCancelJobFilter(CancelExportStmt 
stmt) throws AnalysisException {
+        String label = stmt.getLabel();
+        String state = stmt.getState();
+        PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, 
CaseSensibility.LABEL.getCaseSensibility());
+
+        return job -> {
+            boolean labelFilter = true;
+            boolean stateFilter = true;
+            if (StringUtils.isNotEmpty(label)) {
+                labelFilter = label.contains("%") ? 
matcher.match(job.getLabel()) :
+                        job.getLabel().equalsIgnoreCase(label);
+            }
+            if (StringUtils.isNotEmpty(state)) {
+                stateFilter = job.getState().name().equalsIgnoreCase(state);
+            }
+
+            if (stmt.getOperator() != null && 
CompoundPredicate.Operator.OR.equals(stmt.getOperator())) {
+                return labelFilter || stateFilter;
+            }
+
+            return labelFilter && stateFilter;
+        };
+    }
+
     private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception {
         ExportJob job = new ExportJob(jobId);
         job.setJob(stmt);
@@ -294,12 +351,12 @@ public class ExportMgr {
     }
 
     public void replayUpdateJobState(long jobId, ExportJob.JobState newState) {
-        writeLock();
+        readLock();
         try {
             ExportJob job = idToJob.get(jobId);
             job.updateState(newState, true);
         } finally {
-            writeUnlock();
+            readUnlock();
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index d48f8fdfdf..fa907027aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -48,6 +48,7 @@ import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
 import org.apache.doris.analysis.CancelBackupStmt;
+import org.apache.doris.analysis.CancelExportStmt;
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.CleanLabelStmt;
 import org.apache.doris.analysis.CreateCatalogStmt;
@@ -188,6 +189,8 @@ public class DdlExecutor {
             } else {
                 env.getLoadManager().createLoadJobFromStmt(loadStmt);
             }
+        } else if (ddlStmt instanceof CancelExportStmt) {
+            env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt);
         } else if (ddlStmt instanceof CancelLoadStmt) {
             env.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index 4f8084a272..8066e280a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -161,18 +161,17 @@ public class ExportExportingTask extends MasterTask {
             }
         }
 
-        // release snapshot
-        Status releaseSnapshotStatus = job.releaseSnapshotPaths();
-        if (!releaseSnapshotStatus.ok()) {
-            // even if release snapshot failed, do nothing cancel this job.
-            // snapshot will be removed by GC thread on BE, finally.
-            LOG.warn("failed to release snapshot for export job: {}. err: {}", 
job.getId(),
-                    releaseSnapshotStatus.getErrorMsg());
-        }
-
         if (job.updateState(ExportJob.JobState.FINISHED)) {
             LOG.warn("export job success. job: {}", job);
             registerProfile();
+            // release snapshot
+            Status releaseSnapshotStatus = job.releaseSnapshotPaths();
+            if (!releaseSnapshotStatus.ok()) {
+                // even if release snapshot failed, do not cancel this job.
+                // snapshot will be removed by GC thread on BE, finally.
+                LOG.warn("failed to release snapshot for export job: {}. err: 
{}", job.getId(),
+                        releaseSnapshotStatus.getErrorMsg());
+            }
         }
 
         synchronized (this) {
@@ -336,12 +335,9 @@ public class ExportExportingTask extends MasterTask {
             }
         }
 
-        if (!failed) {
-            exportedFiles.clear();
-            job.addExportedFiles(newFiles);
-            ClientPool.brokerPool.returnObject(address, client);
-        }
-
+        exportedFiles.clear();
+        job.addExportedFiles(newFiles);
+        ClientPool.brokerPool.returnObject(address, client);
         return Status.OK;
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
new file mode 100644
index 0000000000..30be49e031
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.analysis.CompoundPredicate.Operator;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportMgr;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.wildfly.common.Assert;
+
+import java.util.List;
+import java.util.function.Predicate;
+
+public class CancelExportStmtTest extends TestWithFeService {
+
+    private Analyzer analyzer;
+    private String dbName = "testDb";
+    private String tblName = "table1";
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        FeConstants.runningUnitTest = true;
+        createDatabase(dbName);
+        useDatabase(dbName);
+        createTable("create table " + tblName + "\n" + "(k1 int, k2 int) 
distributed by hash(k1) buckets 1\n"
+                + "properties(\"replication_num\" = \"1\");");
+        analyzer = new Analyzer(connectContext.getEnv(), connectContext);
+    }
+
+    @Test
+    public void testNormal() throws UserException {
+        SlotRef labelSlotRef = new SlotRef(null, "label");
+        StringLiteral labelStringLiteral = new 
StringLiteral("doris_test_label");
+
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+
+        BinaryPredicate labelBinaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef,
+                labelStringLiteral);
+        CancelExportStmt stmt = new CancelExportStmt(null, 
labelBinaryPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb 
WHERE `label` = 'doris_test_label'",
+                stmt.toString());
+
+        SlotRef labelSlotRefUpper = new SlotRef(null, "LABEL");
+        BinaryPredicate labelBinaryPredicateUpper = new 
BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRefUpper,
+                labelStringLiteral);
+        CancelExportStmt stmtUpper = new CancelExportStmt(null, 
labelBinaryPredicateUpper);
+        stmtUpper.analyze(analyzer);
+        Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb 
WHERE `LABEL` = 'doris_test_label'",
+                stmtUpper.toString());
+
+        StringLiteral stateStringLiteral = new StringLiteral("PENDING");
+        BinaryPredicate stateBinaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef,
+                stateStringLiteral);
+        stmt = new CancelExportStmt(null, stateBinaryPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb 
WHERE `state` = 'PENDING'", stmt.toString());
+
+        LikePredicate labelLikePredicate = new 
LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef,
+                labelStringLiteral);
+        stmt = new CancelExportStmt(null, labelLikePredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb 
WHERE `label` LIKE 'doris_test_label'",
+                stmt.toString());
+
+        CompoundPredicate compoundAndPredicate = new 
CompoundPredicate(Operator.AND, labelBinaryPredicate,
+                stateBinaryPredicate);
+        stmt = new CancelExportStmt(null, compoundAndPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals(
+                "CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 
'doris_test_label' AND `state` = 'PENDING'",
+                stmt.toString());
+
+        CompoundPredicate compoundOrPredicate = new 
CompoundPredicate(Operator.OR, labelBinaryPredicate,
+                stateBinaryPredicate);
+        stmt = new CancelExportStmt(null, compoundOrPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals(
+                "CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 
'doris_test_label' OR `state` = 'PENDING'",
+                stmt.toString());
+    }
+
+    @Test
+    public void testError1() {
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+        StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
+
+        LikePredicate stateLikePredicate =
+                new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, 
stateStringLiteral);
+        CancelExportStmt stmt = new CancelExportStmt(null, stateLikePredicate);
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only 
label can use like",
+                () -> stmt.analyze(analyzer));
+    }
+
+    @Test
+    public void testError2() {
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+        StringLiteral stateStringLiteral1 = new StringLiteral("EXPORTING");
+        BinaryPredicate stateEqPredicate1 =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, 
stateStringLiteral1);
+
+        StringLiteral stateStringLiteral2 = new StringLiteral("PENDING");
+        BinaryPredicate stateEqPredicate2 =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, 
stateStringLiteral2);
+
+        SlotRef labelSlotRef = new SlotRef(null, "label");
+        StringLiteral labelStringLiteral1 = new StringLiteral("test_label");
+        BinaryPredicate labelEqPredicate1 =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, 
labelStringLiteral1);
+
+        CompoundPredicate compoundAndPredicate1 = new 
CompoundPredicate(Operator.AND, stateEqPredicate1,
+                stateEqPredicate2);
+        CompoundPredicate compoundAndPredicate2 = new 
CompoundPredicate(Operator.AND, compoundAndPredicate1,
+                labelEqPredicate1);
+        CompoundPredicate compoundAndPredicate3 = new 
CompoundPredicate(Operator.NOT, stateEqPredicate1, null);
+
+
+        CancelExportStmt stmt1 = new CancelExportStmt(null, 
compoundAndPredicate2);
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current 
not support nested clause",
+                () -> stmt1.analyze(analyzer));
+
+
+        CancelExportStmt stmt2 = new CancelExportStmt(null, 
compoundAndPredicate3);
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current 
not support NOT operator",
+                () -> stmt2.analyze(analyzer));
+    }
+
+    @Test
+    public void testCancelJobFilter() throws UserException {
+        List<ExportJob> exportJobList1 = Lists.newLinkedList();
+        List<ExportJob> exportJobList2 = Lists.newLinkedList();
+        ExportJob job1 = new ExportJob();
+        ExportJob job2 = new ExportJob();
+        job2.updateState(ExportJob.JobState.CANCELLED, true);
+        ExportJob job3 = new ExportJob();
+        job3.updateState(ExportJob.JobState.EXPORTING, true);
+        ExportJob job4 = new ExportJob();
+        exportJobList1.add(job1);
+        exportJobList1.add(job2);
+        exportJobList1.add(job3);
+        exportJobList1.add(job4);
+        exportJobList2.add(job1);
+        exportJobList2.add(job2);
+
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+        StringLiteral stateStringLiteral = new StringLiteral("PENDING");
+        BinaryPredicate stateEqPredicate =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, 
stateStringLiteral);
+        CancelExportStmt stmt = new CancelExportStmt(null, stateEqPredicate);
+        stmt.analyze(analyzer);
+        Predicate<ExportJob> filter = ExportMgr.buildCancelJobFilter(stmt);
+
+        Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 2);
+        Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1);
+
+        stateStringLiteral = new StringLiteral("EXPORTING");
+        stateEqPredicate =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, 
stateStringLiteral);
+        stmt = new CancelExportStmt(null, stateEqPredicate);
+        stmt.analyze(analyzer);
+        filter = ExportMgr.buildCancelJobFilter(stmt);
+
+        Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1);
+
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to