This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ef6bf067cab [refactor](delete) refactor FE DeleteHandler related logic
(#25497)
ef6bf067cab is described below
commit ef6bf067cab706b82475771665225f13e0b97800
Author: Siyang Tang <[email protected]>
AuthorDate: Fri Oct 20 18:19:56 2023 +0800
[refactor](delete) refactor FE DeleteHandler related logic (#25497)
---
.../java/org/apache/doris/catalog/Partition.java | 8 +
.../java/org/apache/doris/load/DeleteHandler.java | 748 ++-------------------
.../java/org/apache/doris/load/DeleteInfo.java | 4 -
.../main/java/org/apache/doris/load/DeleteJob.java | 559 ++++++++++++++-
.../org/apache/doris/load/DeleteJobLifeCycle.java | 46 ++
.../main/java/org/apache/doris/qe/DdlExecutor.java | 3 -
.../java/org/apache/doris/qe/StmtExecutor.java | 10 +-
.../org/apache/doris/load/DeleteHandlerTest.java | 61 +-
8 files changed, 649 insertions(+), 790 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index a970d1798de..3a1905bd8e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -282,6 +282,14 @@ public class Partition extends MetaObject implements
Writable {
return replicaCount;
}
+ public long getAllReplicaCount() {
+ long replicaCount = 0;
+ for (MaterializedIndex mIndex :
getMaterializedIndices(IndexExtState.ALL)) {
+ replicaCount += mIndex.getReplicaCount();
+ }
+ return replicaCount;
+ }
+
public boolean hasData() {
// The fe unit test need to check the selected index id without any
data.
// So if set FeConstants.runningUnitTest, we can ensure that the
number of partitions is not empty,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index dbf303f10fb..ea3f786d0fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -17,78 +17,26 @@
package org.apache.doris.load;
-import org.apache.doris.analysis.BinaryPredicate;
-import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DeleteStmt;
-import org.apache.doris.analysis.InPredicate;
-import org.apache.doris.analysis.IsNullPredicate;
-import org.apache.doris.analysis.LiteralExpr;
-import org.apache.doris.analysis.Predicate;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
-import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PartitionInfo;
-import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.catalog.TabletInvertedIndex;
-import org.apache.doris.catalog.Type;
-import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.MarkedCountDownLatch;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.DeleteJob.DeleteState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.planner.ColumnBound;
-import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.planner.ListPartitionPrunerV2;
-import org.apache.doris.planner.PartitionPruner;
-import org.apache.doris.planner.RangePartitionPrunerV2;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.QueryState.MysqlStateType;
-import org.apache.doris.qe.QueryStateException;
-import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.task.AgentBatchTask;
-import org.apache.doris.task.AgentTaskExecutor;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.PushTask;
-import org.apache.doris.thrift.TColumn;
-import org.apache.doris.thrift.TPriority;
-import org.apache.doris.thrift.TPushType;
-import org.apache.doris.thrift.TTaskType;
-import org.apache.doris.transaction.GlobalTransactionMgr;
-import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.qe.QueryState;
import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
-import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -97,18 +45,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
public class DeleteHandler implements Writable {
private static final Logger LOG =
LogManager.getLogger(DeleteHandler.class);
@@ -128,13 +70,6 @@ public class DeleteHandler implements Writable {
lock = new ReentrantReadWriteLock();
}
- private enum CancelType {
- METADATA_MISSING,
- TIMEOUT,
- COMMIT_FAIL,
- UNKNOWN
- }
-
public void readLock() {
lock.readLock().lock();
}
@@ -151,258 +86,44 @@ public class DeleteHandler implements Writable {
lock.writeLock().unlock();
}
- public void process(DeleteStmt stmt) throws DdlException,
QueryStateException {
- String dbName = stmt.getDbName();
- String tableName = stmt.getTableName();
- List<String> partitionNames = stmt.getPartitionNames();
- boolean noPartitionSpecified = partitionNames.isEmpty();
- List<Predicate> conditions = stmt.getDeleteConditions();
- Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-
+ public void process(DeleteStmt stmt, QueryState execState) throws
DdlException {
+ Database targetDb =
Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
+ OlapTable targetTbl =
targetDb.getOlapTableOrDdlException(stmt.getTableName());
DeleteJob deleteJob = null;
try {
- MarkedCountDownLatch<Long, Long> countDownLatch;
- long transactionId = -1;
- OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
- olapTable.readLock();
+ targetTbl.readLock();
try {
- if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
+ if (targetTbl.getState() != OlapTable.OlapTableState.NORMAL) {
// table under alter operation can also do delete.
// just add a comment here to notice.
}
-
- if (noPartitionSpecified) {
- // Try to get selected partitions if no partition
specified in delete statement
- // Use PartitionPruner to generate the select partitions
- if (olapTable.getPartitionInfo().getType() ==
PartitionType.RANGE
- || olapTable.getPartitionInfo().getType() ==
PartitionType.LIST) {
- Set<String> partitionColumnNameSet =
olapTable.getPartitionColumnNames();
- Map<String, ColumnRange> columnNameToRange =
Maps.newHashMap();
- for (String colName : partitionColumnNameSet) {
- ColumnRange columnRange =
createColumnRange(olapTable, colName, conditions);
- // Not all partition columns are involved in
predicate conditions
- if (columnRange != null) {
- columnNameToRange.put(colName, columnRange);
- }
- }
-
- Collection<Long> selectedPartitionId = null;
- if (!columnNameToRange.isEmpty()) {
- PartitionInfo partitionInfo =
olapTable.getPartitionInfo();
- Map<Long, PartitionItem> keyItemMap =
partitionInfo.getIdToItem(false);
- PartitionPruner pruner =
olapTable.getPartitionInfo().getType() == PartitionType.RANGE
- ? new RangePartitionPrunerV2(keyItemMap,
partitionInfo.getPartitionColumns(),
- columnNameToRange)
- : new ListPartitionPrunerV2(keyItemMap,
partitionInfo.getPartitionColumns(),
- columnNameToRange);
- selectedPartitionId = pruner.prune();
- }
- // selectedPartitionId is empty means no partition
matches conditions.
- // How to return empty set in such case?
- if (selectedPartitionId != null &&
!selectedPartitionId.isEmpty()) {
- for (long partitionId : selectedPartitionId) {
-
partitionNames.add(olapTable.getPartition(partitionId).getName());
- }
- } else {
- if
(!ConnectContext.get().getSessionVariable().isDeleteWithoutPartition()) {
- throw new DdlException("This is a range or
list partitioned table."
- + " You should specify partition in
delete stmt,"
- + " or set delete_without_partition to
true");
- } else {
-
partitionNames.addAll(olapTable.getPartitionNames());
- }
- }
- } else if (olapTable.getPartitionInfo().getType() ==
PartitionType.UNPARTITIONED) {
- // this is a unpartitioned table, use table name as
partition name
- partitionNames.add(olapTable.getName());
- } else {
- throw new DdlException("Unknown partition type: " +
olapTable.getPartitionInfo().getType());
- }
- }
-
- Map<Long, Short> partitionReplicaNum = Maps.newHashMap();
- List<Partition> partitions = Lists.newArrayList();
- for (String partName : partitionNames) {
- Partition partition = olapTable.getPartition(partName);
- if (partition == null) {
- throw new DdlException("Partition does not exist.
name: " + partName);
- }
- partitions.add(partition);
- partitionReplicaNum.put(partition.getId(),
-
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum());
- }
-
- List<String> deleteConditions = Lists.newArrayList();
-
- // pre check
- checkDeleteV2(olapTable, partitions, conditions,
deleteConditions);
-
- // generate label
- String label = "delete_" + UUID.randomUUID();
- //generate jobId
- long jobId = Env.getCurrentEnv().getNextId();
- // begin txn here and generate txn id
- transactionId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
- Lists.newArrayList(olapTable.getId()), label, null,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
- TransactionState.LoadJobSourceType.FRONTEND, jobId,
Config.stream_load_default_timeout_second);
-
-
- DeleteInfo deleteInfo = new DeleteInfo(db.getId(),
olapTable.getId(), tableName, deleteConditions);
- deleteInfo.setPartitions(noPartitionSpecified,
partitions.stream().map(Partition::getId)
- .collect(Collectors.toList()), partitionNames);
- deleteJob = new DeleteJob(jobId, transactionId, label,
partitionReplicaNum, deleteInfo);
- idToDeleteJob.put(deleteJob.getTransactionId(), deleteJob);
-
-
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(deleteJob);
+ deleteJob = DeleteJob.newBuilder()
+ .buildWith(new DeleteJob.BuildParams(
+ targetDb,
+ targetTbl,
+ stmt.getPartitionNames(),
+ stmt.getDeleteConditions()));
+
+ long txnId = deleteJob.beginTxn();
TransactionState txnState =
Env.getCurrentGlobalTransactionMgr()
- .getTransactionState(db.getId(), transactionId);
+ .getTransactionState(targetDb.getId(), txnId);
// must call this to make sure we only handle the tablet in
the mIndex we saw here.
- // table may be under schema changge or rollup, and the newly
created tablets will not be checked later,
+ // table may be under schema change or rollup, and the newly
created tablets will not be checked later,
// to make sure that the delete transaction can be done
successfully.
- txnState.addTableIndexes(olapTable);
-
- // task sent to be
- AgentBatchTask batchTask = new AgentBatchTask();
- // count total replica num
- // Get ALL materialized indexes, because delete condition will
be applied to all indexes
- int totalReplicaNum = 0;
- for (Partition partition : partitions) {
- for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
- for (Tablet tablet : index.getTablets()) {
- totalReplicaNum += tablet.getReplicas().size();
- }
- }
- }
- countDownLatch = new MarkedCountDownLatch<Long,
Long>(totalReplicaNum);
-
- for (Partition partition : partitions) {
- for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
- long indexId = index.getId();
- int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
-
- List<TColumn> columnsDesc = new ArrayList<TColumn>();
- for (Column column :
olapTable.getSchemaByIndexId(indexId)) {
- columnsDesc.add(column.toThrift());
- }
-
- for (Tablet tablet : index.getTablets()) {
- long tabletId = tablet.getId();
-
- // set push type
- TPushType type = TPushType.DELETE;
-
- for (Replica replica : tablet.getReplicas()) {
- long replicaId = replica.getId();
- long backendId = replica.getBackendId();
- countDownLatch.addMark(backendId, tabletId);
-
- // create push task for each replica
- PushTask pushTask = new PushTask(null,
- replica.getBackendId(), db.getId(),
olapTable.getId(),
- partition.getId(), indexId,
- tabletId, replicaId, schemaHash,
- -1, "", -1, 0,
- -1, type, conditions,
- true, TPriority.NORMAL,
- TTaskType.REALTIME_PUSH,
- transactionId,
- Env.getCurrentGlobalTransactionMgr()
-
.getTransactionIDGenerator().getNextTransactionId(),
- columnsDesc);
- pushTask.setIsSchemaChanging(false);
- pushTask.setCountDownLatch(countDownLatch);
-
- if (AgentTaskQueue.addTask(pushTask)) {
- batchTask.addTask(pushTask);
- deleteJob.addPushTask(pushTask);
- deleteJob.addTablet(tabletId);
- }
- }
- }
- }
- }
-
- // submit push tasks
- if (batchTask.getTaskNum() > 0) {
- AgentTaskExecutor.submit(batchTask);
- }
-
- } catch (Throwable t) {
- LOG.warn("error occurred during delete process", t);
- // if transaction has been begun, need to abort it
- if
(Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(),
transactionId) != null) {
- cancelJob(deleteJob, CancelType.UNKNOWN, t.getMessage());
- }
- throw new DdlException(t.getMessage(), t);
+ txnState.addTableIndexes(targetTbl);
+ idToDeleteJob.put(txnId, deleteJob);
+ deleteJob.dispatch();
} finally {
- olapTable.readUnlock();
+ targetTbl.readUnlock();
}
-
- long timeoutMs = deleteJob.getTimeoutMs();
- LOG.info("waiting delete Job finish, signature: {}, timeout: {}",
transactionId, timeoutMs);
- boolean ok = false;
- try {
- ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.warn("InterruptedException: ", e);
- ok = false;
- }
-
- if (!ok) {
- String errMsg = "";
- List<Entry<Long, Long>> unfinishedMarks =
countDownLatch.getLeftMarks();
- // only show at most 5 results
- List<Entry<Long, Long>> subList = unfinishedMarks.subList(0,
Math.min(unfinishedMarks.size(), 5));
- if (!subList.isEmpty()) {
- errMsg = "unfinished replicas [BackendId=TabletId]: " +
Joiner.on(", ").join(subList);
- }
- LOG.warn(errMsg);
-
- try {
- deleteJob.checkAndUpdateQuorum();
- } catch (MetaNotFoundException e) {
- cancelJob(deleteJob, CancelType.METADATA_MISSING,
e.getMessage());
- throw new DdlException(e.getMessage(), e);
- }
- DeleteState state = deleteJob.getState();
- switch (state) {
- case UN_QUORUM:
- LOG.warn("delete job timeout: transactionId {},
timeout {}, {}",
- transactionId, timeoutMs, errMsg);
- cancelJob(deleteJob, CancelType.TIMEOUT, "delete job
timeout");
- throw new DdlException("failed to execute delete.
transaction id " + transactionId
- + ", timeout(ms) " + timeoutMs + ", " +
errMsg);
- case QUORUM_FINISHED:
- case FINISHED:
- try {
- long nowQuorumTimeMs = System.currentTimeMillis();
- long endQuorumTimeoutMs = nowQuorumTimeMs +
timeoutMs / 2;
- // if job's state is quorum_finished then wait for
a period of time and commit it.
- while (deleteJob.getState() ==
DeleteState.QUORUM_FINISHED
- && endQuorumTimeoutMs > nowQuorumTimeMs) {
- deleteJob.checkAndUpdateQuorum();
- Thread.sleep(1000);
- nowQuorumTimeMs = System.currentTimeMillis();
- LOG.debug("wait for quorum finished delete
job: {}, txn id: {}",
- deleteJob.getId(), transactionId);
- }
- } catch (MetaNotFoundException e) {
- cancelJob(deleteJob, CancelType.METADATA_MISSING,
e.getMessage());
- throw new DdlException(e.getMessage(), e);
- } catch (InterruptedException e) {
- cancelJob(deleteJob, CancelType.UNKNOWN,
e.getMessage());
- throw new DdlException(e.getMessage(), e);
- }
- commitJob(deleteJob, db, olapTable, timeoutMs);
- break;
- default:
- Preconditions.checkState(false, "wrong delete job
state: " + state.name());
- break;
- }
- } else {
- commitJob(deleteJob, db, olapTable, timeoutMs);
+ deleteJob.await();
+ String commitMsg = deleteJob.commit();
+ execState.setOk(0, 0, commitMsg);
+ } catch (Exception ex) {
+ if (deleteJob != null) {
+ deleteJob.cancel(ex.getMessage());
}
+ execState.setError(ex.getMessage());
} finally {
if (!FeConstants.runningUnitTest) {
clearJob(deleteJob);
@@ -410,176 +131,21 @@ public class DeleteHandler implements Writable {
}
}
- // Return null if there is no filter for the partition column
- private ColumnRange createColumnRange(OlapTable table, String colName,
List<Predicate> conditions)
- throws AnalysisException {
- ColumnRange result = ColumnRange.create();
- Type type =
- table.getBaseSchema().stream().filter(c ->
c.getName().equalsIgnoreCase(colName))
- .findFirst().get().getType();
-
- boolean hasRange = false;
- for (Predicate predicate : conditions) {
- List<Range<ColumnBound>> bounds = createColumnRange(colName,
predicate, type);
- if (bounds != null) {
- hasRange = true;
- result.intersect(bounds);
- }
- }
- if (hasRange) {
- return result;
- } else {
- return null;
- }
- }
-
- // Return null if the condition is not related to the partition column,
- // or the operator is not supported.
- private List<Range<ColumnBound>> createColumnRange(String colName,
Predicate condition, Type type)
- throws AnalysisException {
- List<Range<ColumnBound>> result = Lists.newLinkedList();
- if (condition instanceof BinaryPredicate) {
- BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
- if (!(binaryPredicate.getChild(0) instanceof SlotRef)) {
- return null;
- }
- String columnName = ((SlotRef)
binaryPredicate.getChild(0)).getColumnName();
- if (!colName.equalsIgnoreCase(columnName)) {
- return null;
- }
- ColumnBound bound = ColumnBound.of(
- LiteralExpr.create(((LiteralExpr)
binaryPredicate.getChild(1)).getStringValue(), type));
- switch (binaryPredicate.getOp()) {
- case EQ:
- result.add(Range.closed(bound, bound));
- break;
- case GE:
- result.add(Range.atLeast(bound));
- break;
- case GT:
- result.add(Range.greaterThan(bound));
- break;
- case LT:
- result.add(Range.lessThan(bound));
- break;
- case LE:
- result.add(Range.atMost(bound));
- break;
- case NE:
- result.add(Range.lessThan(bound));
- result.add(Range.greaterThan(bound));
- break;
- default:
- return null;
- }
- } else if (condition instanceof InPredicate) {
- InPredicate inPredicate = (InPredicate) condition;
- if (!(inPredicate.getChild(0) instanceof SlotRef)) {
- return null;
- }
- String columnName = ((SlotRef)
inPredicate.getChild(0)).getColumnName();
- if (!colName.equals(columnName)) {
- return null;
- }
- if (inPredicate.isNotIn()) {
- return null;
- }
- for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
- ColumnBound bound = ColumnBound.of(LiteralExpr
- .create(((LiteralExpr)
inPredicate.getChild(i)).getStringValue(), type));
- result.add(Range.closed(bound, bound));
- }
- } else {
- return null;
- }
- return result;
- }
-
- private void commitJob(DeleteJob job, Database db, Table table, long
timeoutMs)
- throws DdlException, QueryStateException {
- TransactionStatus status = TransactionStatus.UNKNOWN;
- try {
- boolean isVisible = unprotectedCommitJob(job, db, table,
timeoutMs);
- status = isVisible ? TransactionStatus.VISIBLE :
TransactionStatus.COMMITTED;
- } catch (UserException e) {
- if (cancelJob(job, CancelType.COMMIT_FAIL, e.getMessage())) {
- throw new DdlException(e.getMessage(), e);
- }
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append("{'label':'").append(job.getLabel()).append("',
'status':'").append(status.name());
- sb.append("', 'txnId':'").append(job.getTransactionId()).append("'");
-
- switch (status) {
- case COMMITTED: {
- // Although publish is unfinished we should tell user that
commit already success.
- String errMsg = "delete job is committed but may be taking
effect later";
- sb.append(", 'err':'").append(errMsg).append("'");
- sb.append("}");
- throw new QueryStateException(MysqlStateType.OK,
sb.toString());
- }
- case VISIBLE: {
- sb.append("}");
- throw new QueryStateException(MysqlStateType.OK,
sb.toString());
- }
- default:
- Preconditions.checkState(false, "wrong transaction status: " +
status.name());
- break;
- }
- }
-
- /**
- * unprotected commit delete job
- * return true when successfully commit and publish
- * return false when successfully commit but publish unfinished.
- * A UserException thrown if both commit and publish failed.
- * @param job
- * @param db
- * @param timeoutMs
- * @return
- * @throws UserException
- */
- private boolean unprotectedCommitJob(DeleteJob job, Database db, Table
table, long timeoutMs) throws UserException {
- long transactionId = job.getTransactionId();
- GlobalTransactionMgr globalTransactionMgr =
Env.getCurrentGlobalTransactionMgr();
- List<TabletCommitInfo> tabletCommitInfos = new
ArrayList<TabletCommitInfo>();
- TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
- for (TabletDeleteInfo tDeleteInfo : job.getTabletDeleteInfo()) {
- for (Replica replica : tDeleteInfo.getFinishedReplicas()) {
- // the inverted index contains rolling up replica
- Long tabletId =
invertedIndex.getTabletIdByReplica(replica.getId());
- if (tabletId == null) {
- LOG.warn("could not find tablet id for replica {}, the
tablet maybe dropped", replica);
- continue;
- }
- tabletCommitInfos.add(new TabletCommitInfo(tabletId,
replica.getBackendId()));
- }
- }
- return globalTransactionMgr.commitAndPublishTransaction(db,
Lists.newArrayList(table),
- transactionId, tabletCommitInfos, timeoutMs);
- }
-
/**
* This method should always be called in the end of the delete process to
clean the job.
* Better put it in finally block.
+ *
* @param job
*/
private void clearJob(DeleteJob job) {
- if (job != null) {
- long signature = job.getTransactionId();
- if (idToDeleteJob.containsKey(signature)) {
- idToDeleteJob.remove(signature);
- }
- for (PushTask pushTask : job.getPushTasks()) {
- AgentTaskQueue.removePushTask(pushTask.getBackendId(),
pushTask.getSignature(),
- pushTask.getVersion(),
- pushTask.getPushType(), pushTask.getTaskType());
- }
-
- // NOT remove callback from GlobalTransactionMgr's callback
factory here.
- // the callback will be removed after transaction is aborted of
visible.
+ if (job == null) {
+ return;
}
+ long signature = job.getTransactionId();
+ idToDeleteJob.remove(signature);
+ job.cleanUp();
+ // do not remove callback from GlobalTransactionMgr's callback factory
here.
+ // the callback will be removed after transaction is aborted or
visible.
}
public void recordFinishedJob(DeleteJob job) {
@@ -598,250 +164,13 @@ public class DeleteHandler implements Writable {
}
}
- /**
- * abort delete job
- * return true when successfully abort.
- * return true when some unknown error happened, just ignore it.
- * return false when the job is already committed
- * @param job
- * @param cancelType
- * @param reason
- * @return
- */
- public boolean cancelJob(DeleteJob job, CancelType cancelType, String
reason) {
- LOG.info("start to cancel delete job, transactionId: {}, cancelType:
{}",
- job.getTransactionId(), cancelType.name());
- GlobalTransactionMgr globalTransactionMgr =
Env.getCurrentGlobalTransactionMgr();
- try {
- if (job != null) {
-
globalTransactionMgr.abortTransaction(job.getDeleteInfo().getDbId(),
job.getTransactionId(), reason);
- }
- } catch (Exception e) {
- TransactionState state = globalTransactionMgr.getTransactionState(
- job.getDeleteInfo().getDbId(), job.getTransactionId());
- if (state == null) {
- LOG.warn("cancel delete job failed because txn not found,
transactionId: {}",
- job.getTransactionId());
- } else if (state.getTransactionStatus() ==
TransactionStatus.COMMITTED
- || state.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- LOG.warn("cancel delete job failed because it has been
committed, transactionId: {}",
- job.getTransactionId());
- return false;
- } else {
- LOG.warn("errors while abort transaction", e);
- }
- }
- return true;
- }
-
public DeleteJob getDeleteJob(long transactionId) {
return idToDeleteJob.get(transactionId);
}
- private SlotRef getSlotRef(Predicate condition) {
- SlotRef slotRef = null;
- if (condition instanceof BinaryPredicate) {
- BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
- slotRef = (SlotRef) binaryPredicate.getChild(0);
- } else if (condition instanceof IsNullPredicate) {
- IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
- slotRef = (SlotRef) isNullPredicate.getChild(0);
- } else if (condition instanceof InPredicate) {
- InPredicate inPredicate = (InPredicate) condition;
- slotRef = (SlotRef) inPredicate.getChild(0);
- }
- return slotRef;
- }
-
- private void checkDeleteV2(OlapTable table, List<Partition> partitions,
- List<Predicate> conditions, List<String> deleteConditions)
- throws DdlException {
- // check condition column is key column and condition value
- // Here we use "getFullSchema()" to get all columns including VISIBLE
and SHADOW columns
- Map<String, Column> nameToColumn =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
- for (Column column : table.getFullSchema()) {
- nameToColumn.put(column.getName(), column);
- }
-
- for (Predicate condition : conditions) {
- SlotRef slotRef = getSlotRef(condition);
- String columnName = slotRef.getColumnName();
- if (!nameToColumn.containsKey(columnName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR,
columnName, table.getName());
- }
-
- if (Column.isShadowColumn(columnName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
- "Can not apply delete condition to shadow column");
- }
-
- // Check if this column is under schema change, if yes, there will
be a shadow column related to it.
- // And we don't allow doing delete operation when a condition
column is under schema change.
- String shadowColName = Column.getShadowName(columnName);
- if (nameToColumn.containsKey(shadowColName)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Column " + columnName + " is under"
- + " schema change operation. Do not allow delete
operation");
- }
-
- Column column = nameToColumn.get(columnName);
- // Due to rounding errors, most floating-point numbers end up
being slightly imprecise,
- // it also means that numbers expected to be equal often differ
slightly, so we do not allow compare with
- // floating-point numbers, floating-point number not allowed in
where clause
- if (column.getDataType().isFloatingPointType()) {
- throw new DdlException("Column[" + columnName + "] type is
float or double.");
- }
- if (!column.isKey()) {
- if (table.getKeysType() == KeysType.AGG_KEYS) {
- throw new DdlException("delete predicate on value column
only supports Unique table with"
- + " merge-on-write enabled and Duplicate table,
but " + "Table[" + table.getName()
- + "] is an Aggregate table.");
- } else if (table.getKeysType() == KeysType.UNIQUE_KEYS &&
!table.getEnableUniqueKeyMergeOnWrite()) {
- throw new DdlException("delete predicate on value column
only supports Unique table with"
- + " merge-on-write enabled and Duplicate table,
but " + "Table[" + table.getName()
- + "] is an Aggregate table.");
- }
- }
-
- if (condition instanceof BinaryPredicate) {
- String value = null;
- try {
- BinaryPredicate binaryPredicate = (BinaryPredicate)
condition;
- // if a bool cond passed to be, be's zone_map cannot
handle bool correctly,
- // change it to a tinyint type here;
- value = ((LiteralExpr)
binaryPredicate.getChild(1)).getStringValue();
- if (column.getDataType() == PrimitiveType.BOOLEAN) {
- if (value.toLowerCase().equals("true")) {
- binaryPredicate.setChild(1,
LiteralExpr.create("1", Type.TINYINT));
- } else if (value.toLowerCase().equals("false")) {
- binaryPredicate.setChild(1,
LiteralExpr.create("0", Type.TINYINT));
- }
- } else if (column.getDataType() == PrimitiveType.DATE
- || column.getDataType() == PrimitiveType.DATETIME
- || column.getDataType() == PrimitiveType.DATEV2) {
- DateLiteral dateLiteral = new DateLiteral(value,
Type.fromPrimitiveType(column.getDataType()));
- value = dateLiteral.getStringValue();
- binaryPredicate.setChild(1, LiteralExpr.create(value,
- Type.fromPrimitiveType(column.getDataType())));
- } else if (column.getDataType() ==
PrimitiveType.DATETIMEV2) {
- DateLiteral dateLiteral = new DateLiteral(value,
-
ScalarType.createDatetimeV2Type(ScalarType.MAX_DATETIMEV2_SCALE));
- value = dateLiteral.getStringValue();
- binaryPredicate.setChild(1, LiteralExpr.create(value,
-
ScalarType.createDatetimeV2Type(ScalarType.MAX_DATETIMEV2_SCALE)));
- }
- LiteralExpr.create(value, column.getType());
- } catch (AnalysisException e) {
- //
ErrorReport.reportDdlException(ErrorCode.ERR_INVALID_VALUE, value);
- throw new DdlException("Invalid column value[" + value +
"] for column " + columnName);
- }
- } else if (condition instanceof InPredicate) {
- String value = null;
- try {
- InPredicate inPredicate = (InPredicate) condition;
- for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
- value = inPredicate.getChild(i).getStringValue();
- if (column.getDataType() == PrimitiveType.DATE
- || column.getDataType() ==
PrimitiveType.DATETIME
- || column.getDataType() == PrimitiveType.DATEV2
- || column.getDataType() ==
PrimitiveType.DATETIMEV2) {
- DateLiteral dateLiteral = new DateLiteral(value,
- column.getType());
- value = dateLiteral.getStringValue();
- inPredicate.setChild(i, LiteralExpr.create(value,
- column.getType()));
- } else {
- LiteralExpr.create(value,
-
Type.fromPrimitiveType(column.getDataType()));
- }
- }
- } catch (AnalysisException e) {
- throw new DdlException("Invalid column value[" + value +
"] for column " + columnName);
- }
- }
-
- // set schema column name
- slotRef.setCol(column.getName());
- }
-
- // check materialized index.
- // only need to check the first partition, because each partition has
same materialized views
- Map<Long, List<Column>> indexIdToSchema = table.getIndexIdToSchema();
- Partition partition = partitions.get(0);
- // Here we check ALL materialized views instead of just VISIBLE ones.
- // For example, when a table is doing rollup or schema change. there
will be some SHADOW indexes.
- // And we also need to check these SHADOW indexes to see if the delete
condition can be applied to them.
- for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
- if (table.getBaseIndexId() == index.getId()) {
- continue;
- }
-
- // check table has condition column
- Map<String, Column> indexColNameToColumn =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
- for (Column column : indexIdToSchema.get(index.getId())) {
- indexColNameToColumn.put(column.getName(), column);
- }
- String indexName = table.getIndexNameById(index.getId());
- for (Predicate condition : conditions) {
- SlotRef slotRef = getSlotRef(condition);
- String columnName = slotRef.getColumnName();
- Column column = indexColNameToColumn.get(columnName);
- if (column == null) {
-
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR,
- columnName, "index[" + indexName + "]");
- }
- MaterializedIndexMeta indexMeta =
table.getIndexIdToMeta().get(index.getId());
- if (indexMeta.getKeysType() != KeysType.DUP_KEYS &&
!column.isKey()) {
- throw new DdlException("Column[" + columnName + "] is not
key column in index[" + indexName + "]");
- }
- }
- }
-
- if (deleteConditions == null) {
- return;
- }
-
- // save delete conditions
- for (Predicate condition : conditions) {
- if (condition instanceof BinaryPredicate) {
- BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
- SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
- String columnName = slotRef.getColumnName();
- StringBuilder sb = new StringBuilder();
- sb.append(columnName).append("
").append(binaryPredicate.getOp().name()).append(" \"")
-
.append(binaryPredicate.getChild(1).getStringValue()).append("\"");
- deleteConditions.add(sb.toString());
- } else if (condition instanceof IsNullPredicate) {
- IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
- SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
- String columnName = slotRef.getColumnName();
- StringBuilder sb = new StringBuilder();
- sb.append(columnName);
- if (isNullPredicate.isNotNull()) {
- sb.append(" IS NOT NULL");
- } else {
- sb.append(" IS NULL");
- }
- deleteConditions.add(sb.toString());
- } else if (condition instanceof InPredicate) {
- InPredicate inPredicate = (InPredicate) condition;
- SlotRef slotRef = (SlotRef) inPredicate.getChild(0);
- String columnName = slotRef.getColumnName();
- StringBuilder strBuilder = new StringBuilder();
- String notStr = inPredicate.isNotIn() ? "NOT " : "";
- strBuilder.append(columnName).append("
").append(notStr).append("IN (");
- for (int i = 1; i <= inPredicate.getInElementNum(); ++i) {
- strBuilder.append(inPredicate.getChild(i).toSql());
- strBuilder.append((i != inPredicate.getInElementNum()) ?
", " : "");
- }
- strBuilder.append(")");
- deleteConditions.add(strBuilder.toString());
- }
- }
- }
-
// show delete stmt
public List<List<Comparable>> getDeleteInfosByDb(long dbId) {
- LinkedList<List<Comparable>> infos = new
LinkedList<List<Comparable>>();
+ LinkedList<List<Comparable>> infos = new LinkedList<>();
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
return infos;
@@ -896,8 +225,8 @@ public class DeleteHandler implements Writable {
}
// sort by createTimeMs
- ListComparator<List<Comparable>> comparator = new
ListComparator<List<Comparable>>(2);
- Collections.sort(infos, comparator);
+ ListComparator<List<Comparable>> comparator = new ListComparator<>(2);
+ infos.sort(comparator);
return infos;
}
@@ -919,7 +248,6 @@ public class DeleteHandler implements Writable {
@Override
public void write(DataOutput out) throws IOException {
removeOldDeleteInfos();
- Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static DeleteHandler read(DataInput in) throws IOException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
index 300c67e58d5..05c6c4b1a86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
@@ -60,10 +60,6 @@ public class DeleteInfo implements Writable,
GsonPostProcessable {
@SerializedName(value = "partitionName")
private String partitionName;
- public DeleteInfo() {
- this.deleteConditions = Lists.newArrayList();
- }
-
public DeleteInfo(long dbId, long tableId, String tableName, List<String>
deleteConditions) {
this.dbId = dbId;
this.tableId = tableId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 3723c226c5a..eaa4395093d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -17,29 +17,74 @@
package org.apache.doris.load;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.ListPartitionPrunerV2;
+import org.apache.doris.planner.PartitionPruner;
+import org.apache.doris.planner.RangePartitionPrunerV2;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TColumn;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
-public class DeleteJob extends AbstractTxnStateChangeCallback {
+public class DeleteJob extends AbstractTxnStateChangeCallback implements
DeleteJobLifeCycle {
private static final Logger LOG = LogManager.getLogger(DeleteJob.class);
+ public static final String DELETE_PREFIX = "delete_";
+
public enum DeleteState {
UN_QUORUM,
QUORUM_FINISHED,
@@ -49,18 +94,28 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback {
private DeleteState state;
// jobId(listenerId). use in beginTransaction to callback function
- private long id;
+ private final long id;
// transaction id.
private long signature;
- private String label;
- private Set<Long> totalTablets;
- private Set<Long> quorumTablets;
- private Set<Long> finishedTablets;
+ private final String label;
+ private final Set<Long> totalTablets;
+ private final Set<Long> quorumTablets;
+ private final Set<Long> finishedTablets;
Map<Long, TabletDeleteInfo> tabletDeleteInfoMap;
- private Set<PushTask> pushTasks;
- private DeleteInfo deleteInfo;
+ private final Set<PushTask> pushTasks;
+ private final DeleteInfo deleteInfo;
+
+ private final Map<Long, Short> partitionReplicaNum;
+
+ private Database targetDb;
+
+ private OlapTable targetTbl;
+
+ private List<Partition> partitions;
+
+ private List<Predicate> deleteConditions;
- private Map<Long, Short> partitionReplicaNum;
+ private MarkedCountDownLatch<Long, Long> countDownLatch;
public DeleteJob(long id, long transactionId, String label,
Map<Long, Short> partitionReplicaNum, DeleteInfo
deleteInfo) {
@@ -77,13 +132,17 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback {
this.partitionReplicaNum = partitionReplicaNum;
}
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
/**
* check and update if this job's state is QUORUM_FINISHED or FINISHED
* The meaning of state:
* QUORUM_FINISHED: For each tablet there are more than half of its
replicas have been finished
- * FINISHED: All replicas of this jobs have finished
+ * FINISHED: All replicas of this job have finished
*/
- public void checkAndUpdateQuorum() throws MetaNotFoundException {
+ private void checkAndUpdateQuorum() throws MetaNotFoundException {
long dbId = deleteInfo.getDbId();
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
@@ -122,32 +181,28 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback {
signature, totalTablets.size(), quorumTablets.size(),
dropCounter);
if (finishedTablets.containsAll(totalTablets)) {
- setState(DeleteState.FINISHED);
+ this.state = DeleteState.FINISHED;
} else if (quorumTablets.containsAll(totalTablets)) {
- setState(DeleteState.QUORUM_FINISHED);
+ this.state = DeleteState.QUORUM_FINISHED;
}
}
- public void setState(DeleteState state) {
- this.state = state;
- }
-
public DeleteState getState() {
return this.state;
}
- public boolean addTablet(long tabletId) {
- return totalTablets.add(tabletId);
+ private void addTablet(long tabletId) {
+ totalTablets.add(tabletId);
}
- public boolean addPushTask(PushTask pushTask) {
- return pushTasks.add(pushTask);
+ public void addPushTask(PushTask pushTask) {
+ pushTasks.add(pushTask);
}
- public boolean addFinishedReplica(long partitionId, long tabletId, Replica
replica) {
+ public void addFinishedReplica(long partitionId, long tabletId, Replica
replica) {
tabletDeleteInfoMap.putIfAbsent(tabletId, new
TabletDeleteInfo(partitionId, tabletId));
- TabletDeleteInfo tDeleteInfo = tabletDeleteInfoMap.get(tabletId);
- return tDeleteInfo.addFinishedReplica(replica);
+ TabletDeleteInfo tDeleteInfo = tabletDeleteInfoMap.get(tabletId);
+ tDeleteInfo.addFinishedReplica(replica);
}
public DeleteInfo getDeleteInfo() {
@@ -158,10 +213,6 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback {
return this.label;
}
- public Set<PushTask> getPushTasks() {
- return pushTasks;
- }
-
@Override
public long getId() {
return this.id;
@@ -177,14 +228,13 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback {
}
@Override
- public void afterAborted(TransactionState txnState, boolean txnOperated,
String txnStatusChangeReason)
- throws UserException {
+ public void afterAborted(TransactionState txnState, boolean txnOperated,
String txnStatusChangeReason) {
// just to clean the callback
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
}
public void executeFinish() {
- setState(DeleteState.FINISHED);
+ this.state = DeleteState.FINISHED;
Env.getCurrentEnv().getDeleteHandler().recordFinishedJob(this);
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
}
@@ -206,4 +256,451 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback {
long timeout = Math.max(totalTablets.size() *
Config.tablet_delete_timeout_second * 1000L, 30000L);
return Math.min(timeout, Config.delete_job_max_timeout_second * 1000L);
}
+
+ public void setTargetDb(Database targetDb) {
+ this.targetDb = targetDb;
+ }
+
+ public void setTargetTbl(OlapTable targetTbl) {
+ this.targetTbl = targetTbl;
+ }
+
+ public void setPartitions(List<Partition> partitions) {
+ this.partitions = partitions;
+ }
+
+ public void setDeleteConditions(List<Predicate> deleteConditions) {
+ this.deleteConditions = deleteConditions;
+ }
+
+ public void setCountDownLatch(MarkedCountDownLatch<Long, Long>
countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public long beginTxn() throws Exception {
+ long txnId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(),
+ Lists.newArrayList(deleteInfo.getTableId()), label, null,
+ new TransactionState.TxnCoordinator(
+ TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ TransactionState.LoadJobSourceType.FRONTEND, id,
Config.stream_load_default_timeout_second);
+ this.signature = txnId;
+
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
+ return txnId;
+ }
+
+ @Override
+ public void dispatch() throws Exception {
+ // task sent to be
+ AgentBatchTask batchTask = new AgentBatchTask();
+ for (Partition partition : partitions) {
+ for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
+ long indexId = index.getId();
+ int schemaHash = targetTbl.getSchemaHashByIndexId(indexId);
+
+ List<TColumn> columnsDesc = Lists.newArrayList();
+ for (Column column : targetTbl.getSchemaByIndexId(indexId)) {
+ columnsDesc.add(column.toThrift());
+ }
+
+ for (Tablet tablet : index.getTablets()) {
+ long tabletId = tablet.getId();
+
+ // set push type
+ TPushType type = TPushType.DELETE;
+
+ for (Replica replica : tablet.getReplicas()) {
+ long replicaId = replica.getId();
+ long backendId = replica.getBackendId();
+ countDownLatch.addMark(backendId, tabletId);
+
+ // create push task for each replica
+ PushTask pushTask = new PushTask(null,
+ replica.getBackendId(), targetDb.getId(),
targetTbl.getId(),
+ partition.getId(), indexId,
+ tabletId, replicaId, schemaHash,
+ -1, "", -1, 0,
+ -1, type, deleteConditions,
+ true, TPriority.NORMAL,
+ TTaskType.REALTIME_PUSH,
+ signature,
+ Env.getCurrentGlobalTransactionMgr()
+
.getTransactionIDGenerator().getNextTransactionId(),
+ columnsDesc);
+ pushTask.setIsSchemaChanging(false);
+ pushTask.setCountDownLatch(countDownLatch);
+
+ if (AgentTaskQueue.addTask(pushTask)) {
+ batchTask.addTask(pushTask);
+ addPushTask(pushTask);
+ addTablet(tabletId);
+ }
+ }
+ }
+ }
+ }
+
+ // submit push tasks
+ if (batchTask.getTaskNum() > 0) {
+ AgentTaskExecutor.submit(batchTask);
+ }
+ }
+
+ @Override
+ public void await() throws Exception {
+ long timeoutMs = getTimeoutMs();
+ boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
+ if (ok) {
+ return;
+ }
+
+ //handle failure
+ String errMsg = "";
+ List<Map.Entry<Long, Long>> unfinishedMarks =
countDownLatch.getLeftMarks();
+ // only show at most 5 results
+ List<Map.Entry<Long, Long>> subList = unfinishedMarks.subList(0,
Math.min(unfinishedMarks.size(), 5));
+ if (!subList.isEmpty()) {
+ errMsg = "unfinished replicas [BackendId=TabletId]: " +
Joiner.on(", ").join(subList);
+ }
+ LOG.warn(errMsg);
+ checkAndUpdateQuorum();
+ switch (state) {
+ case UN_QUORUM:
+ LOG.warn("delete job timeout: transactionId {}, timeout {},
{}",
+ signature, timeoutMs, errMsg);
+ throw new UserException(String.format("delete job timeout,
timeout(ms):%s, msg:%s", timeoutMs, errMsg));
+ case QUORUM_FINISHED:
+ case FINISHED:
+ long nowQuorumTimeMs = System.currentTimeMillis();
+ long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2;
+ // if job's state is quorum_finished then wait for a period of
time and commit it.
+ while (state == DeleteState.QUORUM_FINISHED
+ && endQuorumTimeoutMs > nowQuorumTimeMs) {
+ checkAndUpdateQuorum();
+ Thread.sleep(1000);
+ nowQuorumTimeMs = System.currentTimeMillis();
+ LOG.debug("wait for quorum finished delete job: {}, txn
id: {}",
+ id, signature);
+ }
+ break;
+ default:
+ throw new IllegalStateException("wrong delete job state: " +
state.name());
+ }
+ }
+
+ @Override
+ public String commit() throws Exception {
+ TabletInvertedIndex currentInvertedIndex =
Env.getCurrentInvertedIndex();
+ List<TabletCommitInfo> tabletCommitInfos = Lists.newArrayList();
+ tabletDeleteInfoMap.forEach((tabletId, deleteInfo) ->
deleteInfo.getFinishedReplicas()
+ .forEach(replica -> {
+ if
(currentInvertedIndex.getTabletIdByReplica(replica.getId()) == null) {
+ LOG.warn("could not find tablet id for replica {}, the
tablet maybe dropped", replica);
+ return;
+ }
+ tabletCommitInfos.add(new TabletCommitInfo(tabletId,
replica.getBackendId()));
+ }));
+ boolean visible = Env.getCurrentGlobalTransactionMgr()
+ .commitAndPublishTransaction(targetDb,
Lists.newArrayList(targetTbl),
+ signature, tabletCommitInfos, getTimeoutMs());
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("{'label':'").append(label);
+ sb.append("', 'txnId':'").append(signature)
+ .append("', 'status':'");
+ if (visible) {
+ sb.append(TransactionStatus.VISIBLE.name()).append("'");
+ sb.append("}");
+ } else {
+ // Although publish is unfinished we should tell user that commit
already success.
+ sb.append(TransactionStatus.COMMITTED.name()).append("'");
+ String msg = "delete job is committed but may be taking effect
later";
+ sb.append(", 'msg':'").append(msg).append("'");
+ sb.append("}");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public void cancel(String reason) {
+ GlobalTransactionMgr globalTransactionMgr =
Env.getCurrentGlobalTransactionMgr();
+ try {
+ globalTransactionMgr.abortTransaction(deleteInfo.getDbId(),
signature, reason);
+ } catch (Exception e) {
+ TransactionState state = globalTransactionMgr.getTransactionState(
+ deleteInfo.getDbId(), signature);
+ if (state == null) {
+ LOG.warn("cancel delete job failed because txn not found,
transactionId: {}",
+ signature);
+ } else if (state.getTransactionStatus() ==
TransactionStatus.COMMITTED
+ || state.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ LOG.warn("cancel delete job failed because it has been
committed, transactionId: {}",
+ signature);
+ } else {
+ LOG.warn("errors while abort transaction", e);
+ }
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+ for (PushTask pushTask : pushTasks) {
+ AgentTaskQueue.removePushTask(pushTask.getBackendId(),
pushTask.getSignature(),
+ pushTask.getVersion(),
+ pushTask.getPushType(), pushTask.getTaskType());
+ }
+ }
+
+ public static class BuildParams {
+
+ private final Database db;
+ private final OlapTable table;
+
+ private final Collection<String> partitionNames;
+
+ private final List<Predicate> deleteConditions;
+
+ public BuildParams(Database db, OlapTable table,
+ Collection<String> partitionNames,
+ List<Predicate> deleteConditions) {
+ this.db = db;
+ this.table = table;
+ this.partitionNames = partitionNames;
+ this.deleteConditions = deleteConditions;
+ }
+
+ public OlapTable getTable() {
+ return table;
+ }
+
+ public Collection<String> getPartitionNames() {
+ return partitionNames;
+ }
+
+ public Database getDb() {
+ return db;
+ }
+
+ public List<Predicate> getDeleteConditions() {
+ return deleteConditions;
+ }
+ }
+
+ public static class Builder {
+
+ public DeleteJob buildWith(BuildParams params) throws Exception {
+ List<Partition> partitions =
getSelectedPartitions(params.getTable(),
+ params.getPartitionNames(), params.getDeleteConditions());
+ Map<Long, Short> partitionReplicaNum = partitions.stream()
+ .collect(Collectors.toMap(
+ Partition::getId,
+ partition ->
+ params.getTable()
+ .getPartitionInfo()
+
.getReplicaAllocation(partition.getId())
+ .getTotalReplicaNum()));
+ // generate label
+ String label = DELETE_PREFIX + UUID.randomUUID();
+ //generate jobId
+ long jobId = Env.getCurrentEnv().getNextId();
+ DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(),
params.getTable().getId(),
+ params.getTable().getName(),
getDeleteCondString(params.getDeleteConditions()));
+ DeleteJob deleteJob = new DeleteJob(jobId, -1, label,
partitionReplicaNum, deleteInfo);
+ long replicaNum =
partitions.stream().mapToLong(Partition::getAllReplicaCount).sum();
+ deleteJob.setPartitions(partitions);
+ deleteJob.setDeleteConditions(params.getDeleteConditions());
+ deleteJob.setTargetDb(params.getDb());
+ deleteJob.setTargetTbl(params.getTable());
+ deleteJob.setCountDownLatch(new MarkedCountDownLatch<>((int)
replicaNum));
+ return deleteJob;
+ }
+
+ private List<Partition> getSelectedPartitions(OlapTable olapTable,
Collection<String> partitionNames,
+ List<Predicate>
deleteConditions) throws Exception {
+ if (partitionNames.isEmpty()) {
+ // Try to get selected partitions if no partition specified in
delete statement
+ // Use PartitionPruner to generate the select partitions
+ if (olapTable.getPartitionInfo().getType() ==
PartitionType.RANGE
+ || olapTable.getPartitionInfo().getType() ==
PartitionType.LIST) {
+ Set<String> partitionColumnNameSet =
olapTable.getPartitionColumnNames();
+ Map<String, ColumnRange> columnNameToRange =
Maps.newHashMap();
+ for (String colName : partitionColumnNameSet) {
+ ColumnRange columnRange = createColumnRange(olapTable,
colName, deleteConditions);
+ // Not all partition columns are involved in predicate
conditions
+ if (columnRange != null) {
+ columnNameToRange.put(colName, columnRange);
+ }
+ }
+
+ Collection<Long> selectedPartitionId = null;
+ if (!columnNameToRange.isEmpty()) {
+ PartitionInfo partitionInfo =
olapTable.getPartitionInfo();
+ Map<Long, PartitionItem> keyItemMap =
partitionInfo.getIdToItem(false);
+ PartitionPruner pruner =
olapTable.getPartitionInfo().getType() == PartitionType.RANGE
+ ? new RangePartitionPrunerV2(keyItemMap,
partitionInfo.getPartitionColumns(),
+ columnNameToRange)
+ : new ListPartitionPrunerV2(keyItemMap,
partitionInfo.getPartitionColumns(),
+ columnNameToRange);
+ selectedPartitionId = pruner.prune();
+ }
+ // selectedPartitionId is empty means no partition matches
conditions.
+ // How to return empty set in such case?
+ if (selectedPartitionId != null &&
!selectedPartitionId.isEmpty()) {
+ for (long partitionId : selectedPartitionId) {
+
partitionNames.add(olapTable.getPartition(partitionId).getName());
+ }
+ } else {
+ if
(!ConnectContext.get().getSessionVariable().isDeleteWithoutPartition()) {
+ throw new UserException("This is a range or list
partitioned table."
+ + " You should specify partition in delete
stmt,"
+ + " or set delete_without_partition to
true");
+ } else {
+
partitionNames.addAll(olapTable.getPartitionNames());
+ }
+ }
+ } else if (olapTable.getPartitionInfo().getType() ==
PartitionType.UNPARTITIONED) {
+ // this is an un-partitioned table, use table name as
partition name
+ partitionNames.add(olapTable.getName());
+ } else {
+ throw new UserException("Unknown partition type: " +
olapTable.getPartitionInfo().getType());
+ }
+ }
+ List<Partition> partitions = Lists.newArrayList();
+ for (String partName : partitionNames) {
+ Partition partition = olapTable.getPartition(partName);
+ if (partition == null) {
+ throw new DdlException("Partition does not exist. name: "
+ partName);
+ }
+ partitions.add(partition);
+ }
+ return partitions;
+ }
+
+ // Return null if there is no filter for the partition column
+ private ColumnRange createColumnRange(OlapTable table, String colName,
List<Predicate> conditions)
+ throws AnalysisException {
+
+ ColumnRange result = ColumnRange.create();
+ Type type =
+ table.getBaseSchema().stream().filter(c ->
c.getName().equalsIgnoreCase(colName))
+ .findFirst().get().getType();
+
+ boolean hasRange = false;
+ for (Predicate predicate : conditions) {
+ List<Range<ColumnBound>> bounds = createColumnRange(colName,
predicate, type);
+ if (bounds != null) {
+ hasRange = true;
+ result.intersect(bounds);
+ }
+ }
+ if (hasRange) {
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ // Return null if the condition is not related to the partition column,
+ // or the operator is not supported.
+ private List<Range<ColumnBound>> createColumnRange(String colName,
Predicate condition, Type type)
+ throws AnalysisException {
+ List<Range<ColumnBound>> result = Lists.newLinkedList();
+ if (condition instanceof BinaryPredicate) {
+ BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
+ if (!(binaryPredicate.getChild(0) instanceof SlotRef)) {
+ return null;
+ }
+ String columnName = ((SlotRef)
binaryPredicate.getChild(0)).getColumnName();
+ if (!colName.equalsIgnoreCase(columnName)) {
+ return null;
+ }
+ ColumnBound bound = ColumnBound.of(
+
LiteralExpr.create(binaryPredicate.getChild(1).getStringValue(), type));
+ switch (binaryPredicate.getOp()) {
+ case EQ:
+ result.add(Range.closed(bound, bound));
+ break;
+ case GE:
+ result.add(Range.atLeast(bound));
+ break;
+ case GT:
+ result.add(Range.greaterThan(bound));
+ break;
+ case LT:
+ result.add(Range.lessThan(bound));
+ break;
+ case LE:
+ result.add(Range.atMost(bound));
+ break;
+ case NE:
+ result.add(Range.lessThan(bound));
+ result.add(Range.greaterThan(bound));
+ break;
+ default:
+ return null;
+ }
+ } else if (condition instanceof InPredicate) {
+ InPredicate inPredicate = (InPredicate) condition;
+ if (!(inPredicate.getChild(0) instanceof SlotRef)) {
+ return null;
+ }
+ String columnName = ((SlotRef)
inPredicate.getChild(0)).getColumnName();
+ if (!colName.equals(columnName)) {
+ return null;
+ }
+ if (inPredicate.isNotIn()) {
+ return null;
+ }
+ for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
+ ColumnBound bound = ColumnBound.of(LiteralExpr
+ .create(inPredicate.getChild(i).getStringValue(),
type));
+ result.add(Range.closed(bound, bound));
+ }
+ } else {
+ return null;
+ }
+ return result;
+ }
+
+ private List<String> getDeleteCondString(List<Predicate> conditions) {
+ List<String> deleteConditions =
Lists.newArrayListWithCapacity(conditions.size());
+ // save delete conditions
+ for (Predicate condition : conditions) {
+ if (condition instanceof BinaryPredicate) {
+ BinaryPredicate binaryPredicate = (BinaryPredicate)
condition;
+ SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
+ String columnName = slotRef.getColumnName();
+ String sb = columnName + " " +
binaryPredicate.getOp().name() + " \""
+ + binaryPredicate.getChild(1).getStringValue() +
"\"";
+ deleteConditions.add(sb);
+ } else if (condition instanceof IsNullPredicate) {
+ IsNullPredicate isNullPredicate = (IsNullPredicate)
condition;
+ SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
+ String columnName = slotRef.getColumnName();
+ StringBuilder sb = new StringBuilder();
+ sb.append(columnName);
+ if (isNullPredicate.isNotNull()) {
+ sb.append(" IS NOT NULL");
+ } else {
+ sb.append(" IS NULL");
+ }
+ deleteConditions.add(sb.toString());
+ } else if (condition instanceof InPredicate) {
+ InPredicate inPredicate = (InPredicate) condition;
+ SlotRef slotRef = (SlotRef) inPredicate.getChild(0);
+ String columnName = slotRef.getColumnName();
+ StringBuilder strBuilder = new StringBuilder();
+ String notStr = inPredicate.isNotIn() ? "NOT " : "";
+ strBuilder.append(columnName).append("
").append(notStr).append("IN (");
+ for (int i = 1; i <= inPredicate.getInElementNum(); ++i) {
+ strBuilder.append(inPredicate.getChild(i).toSql());
+ strBuilder.append((i != inPredicate.getInElementNum())
? ", " : "");
+ }
+ strBuilder.append(")");
+ deleteConditions.add(strBuilder.toString());
+ }
+ }
+ return deleteConditions;
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java
new file mode 100644
index 00000000000..c12dd84169e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+public interface DeleteJobLifeCycle {
+
+ /**
+ * @return txn id
+ */
+ long beginTxn() throws Exception;
+
+ /**
+ * dispatch push tasks in an async way
+ */
+ void dispatch() throws Exception;
+
+ /**
+ * called after dispatch, waiting for quorum to be finished
+ */
+ void await() throws Exception;
+
+ /**
+ * commit job
+ * @return commit msg
+ */
+ String commit() throws Exception;
+
+ void cancel(String reason) throws Exception;
+
+ void cleanUp() throws Exception;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 8e62b436b86..b09f531c076 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -78,7 +78,6 @@ import org.apache.doris.analysis.CreateUserStmt;
import org.apache.doris.analysis.CreateViewStmt;
import org.apache.doris.analysis.CreateWorkloadGroupStmt;
import org.apache.doris.analysis.DdlStmt;
-import org.apache.doris.analysis.DeleteStmt;
import org.apache.doris.analysis.DropAnalyzeJobStmt;
import org.apache.doris.analysis.DropCatalogStmt;
import org.apache.doris.analysis.DropDbStmt;
@@ -200,8 +199,6 @@ public class DdlExecutor {
} else if (ddlStmt instanceof ResumeJobStmt) {
ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt;
env.getJobRegister().resumeJob(stmt.getDbFullName(),
stmt.getName(), JobCategory.SQL);
- } else if (ddlStmt instanceof DeleteStmt) {
- env.getDeleteHandler().process((DeleteStmt) ddlStmt);
} else if (ddlStmt instanceof CreateUserStmt) {
CreateUserStmt stmt = (CreateUserStmt) ddlStmt;
env.getAuth().createUser(stmt);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 2ce0de22cb2..360353f2c79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -766,8 +766,14 @@ public class StmtExecutor {
} else if (parsedStmt instanceof UpdateStmt) {
handleUpdateStmt();
} else if (parsedStmt instanceof DdlStmt) {
- if (parsedStmt instanceof DeleteStmt && ((DeleteStmt)
parsedStmt).getInsertStmt() != null) {
- handleDeleteStmt();
+ if (parsedStmt instanceof DeleteStmt) {
+ if (((DeleteStmt) parsedStmt).getInsertStmt() != null) {
+ handleDeleteStmt();
+ } else {
+ Env.getCurrentEnv()
+ .getDeleteHandler()
+ .process((DeleteStmt) parsedStmt,
context.getState());
+ }
} else {
handleDdlStmt();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 5c62ef75a44..18f29ac30d2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -43,6 +43,7 @@ import
org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryStateException;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
@@ -213,8 +214,8 @@ public class DeleteHandlerTest {
};
}
- @Test(expected = DdlException.class)
- public void testUnQuorumTimeout() throws DdlException, QueryStateException
{
+ @Test
+ public void testUnQuorumTimeout() throws DdlException {
BinaryPredicate binaryPredicate = new
BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
new IntLiteral(3));
@@ -231,8 +232,9 @@ public class DeleteHandlerTest {
minTimes = 0;
}
};
- deleteHandler.process(deleteStmt);
- Assert.fail();
+ QueryState state = connectContext.getState();
+ deleteHandler.process(deleteStmt, state);
+ Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.ERR);
}
@Test
@@ -265,11 +267,9 @@ public class DeleteHandlerTest {
}
};
- try {
- deleteHandler.process(deleteStmt);
- } catch (QueryStateException e) {
- // CHECKSTYLE IGNORE THIS LINE
- }
+ QueryState state = connectContext.getState();
+ deleteHandler.process(deleteStmt, state);
+ Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK);
Map<Long, DeleteJob> idToDeleteJob =
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
Collection<DeleteJob> jobs = idToDeleteJob.values();
@@ -310,11 +310,9 @@ public class DeleteHandlerTest {
}
};
- try {
- deleteHandler.process(deleteStmt);
- } catch (QueryStateException e) {
- // CHECKSTYLE IGNORE THIS LINE
- }
+ QueryState state = connectContext.getState();
+ deleteHandler.process(deleteStmt, state);
+ Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK);
Map<Long, DeleteJob> idToDeleteJob =
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
Collection<DeleteJob> jobs = idToDeleteJob.values();
@@ -324,7 +322,7 @@ public class DeleteHandlerTest {
}
}
- @Test(expected = DdlException.class)
+ @Test
public void testCommitFail(@Mocked MarkedCountDownLatch countDownLatch)
throws DdlException, QueryStateException {
BinaryPredicate binaryPredicate = new
BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
new IntLiteral(3));
@@ -368,20 +366,8 @@ public class DeleteHandlerTest {
}
};
- try {
- deleteHandler.process(deleteStmt);
- } catch (DdlException e) {
- Map<Long, DeleteJob> idToDeleteJob =
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
- Collection<DeleteJob> jobs = idToDeleteJob.values();
- Assert.assertEquals(1, jobs.size());
- for (DeleteJob job : jobs) {
- Assert.assertEquals(job.getState(), DeleteState.FINISHED);
- }
- throw e;
- } catch (QueryStateException e) {
- // CHECKSTYLE IGNORE THIS LINE
- }
- Assert.fail();
+ deleteHandler.process(deleteStmt, connectContext.getState());
+ Assert.assertSame(connectContext.getState().getStateType(),
QueryState.MysqlStateType.ERR);
}
@Test
@@ -423,12 +409,9 @@ public class DeleteHandlerTest {
minTimes = 0;
}
};
-
- try {
- deleteHandler.process(deleteStmt);
- } catch (QueryStateException e) {
- // CHECKSTYLE IGNORE THIS LINE
- }
+ QueryState state = connectContext.getState();
+ deleteHandler.process(deleteStmt, state);
+ Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK);
Map<Long, DeleteJob> idToDeleteJob =
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
Collection<DeleteJob> jobs = idToDeleteJob.values();
@@ -471,11 +454,9 @@ public class DeleteHandlerTest {
}
};
- try {
- deleteHandler.process(deleteStmt);
- } catch (QueryStateException e) {
- // CHECKSTYLE IGNORE THIS LINE
- }
+ QueryState state = connectContext.getState();
+ deleteHandler.process(deleteStmt, state);
+ Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK);
Map<Long, DeleteJob> idToDeleteJob =
Deencapsulation.getField(deleteHandler, "idToDeleteJob");
Collection<DeleteJob> jobs = idToDeleteJob.values();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]