This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ef6bf067cab [refactor](delete) refactor FE DeleteHandler related logic 
(#25497)
ef6bf067cab is described below

commit ef6bf067cab706b82475771665225f13e0b97800
Author: Siyang Tang <[email protected]>
AuthorDate: Fri Oct 20 18:19:56 2023 +0800

    [refactor](delete) refactor FE DeleteHandler related logic (#25497)
---
 .../java/org/apache/doris/catalog/Partition.java   |   8 +
 .../java/org/apache/doris/load/DeleteHandler.java  | 748 ++-------------------
 .../java/org/apache/doris/load/DeleteInfo.java     |   4 -
 .../main/java/org/apache/doris/load/DeleteJob.java | 559 ++++++++++++++-
 .../org/apache/doris/load/DeleteJobLifeCycle.java  |  46 ++
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 -
 .../java/org/apache/doris/qe/StmtExecutor.java     |  10 +-
 .../org/apache/doris/load/DeleteHandlerTest.java   |  61 +-
 8 files changed, 649 insertions(+), 790 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index a970d1798de..3a1905bd8e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -282,6 +282,14 @@ public class Partition extends MetaObject implements 
Writable {
         return replicaCount;
     }
 
+    public long getAllReplicaCount() {
+        long replicaCount = 0;
+        for (MaterializedIndex mIndex : 
getMaterializedIndices(IndexExtState.ALL)) {
+            replicaCount += mIndex.getReplicaCount();
+        }
+        return replicaCount;
+    }
+
     public boolean hasData() {
         // The fe unit test need to check the selected index id without any 
data.
         // So if set FeConstants.runningUnitTest, we can ensure that the 
number of partitions is not empty,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index dbf303f10fb..ea3f786d0fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -17,78 +17,26 @@
 
 package org.apache.doris.load;
 
-import org.apache.doris.analysis.BinaryPredicate;
-import org.apache.doris.analysis.DateLiteral;
 import org.apache.doris.analysis.DeleteStmt;
-import org.apache.doris.analysis.InPredicate;
-import org.apache.doris.analysis.IsNullPredicate;
-import org.apache.doris.analysis.LiteralExpr;
-import org.apache.doris.analysis.Predicate;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
-import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PartitionInfo;
-import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.catalog.TabletInvertedIndex;
-import org.apache.doris.catalog.Type;
-import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.MarkedCountDownLatch;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.DeleteJob.DeleteState;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.planner.ColumnBound;
-import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.planner.ListPartitionPrunerV2;
-import org.apache.doris.planner.PartitionPruner;
-import org.apache.doris.planner.RangePartitionPrunerV2;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.QueryState.MysqlStateType;
-import org.apache.doris.qe.QueryStateException;
-import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.task.AgentBatchTask;
-import org.apache.doris.task.AgentTaskExecutor;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.PushTask;
-import org.apache.doris.thrift.TColumn;
-import org.apache.doris.thrift.TPriority;
-import org.apache.doris.thrift.TPushType;
-import org.apache.doris.thrift.TTaskType;
-import org.apache.doris.transaction.GlobalTransactionMgr;
-import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.qe.QueryState;
 import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
-import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -97,18 +45,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 public class DeleteHandler implements Writable {
     private static final Logger LOG = 
LogManager.getLogger(DeleteHandler.class);
@@ -128,13 +70,6 @@ public class DeleteHandler implements Writable {
         lock = new ReentrantReadWriteLock();
     }
 
-    private enum CancelType {
-        METADATA_MISSING,
-        TIMEOUT,
-        COMMIT_FAIL,
-        UNKNOWN
-    }
-
     public void readLock() {
         lock.readLock().lock();
     }
@@ -151,258 +86,44 @@ public class DeleteHandler implements Writable {
         lock.writeLock().unlock();
     }
 
-    public void process(DeleteStmt stmt) throws DdlException, 
QueryStateException {
-        String dbName = stmt.getDbName();
-        String tableName = stmt.getTableName();
-        List<String> partitionNames = stmt.getPartitionNames();
-        boolean noPartitionSpecified = partitionNames.isEmpty();
-        List<Predicate> conditions = stmt.getDeleteConditions();
-        Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-
+    public void process(DeleteStmt stmt, QueryState execState) throws 
DdlException {
+        Database targetDb = 
Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
+        OlapTable targetTbl = 
targetDb.getOlapTableOrDdlException(stmt.getTableName());
         DeleteJob deleteJob = null;
         try {
-            MarkedCountDownLatch<Long, Long> countDownLatch;
-            long transactionId = -1;
-            OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
-            olapTable.readLock();
+            targetTbl.readLock();
             try {
-                if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
+                if (targetTbl.getState() != OlapTable.OlapTableState.NORMAL) {
                     // table under alter operation can also do delete.
                     // just add a comment here to notice.
                 }
-
-                if (noPartitionSpecified) {
-                    // Try to get selected partitions if no partition 
specified in delete statement
-                    // Use PartitionPruner to generate the select partitions
-                    if (olapTable.getPartitionInfo().getType() == 
PartitionType.RANGE
-                            || olapTable.getPartitionInfo().getType() == 
PartitionType.LIST) {
-                        Set<String> partitionColumnNameSet = 
olapTable.getPartitionColumnNames();
-                        Map<String, ColumnRange> columnNameToRange = 
Maps.newHashMap();
-                        for (String colName : partitionColumnNameSet) {
-                            ColumnRange columnRange = 
createColumnRange(olapTable, colName, conditions);
-                            // Not all partition columns are involved in 
predicate conditions
-                            if (columnRange != null) {
-                                columnNameToRange.put(colName, columnRange);
-                            }
-                        }
-
-                        Collection<Long> selectedPartitionId = null;
-                        if (!columnNameToRange.isEmpty()) {
-                            PartitionInfo partitionInfo = 
olapTable.getPartitionInfo();
-                            Map<Long, PartitionItem> keyItemMap = 
partitionInfo.getIdToItem(false);
-                            PartitionPruner pruner = 
olapTable.getPartitionInfo().getType() == PartitionType.RANGE
-                                    ? new RangePartitionPrunerV2(keyItemMap, 
partitionInfo.getPartitionColumns(),
-                                    columnNameToRange)
-                                    : new ListPartitionPrunerV2(keyItemMap, 
partitionInfo.getPartitionColumns(),
-                                            columnNameToRange);
-                            selectedPartitionId = pruner.prune();
-                        }
-                        // selectedPartitionId is empty means no partition 
matches conditions.
-                        // How to return empty set in such case?
-                        if (selectedPartitionId != null && 
!selectedPartitionId.isEmpty()) {
-                            for (long partitionId : selectedPartitionId) {
-                                
partitionNames.add(olapTable.getPartition(partitionId).getName());
-                            }
-                        } else {
-                            if 
(!ConnectContext.get().getSessionVariable().isDeleteWithoutPartition()) {
-                                throw new DdlException("This is a range or 
list partitioned table."
-                                        + " You should specify partition in 
delete stmt,"
-                                        + " or set delete_without_partition to 
true");
-                            } else {
-                                
partitionNames.addAll(olapTable.getPartitionNames());
-                            }
-                        }
-                    } else if (olapTable.getPartitionInfo().getType() == 
PartitionType.UNPARTITIONED) {
-                        // this is a unpartitioned table, use table name as 
partition name
-                        partitionNames.add(olapTable.getName());
-                    } else {
-                        throw new DdlException("Unknown partition type: " + 
olapTable.getPartitionInfo().getType());
-                    }
-                }
-
-                Map<Long, Short> partitionReplicaNum = Maps.newHashMap();
-                List<Partition> partitions = Lists.newArrayList();
-                for (String partName : partitionNames) {
-                    Partition partition = olapTable.getPartition(partName);
-                    if (partition == null) {
-                        throw new DdlException("Partition does not exist. 
name: " + partName);
-                    }
-                    partitions.add(partition);
-                    partitionReplicaNum.put(partition.getId(),
-                            
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum());
-                }
-
-                List<String> deleteConditions = Lists.newArrayList();
-
-                // pre check
-                checkDeleteV2(olapTable, partitions, conditions, 
deleteConditions);
-
-                // generate label
-                String label = "delete_" + UUID.randomUUID();
-                //generate jobId
-                long jobId = Env.getCurrentEnv().getNextId();
-                // begin txn here and generate txn id
-                transactionId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
-                        Lists.newArrayList(olapTable.getId()), label, null,
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
-                        TransactionState.LoadJobSourceType.FRONTEND, jobId, 
Config.stream_load_default_timeout_second);
-
-
-                DeleteInfo deleteInfo = new DeleteInfo(db.getId(), 
olapTable.getId(), tableName, deleteConditions);
-                deleteInfo.setPartitions(noPartitionSpecified, 
partitions.stream().map(Partition::getId)
-                        .collect(Collectors.toList()), partitionNames);
-                deleteJob = new DeleteJob(jobId, transactionId, label, 
partitionReplicaNum, deleteInfo);
-                idToDeleteJob.put(deleteJob.getTransactionId(), deleteJob);
-
-                
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(deleteJob);
+                deleteJob = DeleteJob.newBuilder()
+                        .buildWith(new DeleteJob.BuildParams(
+                                targetDb,
+                                targetTbl,
+                                stmt.getPartitionNames(),
+                                stmt.getDeleteConditions()));
+
+                long txnId = deleteJob.beginTxn();
                 TransactionState txnState = 
Env.getCurrentGlobalTransactionMgr()
-                        .getTransactionState(db.getId(), transactionId);
+                        .getTransactionState(targetDb.getId(), txnId);
                 // must call this to make sure we only handle the tablet in 
the mIndex we saw here.
-                // table may be under schema changge or rollup, and the newly 
created tablets will not be checked later,
+                // table may be under schema change or rollup, and the newly 
created tablets will not be checked later,
                 // to make sure that the delete transaction can be done 
successfully.
-                txnState.addTableIndexes(olapTable);
-
-                // task sent to be
-                AgentBatchTask batchTask = new AgentBatchTask();
-                // count total replica num
-                // Get ALL materialized indexes, because delete condition will 
be applied to all indexes
-                int totalReplicaNum = 0;
-                for (Partition partition : partitions) {
-                    for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
-                        for (Tablet tablet : index.getTablets()) {
-                            totalReplicaNum += tablet.getReplicas().size();
-                        }
-                    }
-                }
-                countDownLatch = new MarkedCountDownLatch<Long, 
Long>(totalReplicaNum);
-
-                for (Partition partition : partitions) {
-                    for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
-                        long indexId = index.getId();
-                        int schemaHash = 
olapTable.getSchemaHashByIndexId(indexId);
-
-                        List<TColumn> columnsDesc = new ArrayList<TColumn>();
-                        for (Column column : 
olapTable.getSchemaByIndexId(indexId)) {
-                            columnsDesc.add(column.toThrift());
-                        }
-
-                        for (Tablet tablet : index.getTablets()) {
-                            long tabletId = tablet.getId();
-
-                            // set push type
-                            TPushType type = TPushType.DELETE;
-
-                            for (Replica replica : tablet.getReplicas()) {
-                                long replicaId = replica.getId();
-                                long backendId = replica.getBackendId();
-                                countDownLatch.addMark(backendId, tabletId);
-
-                                // create push task for each replica
-                                PushTask pushTask = new PushTask(null,
-                                        replica.getBackendId(), db.getId(), 
olapTable.getId(),
-                                        partition.getId(), indexId,
-                                        tabletId, replicaId, schemaHash,
-                                        -1, "", -1, 0,
-                                        -1, type, conditions,
-                                        true, TPriority.NORMAL,
-                                        TTaskType.REALTIME_PUSH,
-                                        transactionId,
-                                        Env.getCurrentGlobalTransactionMgr()
-                                                
.getTransactionIDGenerator().getNextTransactionId(),
-                                        columnsDesc);
-                                pushTask.setIsSchemaChanging(false);
-                                pushTask.setCountDownLatch(countDownLatch);
-
-                                if (AgentTaskQueue.addTask(pushTask)) {
-                                    batchTask.addTask(pushTask);
-                                    deleteJob.addPushTask(pushTask);
-                                    deleteJob.addTablet(tabletId);
-                                }
-                            }
-                        }
-                    }
-                }
-
-                // submit push tasks
-                if (batchTask.getTaskNum() > 0) {
-                    AgentTaskExecutor.submit(batchTask);
-                }
-
-            } catch (Throwable t) {
-                LOG.warn("error occurred during delete process", t);
-                // if transaction has been begun, need to abort it
-                if 
(Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), 
transactionId) != null) {
-                    cancelJob(deleteJob, CancelType.UNKNOWN, t.getMessage());
-                }
-                throw new DdlException(t.getMessage(), t);
+                txnState.addTableIndexes(targetTbl);
+                idToDeleteJob.put(txnId, deleteJob);
+                deleteJob.dispatch();
             } finally {
-                olapTable.readUnlock();
+                targetTbl.readUnlock();
             }
-
-            long timeoutMs = deleteJob.getTimeoutMs();
-            LOG.info("waiting delete Job finish, signature: {}, timeout: {}", 
transactionId, timeoutMs);
-            boolean ok = false;
-            try {
-                ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                LOG.warn("InterruptedException: ", e);
-                ok = false;
-            }
-
-            if (!ok) {
-                String errMsg = "";
-                List<Entry<Long, Long>> unfinishedMarks = 
countDownLatch.getLeftMarks();
-                // only show at most 5 results
-                List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, 
Math.min(unfinishedMarks.size(), 5));
-                if (!subList.isEmpty()) {
-                    errMsg = "unfinished replicas [BackendId=TabletId]: " + 
Joiner.on(", ").join(subList);
-                }
-                LOG.warn(errMsg);
-
-                try {
-                    deleteJob.checkAndUpdateQuorum();
-                } catch (MetaNotFoundException e) {
-                    cancelJob(deleteJob, CancelType.METADATA_MISSING, 
e.getMessage());
-                    throw new DdlException(e.getMessage(), e);
-                }
-                DeleteState state = deleteJob.getState();
-                switch (state) {
-                    case UN_QUORUM:
-                        LOG.warn("delete job timeout: transactionId {}, 
timeout {}, {}",
-                                transactionId, timeoutMs, errMsg);
-                        cancelJob(deleteJob, CancelType.TIMEOUT, "delete job 
timeout");
-                        throw new DdlException("failed to execute delete. 
transaction id " + transactionId
-                                + ", timeout(ms) " + timeoutMs + ", " + 
errMsg);
-                    case QUORUM_FINISHED:
-                    case FINISHED:
-                        try {
-                            long nowQuorumTimeMs = System.currentTimeMillis();
-                            long endQuorumTimeoutMs = nowQuorumTimeMs + 
timeoutMs / 2;
-                            // if job's state is quorum_finished then wait for 
a period of time and commit it.
-                            while (deleteJob.getState() == 
DeleteState.QUORUM_FINISHED
-                                    && endQuorumTimeoutMs > nowQuorumTimeMs) {
-                                deleteJob.checkAndUpdateQuorum();
-                                Thread.sleep(1000);
-                                nowQuorumTimeMs = System.currentTimeMillis();
-                                LOG.debug("wait for quorum finished delete 
job: {}, txn id: {}",
-                                        deleteJob.getId(), transactionId);
-                            }
-                        } catch (MetaNotFoundException e) {
-                            cancelJob(deleteJob, CancelType.METADATA_MISSING, 
e.getMessage());
-                            throw new DdlException(e.getMessage(), e);
-                        } catch (InterruptedException e) {
-                            cancelJob(deleteJob, CancelType.UNKNOWN, 
e.getMessage());
-                            throw new DdlException(e.getMessage(), e);
-                        }
-                        commitJob(deleteJob, db, olapTable, timeoutMs);
-                        break;
-                    default:
-                        Preconditions.checkState(false, "wrong delete job 
state: " + state.name());
-                        break;
-                }
-            } else {
-                commitJob(deleteJob, db, olapTable, timeoutMs);
+            deleteJob.await();
+            String commitMsg = deleteJob.commit();
+            execState.setOk(0, 0, commitMsg);
+        } catch (Exception ex) {
+            if (deleteJob != null) {
+                deleteJob.cancel(ex.getMessage());
             }
+            execState.setError(ex.getMessage());
         } finally {
             if (!FeConstants.runningUnitTest) {
                 clearJob(deleteJob);
@@ -410,176 +131,21 @@ public class DeleteHandler implements Writable {
         }
     }
 
-    // Return null if there is no filter for the partition column
-    private ColumnRange createColumnRange(OlapTable table, String colName, 
List<Predicate> conditions)
-            throws AnalysisException {
-        ColumnRange result = ColumnRange.create();
-        Type type =
-                table.getBaseSchema().stream().filter(c -> 
c.getName().equalsIgnoreCase(colName))
-                        .findFirst().get().getType();
-
-        boolean hasRange = false;
-        for (Predicate predicate : conditions) {
-            List<Range<ColumnBound>> bounds = createColumnRange(colName, 
predicate, type);
-            if (bounds != null) {
-                hasRange = true;
-                result.intersect(bounds);
-            }
-        }
-        if (hasRange) {
-            return result;
-        } else {
-            return null;
-        }
-    }
-
-    // Return null if the condition is not related to the partition column,
-    // or the operator is not supported.
-    private List<Range<ColumnBound>> createColumnRange(String colName, 
Predicate condition, Type type)
-            throws AnalysisException {
-        List<Range<ColumnBound>> result = Lists.newLinkedList();
-        if (condition instanceof BinaryPredicate) {
-            BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
-            if (!(binaryPredicate.getChild(0) instanceof SlotRef)) {
-                return null;
-            }
-            String columnName = ((SlotRef) 
binaryPredicate.getChild(0)).getColumnName();
-            if (!colName.equalsIgnoreCase(columnName)) {
-                return null;
-            }
-            ColumnBound bound = ColumnBound.of(
-                    LiteralExpr.create(((LiteralExpr) 
binaryPredicate.getChild(1)).getStringValue(), type));
-            switch (binaryPredicate.getOp()) {
-                case EQ:
-                    result.add(Range.closed(bound, bound));
-                    break;
-                case GE:
-                    result.add(Range.atLeast(bound));
-                    break;
-                case GT:
-                    result.add(Range.greaterThan(bound));
-                    break;
-                case LT:
-                    result.add(Range.lessThan(bound));
-                    break;
-                case LE:
-                    result.add(Range.atMost(bound));
-                    break;
-                case NE:
-                    result.add(Range.lessThan(bound));
-                    result.add(Range.greaterThan(bound));
-                    break;
-                default:
-                    return null;
-            }
-        } else if (condition instanceof InPredicate) {
-            InPredicate inPredicate = (InPredicate) condition;
-            if (!(inPredicate.getChild(0) instanceof SlotRef)) {
-                return null;
-            }
-            String columnName = ((SlotRef) 
inPredicate.getChild(0)).getColumnName();
-            if (!colName.equals(columnName)) {
-                return null;
-            }
-            if (inPredicate.isNotIn()) {
-                return null;
-            }
-            for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
-                ColumnBound bound = ColumnBound.of(LiteralExpr
-                        .create(((LiteralExpr) 
inPredicate.getChild(i)).getStringValue(), type));
-                result.add(Range.closed(bound, bound));
-            }
-        } else {
-            return null;
-        }
-        return result;
-    }
-
-    private void commitJob(DeleteJob job, Database db, Table table, long 
timeoutMs)
-            throws DdlException, QueryStateException {
-        TransactionStatus status = TransactionStatus.UNKNOWN;
-        try {
-            boolean isVisible = unprotectedCommitJob(job, db, table, 
timeoutMs);
-            status = isVisible ? TransactionStatus.VISIBLE : 
TransactionStatus.COMMITTED;
-        } catch (UserException e) {
-            if (cancelJob(job, CancelType.COMMIT_FAIL, e.getMessage())) {
-                throw new DdlException(e.getMessage(), e);
-            }
-        }
-
-        StringBuilder sb = new StringBuilder();
-        sb.append("{'label':'").append(job.getLabel()).append("', 
'status':'").append(status.name());
-        sb.append("', 'txnId':'").append(job.getTransactionId()).append("'");
-
-        switch (status) {
-            case COMMITTED: {
-                // Although publish is unfinished we should tell user that 
commit already success.
-                String errMsg = "delete job is committed but may be taking 
effect later";
-                sb.append(", 'err':'").append(errMsg).append("'");
-                sb.append("}");
-                throw new QueryStateException(MysqlStateType.OK, 
sb.toString());
-            }
-            case VISIBLE: {
-                sb.append("}");
-                throw new QueryStateException(MysqlStateType.OK, 
sb.toString());
-            }
-            default:
-                Preconditions.checkState(false, "wrong transaction status: " + 
status.name());
-                break;
-        }
-    }
-
-    /**
-     * unprotected commit delete job
-     * return true when successfully commit and publish
-     * return false when successfully commit but publish unfinished.
-     * A UserException thrown if both commit and publish failed.
-     * @param job
-     * @param db
-     * @param timeoutMs
-     * @return
-     * @throws UserException
-     */
-    private boolean unprotectedCommitJob(DeleteJob job, Database db, Table 
table, long timeoutMs) throws UserException {
-        long transactionId = job.getTransactionId();
-        GlobalTransactionMgr globalTransactionMgr = 
Env.getCurrentGlobalTransactionMgr();
-        List<TabletCommitInfo> tabletCommitInfos = new 
ArrayList<TabletCommitInfo>();
-        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
-        for (TabletDeleteInfo tDeleteInfo : job.getTabletDeleteInfo()) {
-            for (Replica replica : tDeleteInfo.getFinishedReplicas()) {
-                // the inverted index contains rolling up replica
-                Long tabletId = 
invertedIndex.getTabletIdByReplica(replica.getId());
-                if (tabletId == null) {
-                    LOG.warn("could not find tablet id for replica {}, the 
tablet maybe dropped", replica);
-                    continue;
-                }
-                tabletCommitInfos.add(new TabletCommitInfo(tabletId, 
replica.getBackendId()));
-            }
-        }
-        return globalTransactionMgr.commitAndPublishTransaction(db, 
Lists.newArrayList(table),
-                transactionId, tabletCommitInfos, timeoutMs);
-    }
-
     /**
      * This method should always be called in the end of the delete process to 
clean the job.
      * Better put it in finally block.
+     *
      * @param job
      */
     private void clearJob(DeleteJob job) {
-        if (job != null) {
-            long signature = job.getTransactionId();
-            if (idToDeleteJob.containsKey(signature)) {
-                idToDeleteJob.remove(signature);
-            }
-            for (PushTask pushTask : job.getPushTasks()) {
-                AgentTaskQueue.removePushTask(pushTask.getBackendId(), 
pushTask.getSignature(),
-                        pushTask.getVersion(),
-                        pushTask.getPushType(), pushTask.getTaskType());
-            }
-
-            // NOT remove callback from GlobalTransactionMgr's callback 
factory here.
-            // the callback will be removed after transaction is aborted of 
visible.
+        if (job == null) {
+            return;
         }
+        long signature = job.getTransactionId();
+        idToDeleteJob.remove(signature);
+        job.cleanUp();
+        // do not remove callback from GlobalTransactionMgr's callback factory 
here.
+        // the callback will be removed after transaction is aborted or 
visible.
     }
 
     public void recordFinishedJob(DeleteJob job) {
@@ -598,250 +164,13 @@ public class DeleteHandler implements Writable {
         }
     }
 
-    /**
-     * abort delete job
-     * return true when successfully abort.
-     * return true when some unknown error happened, just ignore it.
-     * return false when the job is already committed
-     * @param job
-     * @param cancelType
-     * @param reason
-     * @return
-     */
-    public boolean cancelJob(DeleteJob job, CancelType cancelType, String 
reason) {
-        LOG.info("start to cancel delete job, transactionId: {}, cancelType: 
{}",
-                job.getTransactionId(), cancelType.name());
-        GlobalTransactionMgr globalTransactionMgr = 
Env.getCurrentGlobalTransactionMgr();
-        try {
-            if (job != null) {
-                
globalTransactionMgr.abortTransaction(job.getDeleteInfo().getDbId(), 
job.getTransactionId(), reason);
-            }
-        } catch (Exception e) {
-            TransactionState state = globalTransactionMgr.getTransactionState(
-                    job.getDeleteInfo().getDbId(), job.getTransactionId());
-            if (state == null) {
-                LOG.warn("cancel delete job failed because txn not found, 
transactionId: {}",
-                        job.getTransactionId());
-            } else if (state.getTransactionStatus() == 
TransactionStatus.COMMITTED
-                    || state.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                LOG.warn("cancel delete job failed because it has been 
committed, transactionId: {}",
-                        job.getTransactionId());
-                return false;
-            } else {
-                LOG.warn("errors while abort transaction", e);
-            }
-        }
-        return true;
-    }
-
     public DeleteJob getDeleteJob(long transactionId) {
         return idToDeleteJob.get(transactionId);
     }
 
-    private SlotRef getSlotRef(Predicate condition) {
-        SlotRef slotRef = null;
-        if (condition instanceof BinaryPredicate) {
-            BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
-            slotRef = (SlotRef) binaryPredicate.getChild(0);
-        } else if (condition instanceof IsNullPredicate) {
-            IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
-            slotRef = (SlotRef) isNullPredicate.getChild(0);
-        } else if (condition instanceof InPredicate) {
-            InPredicate inPredicate = (InPredicate) condition;
-            slotRef = (SlotRef) inPredicate.getChild(0);
-        }
-        return slotRef;
-    }
-
-    private void checkDeleteV2(OlapTable table, List<Partition> partitions,
-            List<Predicate> conditions, List<String> deleteConditions)
-            throws DdlException {
-        // check condition column is key column and condition value
-        // Here we use "getFullSchema()" to get all columns including VISIBLE 
and SHADOW columns
-        Map<String, Column> nameToColumn = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-        for (Column column : table.getFullSchema()) {
-            nameToColumn.put(column.getName(), column);
-        }
-
-        for (Predicate condition : conditions) {
-            SlotRef slotRef = getSlotRef(condition);
-            String columnName = slotRef.getColumnName();
-            if (!nameToColumn.containsKey(columnName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, 
columnName, table.getName());
-            }
-
-            if (Column.isShadowColumn(columnName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                        "Can not apply delete condition to shadow column");
-            }
-
-            // Check if this column is under schema change, if yes, there will 
be a shadow column related to it.
-            // And we don't allow doing delete operation when a condition 
column is under schema change.
-            String shadowColName = Column.getShadowName(columnName);
-            if (nameToColumn.containsKey(shadowColName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, 
"Column " + columnName + " is under"
-                        + " schema change operation. Do not allow delete 
operation");
-            }
-
-            Column column = nameToColumn.get(columnName);
-            // Due to rounding errors, most floating-point numbers end up 
being slightly imprecise,
-            // it also means that numbers expected to be equal often differ 
slightly, so we do not allow compare with
-            // floating-point numbers, floating-point number not allowed in 
where clause
-            if (column.getDataType().isFloatingPointType()) {
-                throw new DdlException("Column[" + columnName + "] type is 
float or double.");
-            }
-            if (!column.isKey()) {
-                if (table.getKeysType() == KeysType.AGG_KEYS) {
-                    throw new DdlException("delete predicate on value column 
only supports Unique table with"
-                            + " merge-on-write enabled and Duplicate table, 
but " + "Table[" + table.getName()
-                                    + "] is an Aggregate table.");
-                } else if (table.getKeysType() == KeysType.UNIQUE_KEYS && 
!table.getEnableUniqueKeyMergeOnWrite()) {
-                    throw new DdlException("delete predicate on value column 
only supports Unique table with"
-                            + " merge-on-write enabled and Duplicate table, 
but " + "Table[" + table.getName()
-                                    + "] is an Aggregate table.");
-                }
-            }
-
-            if (condition instanceof BinaryPredicate) {
-                String value = null;
-                try {
-                    BinaryPredicate binaryPredicate = (BinaryPredicate) 
condition;
-                    // if a bool cond passed to be, be's zone_map cannot 
handle bool correctly,
-                    // change it to a tinyint type here;
-                    value = ((LiteralExpr) 
binaryPredicate.getChild(1)).getStringValue();
-                    if (column.getDataType() == PrimitiveType.BOOLEAN) {
-                        if (value.toLowerCase().equals("true")) {
-                            binaryPredicate.setChild(1, 
LiteralExpr.create("1", Type.TINYINT));
-                        } else if (value.toLowerCase().equals("false")) {
-                            binaryPredicate.setChild(1, 
LiteralExpr.create("0", Type.TINYINT));
-                        }
-                    } else if (column.getDataType() == PrimitiveType.DATE
-                            || column.getDataType() == PrimitiveType.DATETIME
-                            || column.getDataType() == PrimitiveType.DATEV2) {
-                        DateLiteral dateLiteral = new DateLiteral(value, 
Type.fromPrimitiveType(column.getDataType()));
-                        value = dateLiteral.getStringValue();
-                        binaryPredicate.setChild(1, LiteralExpr.create(value,
-                                Type.fromPrimitiveType(column.getDataType())));
-                    } else if (column.getDataType() == 
PrimitiveType.DATETIMEV2) {
-                        DateLiteral dateLiteral = new DateLiteral(value,
-                                
ScalarType.createDatetimeV2Type(ScalarType.MAX_DATETIMEV2_SCALE));
-                        value = dateLiteral.getStringValue();
-                        binaryPredicate.setChild(1, LiteralExpr.create(value,
-                                
ScalarType.createDatetimeV2Type(ScalarType.MAX_DATETIMEV2_SCALE)));
-                    }
-                    LiteralExpr.create(value, column.getType());
-                } catch (AnalysisException e) {
-                    // 
ErrorReport.reportDdlException(ErrorCode.ERR_INVALID_VALUE, value);
-                    throw new DdlException("Invalid column value[" + value + 
"] for column " + columnName);
-                }
-            } else if (condition instanceof InPredicate) {
-                String value = null;
-                try {
-                    InPredicate inPredicate = (InPredicate) condition;
-                    for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
-                        value = inPredicate.getChild(i).getStringValue();
-                        if (column.getDataType() == PrimitiveType.DATE
-                                || column.getDataType() == 
PrimitiveType.DATETIME
-                                || column.getDataType() == PrimitiveType.DATEV2
-                                || column.getDataType() == 
PrimitiveType.DATETIMEV2) {
-                            DateLiteral dateLiteral = new DateLiteral(value,
-                                    column.getType());
-                            value = dateLiteral.getStringValue();
-                            inPredicate.setChild(i, LiteralExpr.create(value,
-                                    column.getType()));
-                        } else {
-                            LiteralExpr.create(value,
-                                    
Type.fromPrimitiveType(column.getDataType()));
-                        }
-                    }
-                } catch (AnalysisException e) {
-                    throw new DdlException("Invalid column value[" + value + 
"] for column " + columnName);
-                }
-            }
-
-            // set schema column name
-            slotRef.setCol(column.getName());
-        }
-
-        // check materialized index.
-        // only need to check the first partition, because each partition has 
same materialized views
-        Map<Long, List<Column>> indexIdToSchema = table.getIndexIdToSchema();
-        Partition partition = partitions.get(0);
-        // Here we check ALL materialized views instead of just VISIBLE ones.
-        // For example, when a table is doing rollup or schema change. there 
will be some SHADOW indexes.
-        // And we also need to check these SHADOW indexes to see if the delete 
condition can be applied to them.
-        for (MaterializedIndex index : 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
-            if (table.getBaseIndexId() == index.getId()) {
-                continue;
-            }
-
-            // check table has condition column
-            Map<String, Column> indexColNameToColumn = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-            for (Column column : indexIdToSchema.get(index.getId())) {
-                indexColNameToColumn.put(column.getName(), column);
-            }
-            String indexName = table.getIndexNameById(index.getId());
-            for (Predicate condition : conditions) {
-                SlotRef slotRef = getSlotRef(condition);
-                String columnName = slotRef.getColumnName();
-                Column column = indexColNameToColumn.get(columnName);
-                if (column == null) {
-                    
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR,
-                            columnName, "index[" + indexName + "]");
-                }
-                MaterializedIndexMeta indexMeta = 
table.getIndexIdToMeta().get(index.getId());
-                if (indexMeta.getKeysType() != KeysType.DUP_KEYS && 
!column.isKey()) {
-                    throw new DdlException("Column[" + columnName + "] is not 
key column in index[" + indexName + "]");
-                }
-            }
-        }
-
-        if (deleteConditions == null) {
-            return;
-        }
-
-        // save delete conditions
-        for (Predicate condition : conditions) {
-            if (condition instanceof BinaryPredicate) {
-                BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
-                SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
-                String columnName = slotRef.getColumnName();
-                StringBuilder sb = new StringBuilder();
-                sb.append(columnName).append(" 
").append(binaryPredicate.getOp().name()).append(" \"")
-                        
.append(binaryPredicate.getChild(1).getStringValue()).append("\"");
-                deleteConditions.add(sb.toString());
-            } else if (condition instanceof IsNullPredicate) {
-                IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
-                SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
-                String columnName = slotRef.getColumnName();
-                StringBuilder sb = new StringBuilder();
-                sb.append(columnName);
-                if (isNullPredicate.isNotNull()) {
-                    sb.append(" IS NOT NULL");
-                } else {
-                    sb.append(" IS NULL");
-                }
-                deleteConditions.add(sb.toString());
-            } else if (condition instanceof InPredicate) {
-                InPredicate inPredicate = (InPredicate) condition;
-                SlotRef slotRef = (SlotRef) inPredicate.getChild(0);
-                String columnName = slotRef.getColumnName();
-                StringBuilder strBuilder = new StringBuilder();
-                String notStr = inPredicate.isNotIn() ? "NOT " : "";
-                strBuilder.append(columnName).append(" 
").append(notStr).append("IN (");
-                for (int i = 1; i <= inPredicate.getInElementNum(); ++i) {
-                    strBuilder.append(inPredicate.getChild(i).toSql());
-                    strBuilder.append((i != inPredicate.getInElementNum()) ? 
", " : "");
-                }
-                strBuilder.append(")");
-                deleteConditions.add(strBuilder.toString());
-            }
-        }
-    }
-
     // show delete stmt
     public List<List<Comparable>> getDeleteInfosByDb(long dbId) {
-        LinkedList<List<Comparable>> infos = new 
LinkedList<List<Comparable>>();
+        LinkedList<List<Comparable>> infos = new LinkedList<>();
         Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
         if (db == null) {
             return infos;
@@ -896,8 +225,8 @@ public class DeleteHandler implements Writable {
         }
 
         // sort by createTimeMs
-        ListComparator<List<Comparable>> comparator = new 
ListComparator<List<Comparable>>(2);
-        Collections.sort(infos, comparator);
+        ListComparator<List<Comparable>> comparator = new ListComparator<>(2);
+        infos.sort(comparator);
         return infos;
     }
 
@@ -919,7 +248,6 @@ public class DeleteHandler implements Writable {
     @Override
     public void write(DataOutput out) throws IOException {
         removeOldDeleteInfos();
-        Text.writeString(out, GsonUtils.GSON.toJson(this));
     }
 
     public static DeleteHandler read(DataInput in) throws IOException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
index 300c67e58d5..05c6c4b1a86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
@@ -60,10 +60,6 @@ public class DeleteInfo implements Writable, 
GsonPostProcessable {
     @SerializedName(value = "partitionName")
     private String partitionName;
 
-    public DeleteInfo() {
-        this.deleteConditions = Lists.newArrayList();
-    }
-
     public DeleteInfo(long dbId, long tableId, String tableName, List<String> 
deleteConditions) {
         this.dbId = dbId;
         this.tableId = tableId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 3723c226c5a..eaa4395093d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -17,29 +17,74 @@
 
 package org.apache.doris.load;
 
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.ListPartitionPrunerV2;
+import org.apache.doris.planner.PartitionPruner;
+import org.apache.doris.planner.RangePartitionPrunerV2;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TColumn;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
-public class DeleteJob extends AbstractTxnStateChangeCallback {
+public class DeleteJob extends AbstractTxnStateChangeCallback implements 
DeleteJobLifeCycle {
     private static final Logger LOG = LogManager.getLogger(DeleteJob.class);
 
+    public static final String DELETE_PREFIX = "delete_";
+
     public enum DeleteState {
         UN_QUORUM,
         QUORUM_FINISHED,
@@ -49,18 +94,28 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback {
     private DeleteState state;
 
     // jobId(listenerId). use in beginTransaction to callback function
-    private long id;
+    private final long id;
     // transaction id.
     private long signature;
-    private String label;
-    private Set<Long> totalTablets;
-    private Set<Long> quorumTablets;
-    private Set<Long> finishedTablets;
+    private final String label;
+    private final Set<Long> totalTablets;
+    private final Set<Long> quorumTablets;
+    private final Set<Long> finishedTablets;
     Map<Long, TabletDeleteInfo> tabletDeleteInfoMap;
-    private Set<PushTask> pushTasks;
-    private DeleteInfo deleteInfo;
+    private final Set<PushTask> pushTasks;
+    private final DeleteInfo deleteInfo;
+
+    private final  Map<Long, Short> partitionReplicaNum;
+
+    private Database targetDb;
+
+    private OlapTable targetTbl;
+
+    private List<Partition> partitions;
+
+    private List<Predicate> deleteConditions;
 
-    private Map<Long, Short> partitionReplicaNum;
+    private MarkedCountDownLatch<Long, Long> countDownLatch;
 
     public DeleteJob(long id, long transactionId, String label,
                      Map<Long, Short> partitionReplicaNum, DeleteInfo 
deleteInfo) {
@@ -77,13 +132,17 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback {
         this.partitionReplicaNum = partitionReplicaNum;
     }
 
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
     /**
      * check and update if this job's state is QUORUM_FINISHED or FINISHED
      * The meaning of state:
      * QUORUM_FINISHED: For each tablet there are more than half of its 
replicas have been finished
-     * FINISHED: All replicas of this jobs have finished
+     * FINISHED: All replicas of this job have finished
      */
-    public void checkAndUpdateQuorum() throws MetaNotFoundException {
+    private void checkAndUpdateQuorum() throws MetaNotFoundException {
         long dbId = deleteInfo.getDbId();
         Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
 
@@ -122,32 +181,28 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback {
                 signature, totalTablets.size(), quorumTablets.size(), 
dropCounter);
 
         if (finishedTablets.containsAll(totalTablets)) {
-            setState(DeleteState.FINISHED);
+            this.state = DeleteState.FINISHED;
         } else if (quorumTablets.containsAll(totalTablets)) {
-            setState(DeleteState.QUORUM_FINISHED);
+            this.state = DeleteState.QUORUM_FINISHED;
         }
     }
 
-    public void setState(DeleteState state) {
-        this.state = state;
-    }
-
     public DeleteState getState() {
         return this.state;
     }
 
-    public boolean addTablet(long tabletId) {
-        return totalTablets.add(tabletId);
+    private void addTablet(long tabletId) {
+        totalTablets.add(tabletId);
     }
 
-    public boolean addPushTask(PushTask pushTask) {
-        return pushTasks.add(pushTask);
+    public void addPushTask(PushTask pushTask) {
+        pushTasks.add(pushTask);
     }
 
-    public boolean addFinishedReplica(long partitionId, long tabletId, Replica 
replica) {
+    public void addFinishedReplica(long partitionId, long tabletId, Replica 
replica) {
         tabletDeleteInfoMap.putIfAbsent(tabletId, new 
TabletDeleteInfo(partitionId, tabletId));
-        TabletDeleteInfo tDeleteInfo =  tabletDeleteInfoMap.get(tabletId);
-        return tDeleteInfo.addFinishedReplica(replica);
+        TabletDeleteInfo tDeleteInfo = tabletDeleteInfoMap.get(tabletId);
+        tDeleteInfo.addFinishedReplica(replica);
     }
 
     public DeleteInfo getDeleteInfo() {
@@ -158,10 +213,6 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback {
         return this.label;
     }
 
-    public Set<PushTask> getPushTasks() {
-        return pushTasks;
-    }
-
     @Override
     public long getId() {
         return this.id;
@@ -177,14 +228,13 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback {
     }
 
     @Override
-    public void afterAborted(TransactionState txnState, boolean txnOperated, 
String txnStatusChangeReason)
-            throws UserException {
+    public void afterAborted(TransactionState txnState, boolean txnOperated, 
String txnStatusChangeReason) {
         // just to clean the callback
         
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
     }
 
     public void executeFinish() {
-        setState(DeleteState.FINISHED);
+        this.state = DeleteState.FINISHED;
         Env.getCurrentEnv().getDeleteHandler().recordFinishedJob(this);
         
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
     }
@@ -206,4 +256,451 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback {
         long timeout = Math.max(totalTablets.size() * 
Config.tablet_delete_timeout_second * 1000L, 30000L);
         return Math.min(timeout, Config.delete_job_max_timeout_second * 1000L);
     }
+
+    public void setTargetDb(Database targetDb) {
+        this.targetDb = targetDb;
+    }
+
+    public void setTargetTbl(OlapTable targetTbl) {
+        this.targetTbl = targetTbl;
+    }
+
+    public void setPartitions(List<Partition> partitions) {
+        this.partitions = partitions;
+    }
+
+    public void setDeleteConditions(List<Predicate> deleteConditions) {
+        this.deleteConditions = deleteConditions;
+    }
+
+    public void setCountDownLatch(MarkedCountDownLatch<Long, Long> 
countDownLatch) {
+        this.countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public long beginTxn() throws Exception {
+        long txnId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(),
+                Lists.newArrayList(deleteInfo.getTableId()), label, null,
+                new TransactionState.TxnCoordinator(
+                        TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                TransactionState.LoadJobSourceType.FRONTEND, id, 
Config.stream_load_default_timeout_second);
+        this.signature = txnId;
+        
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
+        return txnId;
+    }
+
+    @Override
+    public void dispatch() throws Exception {
+        // task sent to be
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (Partition partition : partitions) {
+            for (MaterializedIndex index : 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
+                long indexId = index.getId();
+                int schemaHash = targetTbl.getSchemaHashByIndexId(indexId);
+
+                List<TColumn> columnsDesc = Lists.newArrayList();
+                for (Column column : targetTbl.getSchemaByIndexId(indexId)) {
+                    columnsDesc.add(column.toThrift());
+                }
+
+                for (Tablet tablet : index.getTablets()) {
+                    long tabletId = tablet.getId();
+
+                    // set push type
+                    TPushType type = TPushType.DELETE;
+
+                    for (Replica replica : tablet.getReplicas()) {
+                        long replicaId = replica.getId();
+                        long backendId = replica.getBackendId();
+                        countDownLatch.addMark(backendId, tabletId);
+
+                        // create push task for each replica
+                        PushTask pushTask = new PushTask(null,
+                                replica.getBackendId(), targetDb.getId(), 
targetTbl.getId(),
+                                partition.getId(), indexId,
+                                tabletId, replicaId, schemaHash,
+                                -1, "", -1, 0,
+                                -1, type, deleteConditions,
+                                true, TPriority.NORMAL,
+                                TTaskType.REALTIME_PUSH,
+                                signature,
+                                Env.getCurrentGlobalTransactionMgr()
+                                        
.getTransactionIDGenerator().getNextTransactionId(),
+                                columnsDesc);
+                        pushTask.setIsSchemaChanging(false);
+                        pushTask.setCountDownLatch(countDownLatch);
+
+                        if (AgentTaskQueue.addTask(pushTask)) {
+                            batchTask.addTask(pushTask);
+                            addPushTask(pushTask);
+                            addTablet(tabletId);
+                        }
+                    }
+                }
+            }
+        }
+
+        // submit push tasks
+        if (batchTask.getTaskNum() > 0) {
+            AgentTaskExecutor.submit(batchTask);
+        }
+    }
+
+    @Override
+    public void await() throws Exception {
+        long timeoutMs = getTimeoutMs();
+        boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
+        if (ok) {
+            return;
+        }
+
+        //handle failure
+        String errMsg = "";
+        List<Map.Entry<Long, Long>> unfinishedMarks = 
countDownLatch.getLeftMarks();
+        // only show at most 5 results
+        List<Map.Entry<Long, Long>> subList = unfinishedMarks.subList(0, 
Math.min(unfinishedMarks.size(), 5));
+        if (!subList.isEmpty()) {
+            errMsg = "unfinished replicas [BackendId=TabletId]: " + 
Joiner.on(", ").join(subList);
+        }
+        LOG.warn(errMsg);
+        checkAndUpdateQuorum();
+        switch (state) {
+            case UN_QUORUM:
+                LOG.warn("delete job timeout: transactionId {}, timeout {}, 
{}",
+                        signature, timeoutMs, errMsg);
+                throw new UserException(String.format("delete job timeout, 
timeout(ms):%s, msg:%s", timeoutMs, errMsg));
+            case QUORUM_FINISHED:
+            case FINISHED:
+                long nowQuorumTimeMs = System.currentTimeMillis();
+                long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2;
+                // if job's state is quorum_finished then wait for a period of 
time and commit it.
+                while (state == DeleteState.QUORUM_FINISHED
+                        && endQuorumTimeoutMs > nowQuorumTimeMs) {
+                    checkAndUpdateQuorum();
+                    Thread.sleep(1000);
+                    nowQuorumTimeMs = System.currentTimeMillis();
+                    LOG.debug("wait for quorum finished delete job: {}, txn 
id: {}",
+                            id, signature);
+                }
+                break;
+            default:
+                throw new IllegalStateException("wrong delete job state: " + 
state.name());
+        }
+    }
+
+    @Override
+    public String commit() throws Exception {
+        TabletInvertedIndex currentInvertedIndex = 
Env.getCurrentInvertedIndex();
+        List<TabletCommitInfo> tabletCommitInfos = Lists.newArrayList();
+        tabletDeleteInfoMap.forEach((tabletId, deleteInfo) -> 
deleteInfo.getFinishedReplicas()
+                .forEach(replica -> {
+                    if 
(currentInvertedIndex.getTabletIdByReplica(replica.getId()) == null) {
+                        LOG.warn("could not find tablet id for replica {}, the 
tablet maybe dropped", replica);
+                        return;
+                    }
+                    tabletCommitInfos.add(new TabletCommitInfo(tabletId, 
replica.getBackendId()));
+                }));
+        boolean visible = Env.getCurrentGlobalTransactionMgr()
+                .commitAndPublishTransaction(targetDb, 
Lists.newArrayList(targetTbl),
+                        signature, tabletCommitInfos, getTimeoutMs());
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("{'label':'").append(label);
+        sb.append("', 'txnId':'").append(signature)
+                .append("', 'status':'");
+        if (visible) {
+            sb.append(TransactionStatus.VISIBLE.name()).append("'");
+            sb.append("}");
+        } else {
+            // Although publish is unfinished we should tell user that commit 
already success.
+            sb.append(TransactionStatus.COMMITTED.name()).append("'");
+            String msg = "delete job is committed but may be taking effect 
later";
+            sb.append(", 'msg':'").append(msg).append("'");
+            sb.append("}");
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public void cancel(String reason) {
+        GlobalTransactionMgr globalTransactionMgr = 
Env.getCurrentGlobalTransactionMgr();
+        try {
+            globalTransactionMgr.abortTransaction(deleteInfo.getDbId(), 
signature, reason);
+        } catch (Exception e) {
+            TransactionState state = globalTransactionMgr.getTransactionState(
+                    deleteInfo.getDbId(), signature);
+            if (state == null) {
+                LOG.warn("cancel delete job failed because txn not found, 
transactionId: {}",
+                        signature);
+            } else if (state.getTransactionStatus() == 
TransactionStatus.COMMITTED
+                    || state.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+                LOG.warn("cancel delete job failed because it has been 
committed, transactionId: {}",
+                        signature);
+            } else {
+                LOG.warn("errors while abort transaction", e);
+            }
+        }
+    }
+
+    @Override
+    public void cleanUp() {
+        for (PushTask pushTask : pushTasks) {
+            AgentTaskQueue.removePushTask(pushTask.getBackendId(), 
pushTask.getSignature(),
+                    pushTask.getVersion(),
+                    pushTask.getPushType(), pushTask.getTaskType());
+        }
+    }
+
+    public static class BuildParams {
+
+        private final Database db;
+        private final OlapTable table;
+
+        private final Collection<String> partitionNames;
+
+        private final List<Predicate> deleteConditions;
+
+        public BuildParams(Database db, OlapTable table,
+                           Collection<String> partitionNames,
+                           List<Predicate> deleteConditions) {
+            this.db = db;
+            this.table = table;
+            this.partitionNames = partitionNames;
+            this.deleteConditions = deleteConditions;
+        }
+
+        public OlapTable getTable() {
+            return table;
+        }
+
+        public Collection<String> getPartitionNames() {
+            return partitionNames;
+        }
+
+        public Database getDb() {
+            return db;
+        }
+
+        public List<Predicate> getDeleteConditions() {
+            return deleteConditions;
+        }
+    }
+
+    public static class Builder {
+
+        public DeleteJob buildWith(BuildParams params) throws Exception {
+            List<Partition> partitions = 
getSelectedPartitions(params.getTable(),
+                    params.getPartitionNames(), params.getDeleteConditions());
+            Map<Long, Short> partitionReplicaNum = partitions.stream()
+                    .collect(Collectors.toMap(
+                            Partition::getId,
+                            partition ->
+                                    params.getTable()
+                                            .getPartitionInfo()
+                                            
.getReplicaAllocation(partition.getId())
+                                            .getTotalReplicaNum()));
+            // generate label
+            String label = DELETE_PREFIX + UUID.randomUUID();
+            //generate jobId
+            long jobId = Env.getCurrentEnv().getNextId();
+            DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(), 
params.getTable().getId(),
+                    params.getTable().getName(), 
getDeleteCondString(params.getDeleteConditions()));
+            DeleteJob deleteJob = new DeleteJob(jobId, -1, label, 
partitionReplicaNum, deleteInfo);
+            long replicaNum = 
partitions.stream().mapToLong(Partition::getAllReplicaCount).sum();
+            deleteJob.setPartitions(partitions);
+            deleteJob.setDeleteConditions(params.getDeleteConditions());
+            deleteJob.setTargetDb(params.getDb());
+            deleteJob.setTargetTbl(params.getTable());
+            deleteJob.setCountDownLatch(new MarkedCountDownLatch<>((int) 
replicaNum));
+            return deleteJob;
+        }
+
+        private List<Partition> getSelectedPartitions(OlapTable olapTable, 
Collection<String> partitionNames,
+                                                      List<Predicate> 
deleteConditions) throws Exception {
+            if (partitionNames.isEmpty()) {
+                // Try to get selected partitions if no partition specified in 
delete statement
+                // Use PartitionPruner to generate the select partitions
+                if (olapTable.getPartitionInfo().getType() == 
PartitionType.RANGE
+                        || olapTable.getPartitionInfo().getType() == 
PartitionType.LIST) {
+                    Set<String> partitionColumnNameSet = 
olapTable.getPartitionColumnNames();
+                    Map<String, ColumnRange> columnNameToRange = 
Maps.newHashMap();
+                    for (String colName : partitionColumnNameSet) {
+                        ColumnRange columnRange = createColumnRange(olapTable, 
colName, deleteConditions);
+                        // Not all partition columns are involved in predicate 
conditions
+                        if (columnRange != null) {
+                            columnNameToRange.put(colName, columnRange);
+                        }
+                    }
+
+                    Collection<Long> selectedPartitionId = null;
+                    if (!columnNameToRange.isEmpty()) {
+                        PartitionInfo partitionInfo = 
olapTable.getPartitionInfo();
+                        Map<Long, PartitionItem> keyItemMap = 
partitionInfo.getIdToItem(false);
+                        PartitionPruner pruner = 
olapTable.getPartitionInfo().getType() == PartitionType.RANGE
+                                ? new RangePartitionPrunerV2(keyItemMap, 
partitionInfo.getPartitionColumns(),
+                                columnNameToRange)
+                                : new ListPartitionPrunerV2(keyItemMap, 
partitionInfo.getPartitionColumns(),
+                                columnNameToRange);
+                        selectedPartitionId = pruner.prune();
+                    }
+                    // selectedPartitionId is empty means no partition matches 
conditions.
+                    // How to return empty set in such case?
+                    if (selectedPartitionId != null && 
!selectedPartitionId.isEmpty()) {
+                        for (long partitionId : selectedPartitionId) {
+                            
partitionNames.add(olapTable.getPartition(partitionId).getName());
+                        }
+                    } else {
+                        if 
(!ConnectContext.get().getSessionVariable().isDeleteWithoutPartition()) {
+                            throw new UserException("This is a range or list 
partitioned table."
+                                    + " You should specify partition in delete 
stmt,"
+                                    + " or set delete_without_partition to 
true");
+                        } else {
+                            
partitionNames.addAll(olapTable.getPartitionNames());
+                        }
+                    }
+                } else if (olapTable.getPartitionInfo().getType() == 
PartitionType.UNPARTITIONED) {
+                    // this is an un-partitioned table, use table name as 
partition name
+                    partitionNames.add(olapTable.getName());
+                } else {
+                    throw new UserException("Unknown partition type: " + 
olapTable.getPartitionInfo().getType());
+                }
+            }
+            List<Partition> partitions = Lists.newArrayList();
+            for (String partName : partitionNames) {
+                Partition partition = olapTable.getPartition(partName);
+                if (partition == null) {
+                    throw new DdlException("Partition does not exist. name: " 
+ partName);
+                }
+                partitions.add(partition);
+            }
+            return partitions;
+        }
+
+        // Return null if there is no filter for the partition column
+        private ColumnRange createColumnRange(OlapTable table, String colName, 
List<Predicate> conditions)
+                throws AnalysisException {
+
+            ColumnRange result = ColumnRange.create();
+            Type type =
+                    table.getBaseSchema().stream().filter(c -> 
c.getName().equalsIgnoreCase(colName))
+                            .findFirst().get().getType();
+
+            boolean hasRange = false;
+            for (Predicate predicate : conditions) {
+                List<Range<ColumnBound>> bounds = createColumnRange(colName, 
predicate, type);
+                if (bounds != null) {
+                    hasRange = true;
+                    result.intersect(bounds);
+                }
+            }
+            if (hasRange) {
+                return result;
+            } else {
+                return null;
+            }
+        }
+
+        // Return null if the condition is not related to the partition column,
+        // or the operator is not supported.
+        private List<Range<ColumnBound>> createColumnRange(String colName, 
Predicate condition, Type type)
+                throws AnalysisException {
+            List<Range<ColumnBound>> result = Lists.newLinkedList();
+            if (condition instanceof BinaryPredicate) {
+                BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
+                if (!(binaryPredicate.getChild(0) instanceof SlotRef)) {
+                    return null;
+                }
+                String columnName = ((SlotRef) 
binaryPredicate.getChild(0)).getColumnName();
+                if (!colName.equalsIgnoreCase(columnName)) {
+                    return null;
+                }
+                ColumnBound bound = ColumnBound.of(
+                        
LiteralExpr.create(binaryPredicate.getChild(1).getStringValue(), type));
+                switch (binaryPredicate.getOp()) {
+                    case EQ:
+                        result.add(Range.closed(bound, bound));
+                        break;
+                    case GE:
+                        result.add(Range.atLeast(bound));
+                        break;
+                    case GT:
+                        result.add(Range.greaterThan(bound));
+                        break;
+                    case LT:
+                        result.add(Range.lessThan(bound));
+                        break;
+                    case LE:
+                        result.add(Range.atMost(bound));
+                        break;
+                    case NE:
+                        result.add(Range.lessThan(bound));
+                        result.add(Range.greaterThan(bound));
+                        break;
+                    default:
+                        return null;
+                }
+            } else if (condition instanceof InPredicate) {
+                InPredicate inPredicate = (InPredicate) condition;
+                if (!(inPredicate.getChild(0) instanceof SlotRef)) {
+                    return null;
+                }
+                String columnName = ((SlotRef) 
inPredicate.getChild(0)).getColumnName();
+                if (!colName.equals(columnName)) {
+                    return null;
+                }
+                if (inPredicate.isNotIn()) {
+                    return null;
+                }
+                for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
+                    ColumnBound bound = ColumnBound.of(LiteralExpr
+                            .create(inPredicate.getChild(i).getStringValue(), 
type));
+                    result.add(Range.closed(bound, bound));
+                }
+            } else {
+                return null;
+            }
+            return result;
+        }
+
+        private List<String> getDeleteCondString(List<Predicate> conditions) {
+            List<String> deleteConditions = 
Lists.newArrayListWithCapacity(conditions.size());
+            // save delete conditions
+            for (Predicate condition : conditions) {
+                if (condition instanceof BinaryPredicate) {
+                    BinaryPredicate binaryPredicate = (BinaryPredicate) 
condition;
+                    SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
+                    String columnName = slotRef.getColumnName();
+                    String sb = columnName + " " + 
binaryPredicate.getOp().name() + " \""
+                            + binaryPredicate.getChild(1).getStringValue() + 
"\"";
+                    deleteConditions.add(sb);
+                } else if (condition instanceof IsNullPredicate) {
+                    IsNullPredicate isNullPredicate = (IsNullPredicate) 
condition;
+                    SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
+                    String columnName = slotRef.getColumnName();
+                    StringBuilder sb = new StringBuilder();
+                    sb.append(columnName);
+                    if (isNullPredicate.isNotNull()) {
+                        sb.append(" IS NOT NULL");
+                    } else {
+                        sb.append(" IS NULL");
+                    }
+                    deleteConditions.add(sb.toString());
+                } else if (condition instanceof InPredicate) {
+                    InPredicate inPredicate = (InPredicate) condition;
+                    SlotRef slotRef = (SlotRef) inPredicate.getChild(0);
+                    String columnName = slotRef.getColumnName();
+                    StringBuilder strBuilder = new StringBuilder();
+                    String notStr = inPredicate.isNotIn() ? "NOT " : "";
+                    strBuilder.append(columnName).append(" 
").append(notStr).append("IN (");
+                    for (int i = 1; i <= inPredicate.getInElementNum(); ++i) {
+                        strBuilder.append(inPredicate.getChild(i).toSql());
+                        strBuilder.append((i != inPredicate.getInElementNum()) 
? ", " : "");
+                    }
+                    strBuilder.append(")");
+                    deleteConditions.add(strBuilder.toString());
+                }
+            }
+            return deleteConditions;
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java
new file mode 100644
index 00000000000..c12dd84169e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+public interface DeleteJobLifeCycle {
+
+    /**
+     * @return txn id
+     */
+    long beginTxn() throws Exception;
+
+    /**
+     * dispatch push tasks in an async way
+     */
+    void dispatch() throws Exception;
+
+    /**
+     * called after dispatch, waiting for quorum to be finished
+     */
+    void await() throws Exception;
+
+    /**
+     * commit job
+     * @return commit msg
+     */
+    String commit() throws Exception;
+
+    void cancel(String reason) throws Exception;
+
+    void cleanUp() throws Exception;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 8e62b436b86..b09f531c076 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -78,7 +78,6 @@ import org.apache.doris.analysis.CreateUserStmt;
 import org.apache.doris.analysis.CreateViewStmt;
 import org.apache.doris.analysis.CreateWorkloadGroupStmt;
 import org.apache.doris.analysis.DdlStmt;
-import org.apache.doris.analysis.DeleteStmt;
 import org.apache.doris.analysis.DropAnalyzeJobStmt;
 import org.apache.doris.analysis.DropCatalogStmt;
 import org.apache.doris.analysis.DropDbStmt;
@@ -200,8 +199,6 @@ public class DdlExecutor {
         } else if (ddlStmt instanceof ResumeJobStmt) {
             ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt;
             env.getJobRegister().resumeJob(stmt.getDbFullName(), 
stmt.getName(), JobCategory.SQL);
-        } else if (ddlStmt instanceof DeleteStmt) {
-            env.getDeleteHandler().process((DeleteStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateUserStmt) {
             CreateUserStmt stmt = (CreateUserStmt) ddlStmt;
             env.getAuth().createUser(stmt);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 2ce0de22cb2..360353f2c79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -766,8 +766,14 @@ public class StmtExecutor {
             } else if (parsedStmt instanceof UpdateStmt) {
                 handleUpdateStmt();
             } else if (parsedStmt instanceof DdlStmt) {
-                if (parsedStmt instanceof DeleteStmt && ((DeleteStmt) 
parsedStmt).getInsertStmt() != null) {
-                    handleDeleteStmt();
+                if (parsedStmt instanceof DeleteStmt) {
+                    if (((DeleteStmt) parsedStmt).getInsertStmt() != null) {
+                        handleDeleteStmt();
+                    } else {
+                        Env.getCurrentEnv()
+                                .getDeleteHandler()
+                                .process((DeleteStmt) parsedStmt, 
context.getState());
+                    }
                 } else {
                     handleDdlStmt();
                 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 5c62ef75a44..18f29ac30d2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -43,6 +43,7 @@ import 
org.apache.doris.mysql.privilege.AccessControllerManager;
 import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.QueryStateException;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentBatchTask;
@@ -213,8 +214,8 @@ public class DeleteHandlerTest {
         };
     }
 
-    @Test(expected = DdlException.class)
-    public void testUnQuorumTimeout() throws DdlException, QueryStateException 
{
+    @Test
+    public void testUnQuorumTimeout() throws DdlException {
         BinaryPredicate binaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
                 new IntLiteral(3));
 
@@ -231,8 +232,9 @@ public class DeleteHandlerTest {
                 minTimes = 0;
             }
         };
-        deleteHandler.process(deleteStmt);
-        Assert.fail();
+        QueryState state = connectContext.getState();
+        deleteHandler.process(deleteStmt, state);
+        Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.ERR);
     }
 
     @Test
@@ -265,11 +267,9 @@ public class DeleteHandlerTest {
             }
         };
 
-        try {
-            deleteHandler.process(deleteStmt);
-        } catch (QueryStateException e) {
-            // CHECKSTYLE IGNORE THIS LINE
-        }
+        QueryState state = connectContext.getState();
+        deleteHandler.process(deleteStmt, state);
+        Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK);
 
         Map<Long, DeleteJob> idToDeleteJob = 
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
         Collection<DeleteJob> jobs = idToDeleteJob.values();
@@ -310,11 +310,9 @@ public class DeleteHandlerTest {
             }
         };
 
-        try {
-            deleteHandler.process(deleteStmt);
-        } catch (QueryStateException e) {
-            // CHECKSTYLE IGNORE THIS LINE
-        }
+        QueryState state = connectContext.getState();
+        deleteHandler.process(deleteStmt, state);
+        Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK);
 
         Map<Long, DeleteJob> idToDeleteJob = 
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
         Collection<DeleteJob> jobs = idToDeleteJob.values();
@@ -324,7 +322,7 @@ public class DeleteHandlerTest {
         }
     }
 
-    @Test(expected = DdlException.class)
+    @Test
     public void testCommitFail(@Mocked MarkedCountDownLatch countDownLatch) 
throws DdlException, QueryStateException {
         BinaryPredicate binaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
                 new IntLiteral(3));
@@ -368,20 +366,8 @@ public class DeleteHandlerTest {
             }
         };
 
-        try {
-            deleteHandler.process(deleteStmt);
-        } catch (DdlException e) {
-            Map<Long, DeleteJob> idToDeleteJob = 
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
-            Collection<DeleteJob> jobs = idToDeleteJob.values();
-            Assert.assertEquals(1, jobs.size());
-            for (DeleteJob job : jobs) {
-                Assert.assertEquals(job.getState(), DeleteState.FINISHED);
-            }
-            throw e;
-        } catch (QueryStateException e) {
-            // CHECKSTYLE IGNORE THIS LINE
-        }
-        Assert.fail();
+        deleteHandler.process(deleteStmt, connectContext.getState());
+        Assert.assertSame(connectContext.getState().getStateType(), 
QueryState.MysqlStateType.ERR);
     }
 
     @Test
@@ -423,12 +409,9 @@ public class DeleteHandlerTest {
                 minTimes = 0;
             }
         };
-
-        try {
-            deleteHandler.process(deleteStmt);
-        } catch (QueryStateException e) {
-            // CHECKSTYLE IGNORE THIS LINE
-        }
+        QueryState state = connectContext.getState();
+        deleteHandler.process(deleteStmt, state);
+        Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK);
 
         Map<Long, DeleteJob> idToDeleteJob = 
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
         Collection<DeleteJob> jobs = idToDeleteJob.values();
@@ -471,11 +454,9 @@ public class DeleteHandlerTest {
             }
         };
 
-        try {
-            deleteHandler.process(deleteStmt);
-        } catch (QueryStateException e) {
-            // CHECKSTYLE IGNORE THIS LINE
-        }
+        QueryState state = connectContext.getState();
+        deleteHandler.process(deleteStmt, state);
+        Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK);
 
         Map<Long, DeleteJob> idToDeleteJob = 
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
         Collection<DeleteJob> jobs = idToDeleteJob.values();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to