This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 33376b71f00ecca0a1797324c054d0ff32a44c31 Author: wxy <dut.xian...@gmail.com> AuthorDate: Wed Jan 4 14:08:25 2023 +0800 [Feature](export) Support cancel export statement (#15128) Co-authored-by: wangxian...@360shuke.com <wangxian...@360shuke.com> --- fe/fe-core/src/main/cup/sql_parser.cup | 4 + .../{CancelLoadStmt.java => CancelExportStmt.java} | 40 ++--- .../org/apache/doris/analysis/CancelLoadStmt.java | 3 + .../main/java/org/apache/doris/load/ExportJob.java | 14 +- .../main/java/org/apache/doris/load/ExportMgr.java | 61 ++++++- .../main/java/org/apache/doris/qe/DdlExecutor.java | 3 + .../org/apache/doris/task/ExportExportingTask.java | 26 ++- .../doris/analysis/CancelExportStmtTest.java | 190 +++++++++++++++++++++ 8 files changed, 301 insertions(+), 40 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 84853c31c9..83d105c734 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -3965,6 +3965,10 @@ cancel_param ::= {: RESULT = new CancelLoadStmt(db, parser.where); :} + | KW_EXPORT opt_db:db opt_wild_where + {: + RESULT = new CancelExportStmt(db, parser.where); + :} | KW_ALTER KW_TABLE opt_alter_type:type KW_FROM table_name:table cancel_rollup_job_id_list:list {: RESULT = new CancelAlterTableStmt(type, table, list); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java similarity index 78% copy from fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java copy to fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java index c08f6370a4..71998ba226 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java @@ -21,7 +21,7 @@ import org.apache.doris.analysis.BinaryPredicate.Operator; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; -import org.apache.doris.load.loadv2.JobState; +import org.apache.doris.load.ExportJob; import com.google.common.base.Strings; import com.google.common.collect.Sets; @@ -31,11 +31,11 @@ import java.util.Set; /** - * CANCEL LOAD statement used to cancel load job. + * CANCEL EXPORT statement used to cancel export job. * syntax: - * CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx") + * CANCEL EXPORT [FROM db] WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EXPORTING"] **/ -public class CancelLoadStmt extends DdlStmt { +public class CancelExportStmt extends DdlStmt { private static final Set<String> SUPPORT_COLUMNS = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); @@ -53,7 +53,7 @@ public class CancelLoadStmt extends DdlStmt { private Expr whereClause; - public CancelLoadStmt(String dbName, Expr whereClause) { + public CancelExportStmt(String dbName, Expr whereClause) { this.dbName = dbName; this.whereClause = whereClause; this.SUPPORT_COLUMNS.add("label"); @@ -63,34 +63,33 @@ public class CancelLoadStmt extends DdlStmt { private void checkColumn(Expr expr, boolean like) throws AnalysisException { String inputCol = ((SlotRef) expr.getChild(0)).getColumnName(); if (!SUPPORT_COLUMNS.contains(inputCol)) { - throw new AnalysisException("Current not support " + inputCol); + throw new AnalysisException("Current only support label and state, invalid column: " + inputCol); } if (!(expr.getChild(1) instanceof StringLiteral)) { - throw new AnalysisException("Value must is string"); + throw new AnalysisException("Value must be a string"); } String inputValue = expr.getChild(1).getStringValue(); if (Strings.isNullOrEmpty(inputValue)) { - throw new AnalysisException("Value can't is null"); - } - if (like && !inputValue.contains("%")) { - inputValue = "%" + inputValue + "%"; + throw new AnalysisException("Value can't be null"); } + if (inputCol.equalsIgnoreCase("label")) { label = inputValue; } + if (inputCol.equalsIgnoreCase("state")) { if (like) { throw new AnalysisException("Only label can use like"); } state = inputValue; try { - JobState jobState = JobState.valueOf(state); - if (jobState != JobState.PENDING && jobState != JobState.ETL && jobState != JobState.LOADING) { - throw new AnalysisException("invalid state: " + state); + ExportJob.JobState jobState = ExportJob.JobState.valueOf(state); + if (jobState != ExportJob.JobState.PENDING && jobState != ExportJob.JobState.EXPORTING) { + throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state); } } catch (IllegalArgumentException e) { - throw new AnalysisException("invalid state: " + state); + throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state); } } } @@ -118,15 +117,18 @@ public class CancelLoadStmt extends DdlStmt { private void compoundCheck(Expr expr) throws AnalysisException { if (expr == null) { - throw new AnalysisException("Where clause can't is null"); + throw new AnalysisException("Where clause can't be null"); } if (expr instanceof CompoundPredicate) { // current only support label and state CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) { + throw new AnalysisException("Current not support NOT operator"); + } for (int i = 0; i < 2; i++) { Expr child = compoundPredicate.getChild(i); if (child instanceof CompoundPredicate) { - throw new AnalysisException("Current only support label and state"); + throw new AnalysisException("Current not support nested clause"); } likeCheck(child); binaryCheck(child); @@ -147,8 +149,6 @@ public class CancelLoadStmt extends DdlStmt { dbName = ClusterNamespace.getFullName(getClusterName(), dbName); } - // check auth after we get real load job - // analyze expr likeCheck(whereClause); binaryCheck(whereClause); compoundCheck(whereClause); @@ -157,7 +157,7 @@ public class CancelLoadStmt extends DdlStmt { @Override public String toSql() { StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append("CANCEL LOAD "); + stringBuilder.append("CANCEL EXPORT "); if (!Strings.isNullOrEmpty(dbName)) { stringBuilder.append("FROM ").append(dbName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java index c08f6370a4..2ca9970677 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java @@ -123,6 +123,9 @@ public class CancelLoadStmt extends DdlStmt { if (expr instanceof CompoundPredicate) { // current only support label and state CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) { + throw new AnalysisException("Current not support NOT operator"); + } for (int i = 0; i < 2; i++) { Expr child = compoundPredicate.getChild(i); if (child instanceof CompoundPredicate) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 00057dfd67..7f681548df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -521,7 +521,7 @@ public class ExportJob implements Writable { return whereExpr; } - public JobState getState() { + public synchronized JobState getState() { return state; } @@ -651,11 +651,12 @@ public class ExportJob implements Writable { } public synchronized void cancel(ExportFailMsg.CancelType type, String msg) { - releaseSnapshotPaths(); if (msg != null) { failMsg = new ExportFailMsg(type, msg); } - updateState(ExportJob.JobState.CANCELLED, false); + if (updateState(ExportJob.JobState.CANCELLED, false)) { + releaseSnapshotPaths(); + } } public synchronized boolean updateState(ExportJob.JobState newState) { @@ -663,6 +664,9 @@ public class ExportJob implements Writable { } public synchronized boolean updateState(ExportJob.JobState newState, boolean isReplay) { + if (isFinalState()) { + return false; + } state = newState; switch (newState) { case PENDING: @@ -686,6 +690,10 @@ public class ExportJob implements Writable { return true; } + public synchronized boolean isFinalState() { + return this.state == ExportJob.JobState.CANCELLED || this.state == ExportJob.JobState.FINISHED; + } + public Status releaseSnapshotPaths() { List<Pair<TNetworkAddress, String>> snapshotPaths = getSnapshotPaths(); LOG.debug("snapshotPaths:{}", snapshotPaths); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index b332f7ed25..cdee254f8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -17,6 +17,8 @@ package org.apache.doris.load; +import org.apache.doris.analysis.CancelExportStmt; +import org.apache.doris.analysis.CompoundPredicate; import org.apache.doris.analysis.ExportStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Database; @@ -24,6 +26,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.PatternMatcher; @@ -33,10 +36,12 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import java.util.stream.Collectors; public class ExportMgr { @@ -99,11 +105,62 @@ public class ExportMgr { LOG.info("add export job. {}", job); } + public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException { + // List of export jobs waiting to be cancelled + List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt); + if (matchExportJobs.isEmpty()) { + throw new DdlException("Export job(s) do not exist"); + } + matchExportJobs = matchExportJobs.stream() + .filter(job -> !job.isFinalState()).collect(Collectors.toList()); + if (matchExportJobs.isEmpty()) { + throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)"); + } + for (ExportJob exportJob : matchExportJobs) { + exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel"); + } + } + public void unprotectAddJob(ExportJob job) { idToJob.put(job.getId(), job); labelToJobId.putIfAbsent(job.getLabel(), job.getId()); } + private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException { + Predicate<ExportJob> jobFilter = buildCancelJobFilter(stmt); + readLock(); + try { + return getJobs().stream().filter(jobFilter).collect(Collectors.toList()); + } finally { + readUnlock(); + } + } + + @VisibleForTesting + public static Predicate<ExportJob> buildCancelJobFilter(CancelExportStmt stmt) throws AnalysisException { + String label = stmt.getLabel(); + String state = stmt.getState(); + PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility()); + + return job -> { + boolean labelFilter = true; + boolean stateFilter = true; + if (StringUtils.isNotEmpty(label)) { + labelFilter = label.contains("%") ? matcher.match(job.getLabel()) : + job.getLabel().equalsIgnoreCase(label); + } + if (StringUtils.isNotEmpty(state)) { + stateFilter = job.getState().name().equalsIgnoreCase(state); + } + + if (stmt.getOperator() != null && CompoundPredicate.Operator.OR.equals(stmt.getOperator())) { + return labelFilter || stateFilter; + } + + return labelFilter && stateFilter; + }; + } + private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception { ExportJob job = new ExportJob(jobId); job.setJob(stmt); @@ -294,12 +351,12 @@ public class ExportMgr { } public void replayUpdateJobState(long jobId, ExportJob.JobState newState) { - writeLock(); + readLock(); try { ExportJob job = idToJob.get(jobId); job.updateState(newState, true); } finally { - writeUnlock(); + readUnlock(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index d48f8fdfdf..fa907027aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -48,6 +48,7 @@ import org.apache.doris.analysis.BackupStmt; import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; import org.apache.doris.analysis.CancelBackupStmt; +import org.apache.doris.analysis.CancelExportStmt; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.CleanLabelStmt; import org.apache.doris.analysis.CreateCatalogStmt; @@ -188,6 +189,8 @@ public class DdlExecutor { } else { env.getLoadManager().createLoadJobFromStmt(loadStmt); } + } else if (ddlStmt instanceof CancelExportStmt) { + env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt); } else if (ddlStmt instanceof CancelLoadStmt) { env.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt); } else if (ddlStmt instanceof CreateRoutineLoadStmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index 4f8084a272..8066e280a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -161,18 +161,17 @@ public class ExportExportingTask extends MasterTask { } } - // release snapshot - Status releaseSnapshotStatus = job.releaseSnapshotPaths(); - if (!releaseSnapshotStatus.ok()) { - // even if release snapshot failed, do nothing cancel this job. - // snapshot will be removed by GC thread on BE, finally. - LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(), - releaseSnapshotStatus.getErrorMsg()); - } - if (job.updateState(ExportJob.JobState.FINISHED)) { LOG.warn("export job success. job: {}", job); registerProfile(); + // release snapshot + Status releaseSnapshotStatus = job.releaseSnapshotPaths(); + if (!releaseSnapshotStatus.ok()) { + // even if release snapshot failed, do not cancel this job. + // snapshot will be removed by GC thread on BE, finally. + LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(), + releaseSnapshotStatus.getErrorMsg()); + } } synchronized (this) { @@ -336,12 +335,9 @@ public class ExportExportingTask extends MasterTask { } } - if (!failed) { - exportedFiles.clear(); - job.addExportedFiles(newFiles); - ClientPool.brokerPool.returnObject(address, client); - } - + exportedFiles.clear(); + job.addExportedFiles(newFiles); + ClientPool.brokerPool.returnObject(address, client); return Status.OK; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java new file mode 100644 index 0000000000..30be49e031 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.analysis.CompoundPredicate.Operator; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportMgr; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.wildfly.common.Assert; + +import java.util.List; +import java.util.function.Predicate; + +public class CancelExportStmtTest extends TestWithFeService { + + private Analyzer analyzer; + private String dbName = "testDb"; + private String tblName = "table1"; + + @Override + protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; + createDatabase(dbName); + useDatabase(dbName); + createTable("create table " + tblName + "\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" + + "properties(\"replication_num\" = \"1\");"); + analyzer = new Analyzer(connectContext.getEnv(), connectContext); + } + + @Test + public void testNormal() throws UserException { + SlotRef labelSlotRef = new SlotRef(null, "label"); + StringLiteral labelStringLiteral = new StringLiteral("doris_test_label"); + + SlotRef stateSlotRef = new SlotRef(null, "state"); + + BinaryPredicate labelBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, + labelStringLiteral); + CancelExportStmt stmt = new CancelExportStmt(null, labelBinaryPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label'", + stmt.toString()); + + SlotRef labelSlotRefUpper = new SlotRef(null, "LABEL"); + BinaryPredicate labelBinaryPredicateUpper = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRefUpper, + labelStringLiteral); + CancelExportStmt stmtUpper = new CancelExportStmt(null, labelBinaryPredicateUpper); + stmtUpper.analyze(analyzer); + Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `LABEL` = 'doris_test_label'", + stmtUpper.toString()); + + StringLiteral stateStringLiteral = new StringLiteral("PENDING"); + BinaryPredicate stateBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, + stateStringLiteral); + stmt = new CancelExportStmt(null, stateBinaryPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `state` = 'PENDING'", stmt.toString()); + + LikePredicate labelLikePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, + labelStringLiteral); + stmt = new CancelExportStmt(null, labelLikePredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'", + stmt.toString()); + + CompoundPredicate compoundAndPredicate = new CompoundPredicate(Operator.AND, labelBinaryPredicate, + stateBinaryPredicate); + stmt = new CancelExportStmt(null, compoundAndPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals( + "CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'PENDING'", + stmt.toString()); + + CompoundPredicate compoundOrPredicate = new CompoundPredicate(Operator.OR, labelBinaryPredicate, + stateBinaryPredicate); + stmt = new CancelExportStmt(null, compoundOrPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals( + "CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label' OR `state` = 'PENDING'", + stmt.toString()); + } + + @Test + public void testError1() { + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral = new StringLiteral("FINISHED"); + + LikePredicate stateLikePredicate = + new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, stateStringLiteral); + CancelExportStmt stmt = new CancelExportStmt(null, stateLikePredicate); + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only label can use like", + () -> stmt.analyze(analyzer)); + } + + @Test + public void testError2() { + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral1 = new StringLiteral("EXPORTING"); + BinaryPredicate stateEqPredicate1 = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral1); + + StringLiteral stateStringLiteral2 = new StringLiteral("PENDING"); + BinaryPredicate stateEqPredicate2 = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral2); + + SlotRef labelSlotRef = new SlotRef(null, "label"); + StringLiteral labelStringLiteral1 = new StringLiteral("test_label"); + BinaryPredicate labelEqPredicate1 = + new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral1); + + CompoundPredicate compoundAndPredicate1 = new CompoundPredicate(Operator.AND, stateEqPredicate1, + stateEqPredicate2); + CompoundPredicate compoundAndPredicate2 = new CompoundPredicate(Operator.AND, compoundAndPredicate1, + labelEqPredicate1); + CompoundPredicate compoundAndPredicate3 = new CompoundPredicate(Operator.NOT, stateEqPredicate1, null); + + + CancelExportStmt stmt1 = new CancelExportStmt(null, compoundAndPredicate2); + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current not support nested clause", + () -> stmt1.analyze(analyzer)); + + + CancelExportStmt stmt2 = new CancelExportStmt(null, compoundAndPredicate3); + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current not support NOT operator", + () -> stmt2.analyze(analyzer)); + } + + @Test + public void testCancelJobFilter() throws UserException { + List<ExportJob> exportJobList1 = Lists.newLinkedList(); + List<ExportJob> exportJobList2 = Lists.newLinkedList(); + ExportJob job1 = new ExportJob(); + ExportJob job2 = new ExportJob(); + job2.updateState(ExportJob.JobState.CANCELLED, true); + ExportJob job3 = new ExportJob(); + job3.updateState(ExportJob.JobState.EXPORTING, true); + ExportJob job4 = new ExportJob(); + exportJobList1.add(job1); + exportJobList1.add(job2); + exportJobList1.add(job3); + exportJobList1.add(job4); + exportJobList2.add(job1); + exportJobList2.add(job2); + + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral = new StringLiteral("PENDING"); + BinaryPredicate stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + CancelExportStmt stmt = new CancelExportStmt(null, stateEqPredicate); + stmt.analyze(analyzer); + Predicate<ExportJob> filter = ExportMgr.buildCancelJobFilter(stmt); + + Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 2); + Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1); + + stateStringLiteral = new StringLiteral("EXPORTING"); + stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + stmt = new CancelExportStmt(null, stateEqPredicate); + stmt.analyze(analyzer); + filter = ExportMgr.buildCancelJobFilter(stmt); + + Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1); + + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org