This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 2586f09 [Bug] Fix bug that SHOW DELETE not return Delete job info (#3515) 2586f09 is described below commit 2586f095486ee7019916c320a63af4c9fd712d66 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Fri May 8 13:04:20 2020 +0800 [Bug] Fix bug that SHOW DELETE not return Delete job info (#3515) The callback added to the CallbackFactory should not be removed until the transaction is aborted or visible. Otherwise, some callback methods may fail to be called. --- .../apache/doris/common/MarkedCountDownLatch.java | 8 +++-- .../java/org/apache/doris/load/DeleteHandler.java | 40 ++++++++++++++-------- .../main/java/org/apache/doris/load/DeleteJob.java | 19 ++++++++-- .../doris/transaction/TxnStateCallbackFactory.java | 6 ++-- 4 files changed, 51 insertions(+), 22 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java b/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java index 74a42ee..53aa426 100644 --- a/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java +++ b/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java @@ -56,11 +56,13 @@ public class MarkedCountDownLatch<K, V> extends CountDownLatch { } public synchronized void countDownToZero(Status status) { - while(getCount() > 0) { - super.countDown(); - } + // update status first before countDown. + // so that the waiting thread will get the correct status. 
if (st.ok()) { st = status; } + while(getCount() > 0) { + super.countDown(); + } } } diff --git a/fe/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/src/main/java/org/apache/doris/load/DeleteHandler.java index c6eee8e..621cffc 100644 --- a/fe/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -17,11 +17,6 @@ package org.apache.doris.load; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.gson.annotations.SerializedName; import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.DeleteStmt; import org.apache.doris.analysis.IsNullPredicate; @@ -55,16 +50,16 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.DeleteJob.DeleteState; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.QueryStateException; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.qe.QueryStateException; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; -import org.apache.doris.load.DeleteJob.DeleteState; import org.apache.doris.task.PushTask; import org.apache.doris.thrift.TPriority; import org.apache.doris.thrift.TPushType; @@ -72,9 +67,16 @@ import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionStatus; import 
org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; +import org.apache.doris.transaction.TransactionStatus; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -255,6 +257,15 @@ public class DeleteHandler implements Writable { } if (!ok) { + String errMsg = ""; + List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks(); + // only show at most 5 results + List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 5)); + if (!subList.isEmpty()) { + errMsg = "unfinished replicas: " + Joiner.on(", ").join(subList); + } + LOG.warn(errMsg); + try { deleteJob.checkAndUpdateQuorum(); } catch (MetaNotFoundException e) { @@ -264,14 +275,10 @@ public class DeleteHandler implements Writable { DeleteState state = deleteJob.getState(); switch (state) { case UN_QUORUM: - List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks(); - // only show at most 5 results - List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 5)); - String errMsg = "Unfinished replicas:" + Joiner.on(", ").join(subList); LOG.warn("delete job timeout: transactionId {}, timeout {}, {}", transactionId, timeoutMs, errMsg); cancelJob(deleteJob, CancelType.TIMEOUT, "delete job timeout"); - throw new DdlException("failed to delete replicas from job: transactionId " + transactionId + - ", timeout " + timeoutMs + ", " + errMsg); + throw new DdlException("failed to execute delete. 
transaction id " + transactionId + + ", timeout(ms) " + timeoutMs + ", " + errMsg); case QUORUM_FINISHED: case FINISHED: try { @@ -282,6 +289,7 @@ public class DeleteHandler implements Writable { deleteJob.checkAndUpdateQuorum(); Thread.sleep(1000); nowQuorumTimeMs = System.currentTimeMillis(); + LOG.debug("wait for quorum finished delete job: {}, txn id: {}" + deleteJob.getId(), transactionId); } } catch (MetaNotFoundException e) { cancelJob(deleteJob, CancelType.METADATA_MISSING, e.getMessage()); @@ -386,7 +394,9 @@ public class DeleteHandler implements Writable { pushTask.getVersion(), pushTask.getVersionHash(), pushTask.getPushType(), pushTask.getTaskType()); } - Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(job.getId()); + + // NOT remove callback from GlobalTransactionMgr's callback factory here. + // the callback will be removed after transaction is aborted of visible. } } diff --git a/fe/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/src/main/java/org/apache/doris/load/DeleteJob.java index 972a9cd..957d9f5 100644 --- a/fe/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/src/main/java/org/apache/doris/load/DeleteJob.java @@ -17,17 +17,21 @@ package org.apache.doris.load; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Replica; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.task.PushTask; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.TransactionState; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; @@ -165,6 +169,13 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { Catalog.getCurrentCatalog().getEditLog().logFinishDelete(deleteInfo); } + @Override + public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) + throws UserException { + // just to clean the callback + Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId()); + } + public void executeFinish() { setState(DeleteState.FINISHED); Catalog.getCurrentCatalog().getDeleteHandler().recordFinishedJob(this); @@ -180,6 +191,10 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { } public long getTimeoutMs() { + if (FeConstants.runningUnitTest) { + // for making unit test run fast + return 1000; + } // timeout is between 30 seconds to 5 min long timeout = Math.max(totalTablets.size() * Config.tablet_delete_timeout_second * 1000L, 30000L); return Math.min(timeout, Config.load_straggler_wait_second * 1000L); diff --git a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java index 28accde..b9d80bd 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java +++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java @@ -35,13 +35,15 @@ public class TxnStateCallbackFactory { return false; } callbacks.put(callback.getId(), callback); - LOG.info("add callback of txn state : {}", callback.getId()); + LOG.info("add callback of txn state : {}. current callback size: {}", + callback.getId(), callbacks.size()); return true; } public synchronized void removeCallback(long id) { if (callbacks.remove(id) != null) { - LOG.info("remove callback of txn state : {}", id); + LOG.info("remove callback of txn state : {}. 
current callback size: {}", + id, callbacks.size()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org