This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2586f09  [Bug] Fix bug where SHOW DELETE does not return delete job info (#3515)
2586f09 is described below

commit 2586f095486ee7019916c320a63af4c9fd712d66
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Fri May 8 13:04:20 2020 +0800

    [Bug] Fix bug where SHOW DELETE does not return delete job info (#3515)
    
    The callback added to the CallbackFactory should not be removed until the
    transaction is aborted or visible. Otherwise, some callback methods may fail
    to be called.
---
 .../apache/doris/common/MarkedCountDownLatch.java  |  8 +++--
 .../java/org/apache/doris/load/DeleteHandler.java  | 40 ++++++++++++++--------
 .../main/java/org/apache/doris/load/DeleteJob.java | 19 ++++++++--
 .../doris/transaction/TxnStateCallbackFactory.java |  6 ++--
 4 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java 
b/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
index 74a42ee..53aa426 100644
--- a/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
+++ b/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
@@ -56,11 +56,13 @@ public class MarkedCountDownLatch<K, V> extends 
CountDownLatch {
     }
 
     public synchronized void countDownToZero(Status status) {
-        while(getCount() > 0) {
-            super.countDown();
-        }
+        // update status first before countDown.
+        // so that the waiting thread will get the correct status.
         if (st.ok()) {
             st = status;
         }
+        while(getCount() > 0) {
+            super.countDown();
+        }
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/DeleteHandler.java 
b/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
index c6eee8e..621cffc 100644
--- a/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -17,11 +17,6 @@
 
 package org.apache.doris.load;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.annotations.SerializedName;
 import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.DeleteStmt;
 import org.apache.doris.analysis.IsNullPredicate;
@@ -55,16 +50,16 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.DeleteJob.DeleteState;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.QueryStateException;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.QueryStateException;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.load.DeleteJob.DeleteState;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushType;
@@ -72,9 +67,16 @@ import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionStatus;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -255,6 +257,15 @@ public class DeleteHandler implements Writable {
             }
 
             if (!ok) {
+                String errMsg = "";
+                List<Entry<Long, Long>> unfinishedMarks = 
countDownLatch.getLeftMarks();
+                // only show at most 5 results
+                List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, 
Math.min(unfinishedMarks.size(), 5));
+                if (!subList.isEmpty()) {
+                    errMsg = "unfinished replicas: " + Joiner.on(", 
").join(subList);
+                }
+                LOG.warn(errMsg);
+
                 try {
                     deleteJob.checkAndUpdateQuorum();
                 } catch (MetaNotFoundException e) {
@@ -264,14 +275,10 @@ public class DeleteHandler implements Writable {
                 DeleteState state = deleteJob.getState();
                 switch (state) {
                     case UN_QUORUM:
-                        List<Entry<Long, Long>> unfinishedMarks = 
countDownLatch.getLeftMarks();
-                        // only show at most 5 results
-                        List<Entry<Long, Long>> subList = 
unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 5));
-                        String errMsg = "Unfinished replicas:" + Joiner.on(", 
").join(subList);
                         LOG.warn("delete job timeout: transactionId {}, 
timeout {}, {}", transactionId, timeoutMs, errMsg);
                         cancelJob(deleteJob, CancelType.TIMEOUT, "delete job 
timeout");
-                        throw new DdlException("failed to delete replicas from 
job: transactionId " + transactionId +
-                                ", timeout " + timeoutMs + ", " + errMsg);
+                        throw new DdlException("failed to execute delete. 
transaction id " + transactionId +
+                                ", timeout(ms) " + timeoutMs + ", " + errMsg);
                     case QUORUM_FINISHED:
                     case FINISHED:
                         try {
@@ -282,6 +289,7 @@ public class DeleteHandler implements Writable {
                                 deleteJob.checkAndUpdateQuorum();
                                 Thread.sleep(1000);
                                 nowQuorumTimeMs = System.currentTimeMillis();
+                                LOG.debug("wait for quorum finished delete 
job: {}, txn id: {}" + deleteJob.getId(), transactionId);
                             }
                         } catch (MetaNotFoundException e) {
                             cancelJob(deleteJob, CancelType.METADATA_MISSING, 
e.getMessage());
@@ -386,7 +394,9 @@ public class DeleteHandler implements Writable {
                         pushTask.getVersion(), pushTask.getVersionHash(),
                         pushTask.getPushType(), pushTask.getTaskType());
             }
-            
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(job.getId());
+
+            // NOT remove callback from GlobalTransactionMgr's callback 
factory here.
+            // the callback will be removed after transaction is aborted of 
visible.
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/load/DeleteJob.java 
b/fe/src/main/java/org/apache/doris/load/DeleteJob.java
index 972a9cd..957d9f5 100644
--- a/fe/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -17,17 +17,21 @@
 
 package org.apache.doris.load;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
 import org.apache.doris.transaction.TransactionState;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -165,6 +169,13 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback {
         Catalog.getCurrentCatalog().getEditLog().logFinishDelete(deleteInfo);
     }
 
+    @Override
+    public void afterAborted(TransactionState txnState, boolean txnOperated, 
String txnStatusChangeReason)
+            throws UserException {
+        // just to clean the callback
+        
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
+    }
+
     public void executeFinish() {
         setState(DeleteState.FINISHED);
         Catalog.getCurrentCatalog().getDeleteHandler().recordFinishedJob(this);
@@ -180,6 +191,10 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback {
     }
 
     public long getTimeoutMs() {
+        if (FeConstants.runningUnitTest) {
+            // for making unit test run fast
+            return 1000;
+        }
         // timeout is between 30 seconds to 5 min
         long timeout = Math.max(totalTablets.size() * 
Config.tablet_delete_timeout_second * 1000L, 30000L);
         return Math.min(timeout, Config.load_straggler_wait_second * 1000L);
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java 
b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
index 28accde..b9d80bd 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
@@ -35,13 +35,15 @@ public class TxnStateCallbackFactory {
             return false;
         }
         callbacks.put(callback.getId(), callback);
-        LOG.info("add callback of txn state : {}", callback.getId());
+        LOG.info("add callback of txn state : {}. current callback size: {}",
+                callback.getId(), callbacks.size());
         return true;
     }
 
     public synchronized void removeCallback(long id) {
         if (callbacks.remove(id) != null) {
-            LOG.info("remove callback of txn state : {}", id);
+            LOG.info("remove callback of txn state : {}. current callback 
size: {}",
+                    id, callbacks.size());
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to