morningman commented on a change in pull request #3369: URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r421537636
########## File path: fe/src/main/java/org/apache/doris/catalog/Catalog.java ########## @@ -2639,6 +2641,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException { fullNameToDb.remove(db.getFullName()); final Cluster cluster = nameToCluster.get(db.getClusterName()); cluster.removeDb(dbName, db.getId()); + globalTransactionMgr.removeDatabaseTransactionMgr(db.getId()); Review comment: Actually, the `DropDb` operation just puts the database into CatalogRecycleBin; it does not actually drop it. The database can still be recovered by the `Recover` operation. So you should not call `removeDatabaseTransactionMgr()` here; instead, it should be called in `CatalogRecycleBin.eraseDatabase()`. ########## File path: fe/src/main/java/org/apache/doris/catalog/Catalog.java ########## @@ -2686,6 +2689,7 @@ public void replayDropDb(String dbName) throws DdlException { idToDb.remove(db.getId()); final Cluster cluster = nameToCluster.get(db.getClusterName()); cluster.removeDb(dbName, db.getId()); + globalTransactionMgr.removeDatabaseTransactionMgr(db.getId()); Review comment: Same as `dropDb`: this should be called in `CatalogRecycleBin.replayEraseDatabase()`. ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -123,6 +70,22 @@ public TxnStateCallbackFactory getCallbackFactory() { return callbackFactory; } + public DatabaseTransactionMgr getDatabaseTransactioMgr(long dbId) throws AnalysisException { Review comment: ```suggestion public DatabaseTransactionMgr getDatabaseTransactionMgr(long dbId) throws AnalysisException { ``` ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -123,6 +70,22 @@ public TxnStateCallbackFactory getCallbackFactory() { return callbackFactory; } + public DatabaseTransactionMgr getDatabaseTransactioMgr(long dbId) throws AnalysisException { + DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId); + if (dbTransactionMgr == null) { + throw new 
AnalysisException("databaseTransactionMgr[" + dbId + "] does not exist"); Review comment: AnalysisException is not suitable here. But it can be modified next time ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -156,114 +119,20 @@ public long beginTransaction(long dbId, List<Long> tableIdList, String label, TU + Config.min_load_timeout_second + " and " + Config.max_load_timeout_second + " seconds"); } - - writeLock(); - try { - Preconditions.checkNotNull(coordinator); - Preconditions.checkNotNull(label); - FeNameFormat.checkLabel(label); - - /* - * Check if label already used, by following steps - * 1. get all existing transactions - * 2. if there is a PREPARE transaction, check if this is a retry request. If yes, return the - * existing txn id. - * 3. if there is a non-aborted transaction, throw label already used exception. - */ - Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label); - if (existingTxnIds != null && !existingTxnIds.isEmpty()) { - List<TransactionState> notAbortedTxns = Lists.newArrayList(); - for (long txnId : existingTxnIds) { - TransactionState txn = idToTransactionState.get(txnId); - Preconditions.checkNotNull(txn); - if (txn.getTransactionStatus() != TransactionStatus.ABORTED) { - notAbortedTxns.add(txn); - } - } - // there should be at most 1 txn in PREPARE/COMMITTED/VISIBLE status - Preconditions.checkState(notAbortedTxns.size() <= 1, notAbortedTxns); - if (!notAbortedTxns.isEmpty()) { - TransactionState notAbortedTxn = notAbortedTxns.get(0); - if (requestId != null && notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE - && notAbortedTxn.getRequsetId() != null && notAbortedTxn.getRequsetId().equals(requestId)) { - // this may be a retry request for same job, just return existing txn id. 
- throw new DuplicatedRequestException(DebugUtil.printId(requestId), - notAbortedTxn.getTransactionId(), ""); - } - throw new LabelAlreadyUsedException(label, notAbortedTxn.getTransactionStatus()); - } - } - checkRunningTxnExceedLimit(dbId, sourceType); - - long tid = idGenerator.getNextTransactionId(); - LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator); - TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType, - coordinator, listenerId, timeoutSecond * 1000); - transactionState.setPrepareTime(System.currentTimeMillis()); - unprotectUpsertTransactionState(transactionState); - - if (MetricRepo.isInit.get()) { - MetricRepo.COUNTER_TXN_BEGIN.increase(1L); - } - - return tid; - } catch (DuplicatedRequestException e) { - throw e; - } catch (Exception e) { - if (MetricRepo.isInit.get()) { - MetricRepo.COUNTER_TXN_REJECT.increase(1L); - } - throw e; - } finally { - writeUnlock(); - } - } - - private void checkRunningTxnExceedLimit(long dbId, LoadJobSourceType sourceType) throws BeginTransactionException { - switch (sourceType) { - case ROUTINE_LOAD_TASK: - // no need to check limit for routine load task: - // 1. the number of running routine load tasks is limited by Config.max_routine_load_task_num_per_be - // 2. if we add routine load txn to runningTxnNums, runningTxnNums will always be occupied by routine load, - // and other txn may not be able to submitted. 
- break; - default: - if (runningTxnNums.getOrDefault(dbId, 0) >= Config.max_running_txn_num_per_db) { - throw new BeginTransactionException("current running txns on db " + dbId + " is " - + runningTxnNums.get(dbId) + ", larger than limit " + Config.max_running_txn_num_per_db); - } - break; - } + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return dbTransactionMgr.beginTransaction(tableIdList, label, requestId, coordinator, sourceType, listenerId, timeoutSecond); } public TransactionStatus getLabelState(long dbId, String label) { - readLock(); try { - Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label); - if (existingTxnIds == null || existingTxnIds.isEmpty()) { - return TransactionStatus.UNKNOWN; - } - // find the latest txn (which id is largest) - long maxTxnId = existingTxnIds.stream().max(Comparator.comparingLong(Long::valueOf)).get(); - return idToTransactionState.get(maxTxnId).getTransactionStatus(); - } finally { - readUnlock(); - } - } - - public void deleteTransaction(long transactionId) { - writeLock(); - try { - TransactionState state = idToTransactionState.get(transactionId); - if (state == null) { - return; - } - replayDeleteTransactionState(state); - editLog.logDeleteTransactionState(state); - } finally { - writeUnlock(); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return dbTransactionMgr.getLabelState(label); + } catch (AnalysisException e) { + LOG.warn("Get transaction status by label " + label + " failed", e); + return null; Review comment: ```suggestion return TransactionStatus.UNKNOWN; ``` ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t */ public void removeExpiredAndTimeoutTxns() { long currentMillis = System.currentTimeMillis(); - - List<Long> timeoutTxns = Lists.newArrayList(); - List<Long> expiredTxns = 
Lists.newArrayList(); - readLock(); - try { - for (TransactionState transactionState : idToTransactionState.values()) { - if (transactionState.isExpired(currentMillis)) { - // remove the txn which labels are expired - expiredTxns.add(transactionState.getTransactionId()); - } else if (transactionState.isTimeout(currentMillis)) { - // txn is running but timeout, abort it. - timeoutTxns.add(transactionState.getTransactionId()); + for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + dbTransactionMgr.removeExpiredTxns(); + List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis); + // abort timeout txns + for (Long txnId : timeoutTxns) { Review comment: This logic(Remove the timeout txn) can also be put into `DatabaseTransactionMgr`. `dbTransactionMgr. removeExpiredAndTimeoutTxns();` ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t */ public void removeExpiredAndTimeoutTxns() { long currentMillis = System.currentTimeMillis(); - - List<Long> timeoutTxns = Lists.newArrayList(); - List<Long> expiredTxns = Lists.newArrayList(); - readLock(); - try { - for (TransactionState transactionState : idToTransactionState.values()) { - if (transactionState.isExpired(currentMillis)) { - // remove the txn which labels are expired - expiredTxns.add(transactionState.getTransactionId()); - } else if (transactionState.isTimeout(currentMillis)) { - // txn is running but timeout, abort it. 
- timeoutTxns.add(transactionState.getTransactionId()); + for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + dbTransactionMgr.removeExpiredTxns(); + List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis); + // abort timeout txns + for (Long txnId : timeoutTxns) { + try { + dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null); + LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); + } catch (UserException e) { + // abort may be failed. it is acceptable. just print a log + LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); } } - } finally { - readUnlock(); - } - // delete expired txns - for (Long txnId : expiredTxns) { - deleteTransaction(txnId); - LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager"); - } - - // abort timeout txns - for (Long txnId : timeoutTxns) { - try { - abortTransaction(txnId, "timeout by txn manager"); - LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); - } catch (UserException e) { - // abort may be failed. it is acceptable. just print a log - LOG.warn("abort timeout txn {} failed. 
msg: {}", txnId, e.getMessage()); - } } } - public TransactionState getTransactionState(long transactionId) { - readLock(); - try { - return idToTransactionState.get(transactionId); - } finally { - readUnlock(); - } + public TransactionState getTransactionState(long dbId, long transactionId) { + DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId); + return dbTransactionMgr.getTransactionState(transactionId); } public void setEditLog(EditLog editLog) { - this.editLog = editLog; this.idGenerator.setEditLog(editLog); } - - private void readLock() { - this.transactionLock.readLock().lock(); - } - - private void readUnlock() { - this.transactionLock.readLock().unlock(); - } - - private void writeLock() { - this.transactionLock.writeLock().lock(); - } - - private void writeUnlock() { - this.transactionLock.writeLock().unlock(); - } - - // for add/update/delete TransactionState - private void unprotectUpsertTransactionState(TransactionState transactionState) { - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE - || transactionState.getSourceType() == LoadJobSourceType.FRONTEND) { - // if this is a prepare txn, and load source type is not FRONTEND - // no need to persist it. if prepare txn lost, the following commit will just be failed. - // user only need to retry this txn. - // The FRONTEND type txn is committed and running asynchronously, so we have to persist it. 
- editLog.logInsertTransactionState(transactionState); - } - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState); - } - - private void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds, - Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends, - Database db) { - // transaction state is modified during check if the transaction could committed - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) { - return; - } - // update transaction state version - transactionState.setCommitTime(System.currentTimeMillis()); - transactionState.setTransactionStatus(TransactionStatus.COMMITTED); - transactionState.setErrorReplicas(errorReplicaIds); - for (long tableId : tableToPartition.keySet()) { - TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); - for (long partitionId : tableToPartition.get(tableId)) { - OlapTable table = (OlapTable) db.getTable(tableId); - Partition partition = table.getPartition(partitionId); - PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, - partition.getNextVersion(), - partition.getNextVersionHash()); - tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); - } - transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); - } - // persist transactionState - unprotectUpsertTransactionState(transactionState); - - // add publish version tasks. set task to null as a placeholder. - // tasks will be created when publishing version. 
- for (long backendId : totalInvolvedBackends) { - transactionState.addPublishVersionTask(backendId, null); - } - } - private boolean unprotectAbortTransaction(long transactionId, String reason) - throws UserException { - TransactionState transactionState = idToTransactionState.get(transactionId); - if (transactionState == null) { - throw new UserException("transaction not found"); - } - if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { - return false; - } - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED - || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - throw new UserException("transaction's state is already " - + transactionState.getTransactionStatus() + ", could not abort"); - } - transactionState.setFinishTime(System.currentTimeMillis()); - transactionState.setReason(reason); - transactionState.setTransactionStatus(TransactionStatus.ABORTED); - unprotectUpsertTransactionState(transactionState); - for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - return true; - } - // for replay idToTransactionState // check point also run transaction cleaner, the cleaner maybe concurrently modify id to public void replayUpsertTransactionState(TransactionState transactionState) { - writeLock(); try { - // set transaction status will call txn state change listener - transactionState.replaySetTransactionStatus(); - Database db = catalog.getDb(transactionState.getDbId()); - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { - LOG.info("replay a committed transaction {}", transactionState); - updateCatalogAfterCommitted(transactionState, db); - } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.info("replay a visible transaction {}", transactionState); - updateCatalogAfterVisible(transactionState, db); 
- } - TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId()); - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(), - transactionState); - } finally { - writeUnlock(); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId()); + dbTransactionMgr.replayUpsertTransactionState(transactionState); + } catch (AnalysisException e) { + LOG.warn("replay upsert transaction failed", e); Review comment: add transaction's id in log, for easy debugging. ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -1346,81 +359,71 @@ public TransactionIdGenerator getTransactionIDGenerator() { @Override public void write(DataOutput out) throws IOException { - int numTransactions = idToTransactionState.size(); + int numTransactions = getTransactionNum(); out.writeInt(numTransactions); - for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) { - entry.getValue().write(out); + for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + dbTransactionMgr.unprotectWriteAllTransactionStates(out); } idGenerator.write(out); } public void readFields(DataInput in) throws IOException { - int numTransactions = in.readInt(); - for (int i = 0; i < numTransactions; ++i) { - TransactionState transactionState = new TransactionState(); - transactionState.readFields(in); - TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId()); - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(preTxnState == null ? 
null : preTxnState.getTransactionStatus(), - transactionState); + try { + int numTransactions = in.readInt(); + for (int i = 0; i < numTransactions; ++i) { + TransactionState transactionState = new TransactionState(); + transactionState.readFields(in); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId()); + dbTransactionMgr.unprotectUpsertTransactionState(transactionState, true); + } + idGenerator.readFields(in); + } catch (AnalysisException e) { + throw new IOException("Read transaction states failed", e); } - idGenerator.readFields(in); + } - public TransactionState getTransactionStateByCallbackIdAndStatus(long callbackId, Set<TransactionStatus> status) { - readLock(); + public TransactionState getTransactionStateByCallbackIdAndStatus(long dbId, long callbackId, Set<TransactionStatus> status) { try { - for (TransactionState txn : idToTransactionState.values()) { - if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) { - return txn; - } - } - } finally { - readUnlock(); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return dbTransactionMgr.getTransactionStateByCallbackIdAndStatus(callbackId, status); + } catch (AnalysisException e) { + LOG.warn("Get transaction by callbackId and status failed", e); + return null; } - return null; } - public TransactionState getTransactionStateByCallbackId(long callbackId) { - readLock(); + public TransactionState getTransactionStateByCallbackId(long dbId, long callbackId) { try { - for (TransactionState txn : idToTransactionState.values()) { - if (txn.getCallbackId() == callbackId) { - return txn; - } - } - } finally { - readUnlock(); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return dbTransactionMgr.getTransactionStateByCallbackId(callbackId); + } catch (AnalysisException e) { + LOG.warn("Get transaction by callbackId failed", e); + return null; } - return null; } - public List<Long> 
getTransactionIdByCoordinateBe(String coordinateHost, int limit) { - ArrayList<Long> txnIds = new ArrayList<>(); - readLock(); - try { - idToTransactionState.values().stream() - .filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE - && t.getCoordinator().ip.equals(coordinateHost) - && (!t.getTransactionStatus().isFinalStatus()))) - .limit(limit) - .forEach(t -> txnIds.add(t.getTransactionId())); - } finally { - readUnlock(); + public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>(); + for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit)); + if (txnInfos.size() > limit) { + break; + } } - return txnIds; + return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos; Review comment: ```suggestion return txnInfos.size() > limit ? txnInfos.subList(0, limit) : txnInfos; ``` ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t */ public void removeExpiredAndTimeoutTxns() { long currentMillis = System.currentTimeMillis(); - - List<Long> timeoutTxns = Lists.newArrayList(); - List<Long> expiredTxns = Lists.newArrayList(); - readLock(); - try { - for (TransactionState transactionState : idToTransactionState.values()) { - if (transactionState.isExpired(currentMillis)) { - // remove the txn which labels are expired - expiredTxns.add(transactionState.getTransactionId()); - } else if (transactionState.isTimeout(currentMillis)) { - // txn is running but timeout, abort it. 
- timeoutTxns.add(transactionState.getTransactionId()); + for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + dbTransactionMgr.removeExpiredTxns(); + List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis); + // abort timeout txns + for (Long txnId : timeoutTxns) { + try { + dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null); + LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); + } catch (UserException e) { + // abort may be failed. it is acceptable. just print a log + LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); } } - } finally { - readUnlock(); - } - // delete expired txns - for (Long txnId : expiredTxns) { - deleteTransaction(txnId); - LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager"); - } - - // abort timeout txns - for (Long txnId : timeoutTxns) { - try { - abortTransaction(txnId, "timeout by txn manager"); - LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); - } catch (UserException e) { - // abort may be failed. it is acceptable. just print a log - LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); - } } } - public TransactionState getTransactionState(long transactionId) { - readLock(); - try { - return idToTransactionState.get(transactionId); - } finally { - readUnlock(); - } + public TransactionState getTransactionState(long dbId, long transactionId) { + DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId); Review comment: No throw exception? 
`dbTransactionMgr` could be `null` ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t */ public void removeExpiredAndTimeoutTxns() { long currentMillis = System.currentTimeMillis(); - - List<Long> timeoutTxns = Lists.newArrayList(); - List<Long> expiredTxns = Lists.newArrayList(); - readLock(); - try { - for (TransactionState transactionState : idToTransactionState.values()) { - if (transactionState.isExpired(currentMillis)) { - // remove the txn which labels are expired - expiredTxns.add(transactionState.getTransactionId()); - } else if (transactionState.isTimeout(currentMillis)) { - // txn is running but timeout, abort it. - timeoutTxns.add(transactionState.getTransactionId()); + for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + dbTransactionMgr.removeExpiredTxns(); + List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis); + // abort timeout txns + for (Long txnId : timeoutTxns) { + try { + dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null); + LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); + } catch (UserException e) { + // abort may be failed. it is acceptable. just print a log + LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); } } - } finally { - readUnlock(); - } - // delete expired txns - for (Long txnId : expiredTxns) { - deleteTransaction(txnId); - LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager"); - } - - // abort timeout txns - for (Long txnId : timeoutTxns) { - try { - abortTransaction(txnId, "timeout by txn manager"); - LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); - } catch (UserException e) { - // abort may be failed. it is acceptable. 
just print a log - LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); - } } } - public TransactionState getTransactionState(long transactionId) { - readLock(); - try { - return idToTransactionState.get(transactionId); - } finally { - readUnlock(); - } + public TransactionState getTransactionState(long dbId, long transactionId) { + DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId); + return dbTransactionMgr.getTransactionState(transactionId); } public void setEditLog(EditLog editLog) { - this.editLog = editLog; this.idGenerator.setEditLog(editLog); } - - private void readLock() { - this.transactionLock.readLock().lock(); - } - - private void readUnlock() { - this.transactionLock.readLock().unlock(); - } - - private void writeLock() { - this.transactionLock.writeLock().lock(); - } - - private void writeUnlock() { - this.transactionLock.writeLock().unlock(); - } - - // for add/update/delete TransactionState - private void unprotectUpsertTransactionState(TransactionState transactionState) { - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE - || transactionState.getSourceType() == LoadJobSourceType.FRONTEND) { - // if this is a prepare txn, and load source type is not FRONTEND - // no need to persist it. if prepare txn lost, the following commit will just be failed. - // user only need to retry this txn. - // The FRONTEND type txn is committed and running asynchronously, so we have to persist it. 
- editLog.logInsertTransactionState(transactionState); - } - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState); - } - - private void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds, - Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends, - Database db) { - // transaction state is modified during check if the transaction could committed - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) { - return; - } - // update transaction state version - transactionState.setCommitTime(System.currentTimeMillis()); - transactionState.setTransactionStatus(TransactionStatus.COMMITTED); - transactionState.setErrorReplicas(errorReplicaIds); - for (long tableId : tableToPartition.keySet()) { - TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); - for (long partitionId : tableToPartition.get(tableId)) { - OlapTable table = (OlapTable) db.getTable(tableId); - Partition partition = table.getPartition(partitionId); - PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, - partition.getNextVersion(), - partition.getNextVersionHash()); - tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); - } - transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); - } - // persist transactionState - unprotectUpsertTransactionState(transactionState); - - // add publish version tasks. set task to null as a placeholder. - // tasks will be created when publishing version. 
- for (long backendId : totalInvolvedBackends) { - transactionState.addPublishVersionTask(backendId, null); - } - } - private boolean unprotectAbortTransaction(long transactionId, String reason) - throws UserException { - TransactionState transactionState = idToTransactionState.get(transactionId); - if (transactionState == null) { - throw new UserException("transaction not found"); - } - if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { - return false; - } - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED - || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - throw new UserException("transaction's state is already " - + transactionState.getTransactionStatus() + ", could not abort"); - } - transactionState.setFinishTime(System.currentTimeMillis()); - transactionState.setReason(reason); - transactionState.setTransactionStatus(TransactionStatus.ABORTED); - unprotectUpsertTransactionState(transactionState); - for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - return true; - } - // for replay idToTransactionState // check point also run transaction cleaner, the cleaner maybe concurrently modify id to public void replayUpsertTransactionState(TransactionState transactionState) { - writeLock(); try { - // set transaction status will call txn state change listener - transactionState.replaySetTransactionStatus(); - Database db = catalog.getDb(transactionState.getDbId()); - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { - LOG.info("replay a committed transaction {}", transactionState); - updateCatalogAfterCommitted(transactionState, db); - } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.info("replay a visible transaction {}", transactionState); - updateCatalogAfterVisible(transactionState, db); 
- } - TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId()); - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(), - transactionState); - } finally { - writeUnlock(); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId()); + dbTransactionMgr.replayUpsertTransactionState(transactionState); + } catch (AnalysisException e) { + LOG.warn("replay upsert transaction failed", e); } + } public void replayDeleteTransactionState(TransactionState transactionState) { - writeLock(); try { - idToTransactionState.remove(transactionState.getTransactionId()); - Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel()); - txnIds.remove(transactionState.getTransactionId()); - if (txnIds.isEmpty()) { - dbIdToTxnLabels.remove(transactionState.getDbId(), transactionState.getLabel()); - } - } finally { - writeUnlock(); - } - } - - private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) { - Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - long tableId = tableCommitInfo.getTableId(); - OlapTable table = (OlapTable) db.getTable(tableId); - for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { - long partitionId = partitionCommitInfo.getPartitionId(); - Partition partition = table.getPartition(partitionId); - List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL); - for (MaterializedIndex index : allIndices) { - List<Tablet> tablets = index.getTablets(); - for (Tablet tablet : tablets) { - for (Replica replica : tablet.getReplicas()) { - if (errorReplicaIds.contains(replica.getId())) { - // should 
not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally - // should get from transaction state - replica.updateLastFailedVersion(partitionCommitInfo.getVersion(), - partitionCommitInfo.getVersionHash()); - } - } - } - } - partition.setNextVersion(partition.getNextVersion() + 1); - // Although committed version(hash) is not visible to user, - // but they need to be synchronized among Frontends. - // because we use committed version(hash) to create clone task, if the first Master FE - // send clone task with committed version hash X, and than Master changed, the new Master FE - // received the clone task report with version hash X, which not equals to it own committed - // version hash, than the clone task is failed. - partition.setNextVersionHash(Util.generateVersionHash() /* next version hash */, - partitionCommitInfo.getVersionHash() /* committed version hash*/); - } + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId()); + dbTransactionMgr.deleteTransaction(transactionState); + } catch (AnalysisException e) { + LOG.warn("replay delete transaction failed", e); } } - - private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) { - Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - long tableId = tableCommitInfo.getTableId(); - OlapTable table = (OlapTable) db.getTable(tableId); - for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { - long partitionId = partitionCommitInfo.getPartitionId(); - long newCommitVersion = partitionCommitInfo.getVersion(); - long newCommitVersionHash = partitionCommitInfo.getVersionHash(); - Partition partition = table.getPartition(partitionId); - List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL); - 
for (MaterializedIndex index : allIndices) { - for (Tablet tablet : index.getTablets()) { - for (Replica replica : tablet.getReplicas()) { - long lastFailedVersion = replica.getLastFailedVersion(); - long lastFailedVersionHash = replica.getLastFailedVersionHash(); - long newVersion = newCommitVersion; - long newVersionHash = newCommitVersionHash; - long lastSucessVersion = replica.getLastSuccessVersion(); - long lastSuccessVersionHash = replica.getLastSuccessVersionHash(); - if (!errorReplicaIds.contains(replica.getId())) { - if (replica.getLastFailedVersion() > 0) { - // if the replica is a failed replica, then not changing version and version hash - newVersion = replica.getVersion(); - newVersionHash = replica.getVersionHash(); - } else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(), - partition.getVisibleVersionHash(), true)) { - // this means the replica has error in the past, but we did not observe it - // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica - // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback - // then we will detect this and set C's last failed version to 10 and last success version to 11 - // this logic has to be replayed in checkpoint thread - lastFailedVersion = partition.getVisibleVersion(); - lastFailedVersionHash = partition.getVisibleVersionHash(); - newVersion = replica.getVersion(); - newVersionHash = replica.getVersionHash(); - } - // success version always move forward - lastSucessVersion = newCommitVersion; - lastSuccessVersionHash = newCommitVersionHash; - } else { - // for example, A,B,C 3 replicas, B,C failed during publish version, then B C will be set abnormal - // all loading will failed, B,C will have to recovery by clone, it is very inefficient and maybe lost data - // Using this method, B,C will publish failed, and fe will publish again, not update their last failed version - // if B is publish successfully in next turn, then B is normal and 
C will be set abnormal so that quorum is maintained - // and loading will go on. - newVersion = replica.getVersion(); - newVersionHash = replica.getVersionHash(); - if (newCommitVersion > lastFailedVersion) { - lastFailedVersion = newCommitVersion; - lastFailedVersionHash = newCommitVersionHash; - } - } - replica.updateVersionInfo(newVersion, newVersionHash, lastFailedVersion, lastFailedVersionHash, lastSucessVersion, lastSuccessVersionHash); - } - } - } // end for indices - long version = partitionCommitInfo.getVersion(); - long versionHash = partitionCommitInfo.getVersionHash(); - partition.updateVisibleVersionAndVersionHash(version, versionHash); - if (LOG.isDebugEnabled()) { - LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]", - transactionState, partition.getId(), version, versionHash); - } - } - } - return true; - } - - private void updateTxnLabels(TransactionState transactionState) { - Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel()); - if (txnIds == null) { - txnIds = Sets.newHashSet(); - dbIdToTxnLabels.put(transactionState.getDbId(), transactionState.getLabel(), txnIds); - } - txnIds.add(transactionState.getTransactionId()); - } - - private void updateDbRunningTxnNum(TransactionStatus preStatus, TransactionState curTxnState) { - Map<Long, Integer> txnNumMap = null; - if (curTxnState.getSourceType() == LoadJobSourceType.ROUTINE_LOAD_TASK) { - txnNumMap = runningRoutineLoadTxnNums; - } else { - txnNumMap = runningTxnNums; - } - - int txnNum = txnNumMap.getOrDefault(curTxnState.getDbId(), 0); - if (preStatus == null - && (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE - || curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) { - ++txnNum; - } else if ((preStatus == TransactionStatus.PREPARE - || preStatus == TransactionStatus.COMMITTED) - && (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE - || 
curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) { - --txnNum; - } - - if (txnNum < 1) { - txnNumMap.remove(curTxnState.getDbId()); - } else { - txnNumMap.put(curTxnState.getDbId(), txnNum); - } - } - public List<List<Comparable>> getDbInfo() { List<List<Comparable>> infos = new ArrayList<List<Comparable>>(); - readLock(); - try { - Set<Long> dbIds = new HashSet<>(); - for (TransactionState transactionState : idToTransactionState.values()) { - dbIds.add(transactionState.getDbId()); - } - for (long dbId : dbIds) { - List<Comparable> info = new ArrayList<Comparable>(); - info.add(dbId); - Database db = Catalog.getInstance().getDb(dbId); - if (db == null) { - continue; - } - info.add(db.getFullName()); - infos.add(info); + List<Long> dbIds = Lists.newArrayList(); Review comment: ```suggestion List<Long> dbIds = Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet()); ``` ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t */ public void removeExpiredAndTimeoutTxns() { long currentMillis = System.currentTimeMillis(); - - List<Long> timeoutTxns = Lists.newArrayList(); - List<Long> expiredTxns = Lists.newArrayList(); - readLock(); - try { - for (TransactionState transactionState : idToTransactionState.values()) { - if (transactionState.isExpired(currentMillis)) { - // remove the txn which labels are expired - expiredTxns.add(transactionState.getTransactionId()); - } else if (transactionState.isTimeout(currentMillis)) { - // txn is running but timeout, abort it. 
- timeoutTxns.add(transactionState.getTransactionId()); + for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + dbTransactionMgr.removeExpiredTxns(); + List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis); + // abort timeout txns + for (Long txnId : timeoutTxns) { + try { + dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null); + LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); + } catch (UserException e) { + // abort may be failed. it is acceptable. just print a log + LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); } } - } finally { - readUnlock(); - } - // delete expired txns - for (Long txnId : expiredTxns) { - deleteTransaction(txnId); - LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager"); - } - - // abort timeout txns - for (Long txnId : timeoutTxns) { - try { - abortTransaction(txnId, "timeout by txn manager"); - LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); - } catch (UserException e) { - // abort may be failed. it is acceptable. just print a log - LOG.warn("abort timeout txn {} failed. 
msg: {}", txnId, e.getMessage()); - } } } - public TransactionState getTransactionState(long transactionId) { - readLock(); - try { - return idToTransactionState.get(transactionId); - } finally { - readUnlock(); - } + public TransactionState getTransactionState(long dbId, long transactionId) { + DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId); + return dbTransactionMgr.getTransactionState(transactionId); } public void setEditLog(EditLog editLog) { - this.editLog = editLog; this.idGenerator.setEditLog(editLog); } - - private void readLock() { - this.transactionLock.readLock().lock(); - } - - private void readUnlock() { - this.transactionLock.readLock().unlock(); - } - - private void writeLock() { - this.transactionLock.writeLock().lock(); - } - - private void writeUnlock() { - this.transactionLock.writeLock().unlock(); - } - - // for add/update/delete TransactionState - private void unprotectUpsertTransactionState(TransactionState transactionState) { - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE - || transactionState.getSourceType() == LoadJobSourceType.FRONTEND) { - // if this is a prepare txn, and load source type is not FRONTEND - // no need to persist it. if prepare txn lost, the following commit will just be failed. - // user only need to retry this txn. - // The FRONTEND type txn is committed and running asynchronously, so we have to persist it. 
- editLog.logInsertTransactionState(transactionState); - } - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState); - } - - private void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds, - Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends, - Database db) { - // transaction state is modified during check if the transaction could committed - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) { - return; - } - // update transaction state version - transactionState.setCommitTime(System.currentTimeMillis()); - transactionState.setTransactionStatus(TransactionStatus.COMMITTED); - transactionState.setErrorReplicas(errorReplicaIds); - for (long tableId : tableToPartition.keySet()) { - TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); - for (long partitionId : tableToPartition.get(tableId)) { - OlapTable table = (OlapTable) db.getTable(tableId); - Partition partition = table.getPartition(partitionId); - PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, - partition.getNextVersion(), - partition.getNextVersionHash()); - tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); - } - transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); - } - // persist transactionState - unprotectUpsertTransactionState(transactionState); - - // add publish version tasks. set task to null as a placeholder. - // tasks will be created when publishing version. 
- for (long backendId : totalInvolvedBackends) { - transactionState.addPublishVersionTask(backendId, null); - } - } - private boolean unprotectAbortTransaction(long transactionId, String reason) - throws UserException { - TransactionState transactionState = idToTransactionState.get(transactionId); - if (transactionState == null) { - throw new UserException("transaction not found"); - } - if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { - return false; - } - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED - || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - throw new UserException("transaction's state is already " - + transactionState.getTransactionStatus() + ", could not abort"); - } - transactionState.setFinishTime(System.currentTimeMillis()); - transactionState.setReason(reason); - transactionState.setTransactionStatus(TransactionStatus.ABORTED); - unprotectUpsertTransactionState(transactionState); - for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - return true; - } - // for replay idToTransactionState // check point also run transaction cleaner, the cleaner maybe concurrently modify id to public void replayUpsertTransactionState(TransactionState transactionState) { - writeLock(); try { - // set transaction status will call txn state change listener - transactionState.replaySetTransactionStatus(); - Database db = catalog.getDb(transactionState.getDbId()); - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { - LOG.info("replay a committed transaction {}", transactionState); - updateCatalogAfterCommitted(transactionState, db); - } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.info("replay a visible transaction {}", transactionState); - updateCatalogAfterVisible(transactionState, db); 
- } - TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId()); - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(), - transactionState); - } finally { - writeUnlock(); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId()); + dbTransactionMgr.replayUpsertTransactionState(transactionState); + } catch (AnalysisException e) { + LOG.warn("replay upsert transaction failed", e); } + } public void replayDeleteTransactionState(TransactionState transactionState) { - writeLock(); try { - idToTransactionState.remove(transactionState.getTransactionId()); - Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel()); - txnIds.remove(transactionState.getTransactionId()); - if (txnIds.isEmpty()) { - dbIdToTxnLabels.remove(transactionState.getDbId(), transactionState.getLabel()); - } - } finally { - writeUnlock(); - } - } - - private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) { - Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - long tableId = tableCommitInfo.getTableId(); - OlapTable table = (OlapTable) db.getTable(tableId); - for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { - long partitionId = partitionCommitInfo.getPartitionId(); - Partition partition = table.getPartition(partitionId); - List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL); - for (MaterializedIndex index : allIndices) { - List<Tablet> tablets = index.getTablets(); - for (Tablet tablet : tablets) { - for (Replica replica : tablet.getReplicas()) { - if (errorReplicaIds.contains(replica.getId())) { - // should 
not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally - // should get from transaction state - replica.updateLastFailedVersion(partitionCommitInfo.getVersion(), - partitionCommitInfo.getVersionHash()); - } - } - } - } - partition.setNextVersion(partition.getNextVersion() + 1); - // Although committed version(hash) is not visible to user, - // but they need to be synchronized among Frontends. - // because we use committed version(hash) to create clone task, if the first Master FE - // send clone task with committed version hash X, and than Master changed, the new Master FE - // received the clone task report with version hash X, which not equals to it own committed - // version hash, than the clone task is failed. - partition.setNextVersionHash(Util.generateVersionHash() /* next version hash */, - partitionCommitInfo.getVersionHash() /* committed version hash*/); - } + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId()); + dbTransactionMgr.deleteTransaction(transactionState); + } catch (AnalysisException e) { + LOG.warn("replay delete transaction failed", e); Review comment: Add txn id in log. 
########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -17,69 +17,33 @@ package org.apache.doris.transaction; +import org.apache.commons.lang3.tuple.Pair; Review comment: You can use `org.apache.doris.common.Pair` ########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t */ public void removeExpiredAndTimeoutTxns() { long currentMillis = System.currentTimeMillis(); - - List<Long> timeoutTxns = Lists.newArrayList(); - List<Long> expiredTxns = Lists.newArrayList(); - readLock(); - try { - for (TransactionState transactionState : idToTransactionState.values()) { - if (transactionState.isExpired(currentMillis)) { - // remove the txn which labels are expired - expiredTxns.add(transactionState.getTransactionId()); - } else if (transactionState.isTimeout(currentMillis)) { - // txn is running but timeout, abort it. - timeoutTxns.add(transactionState.getTransactionId()); + for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + dbTransactionMgr.removeExpiredTxns(); + List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis); + // abort timeout txns + for (Long txnId : timeoutTxns) { + try { + dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null); + LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); + } catch (UserException e) { + // abort may be failed. it is acceptable. just print a log + LOG.warn("abort timeout txn {} failed. 
msg: {}", txnId, e.getMessage()); } } - } finally { - readUnlock(); - } - // delete expired txns - for (Long txnId : expiredTxns) { - deleteTransaction(txnId); - LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager"); - } - - // abort timeout txns - for (Long txnId : timeoutTxns) { - try { - abortTransaction(txnId, "timeout by txn manager"); - LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager"); - } catch (UserException e) { - // abort may be failed. it is acceptable. just print a log - LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); - } } } - public TransactionState getTransactionState(long transactionId) { - readLock(); - try { - return idToTransactionState.get(transactionId); - } finally { - readUnlock(); - } + public TransactionState getTransactionState(long dbId, long transactionId) { + DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId); + return dbTransactionMgr.getTransactionState(transactionId); } public void setEditLog(EditLog editLog) { - this.editLog = editLog; this.idGenerator.setEditLog(editLog); } - - private void readLock() { - this.transactionLock.readLock().lock(); - } - - private void readUnlock() { - this.transactionLock.readLock().unlock(); - } - - private void writeLock() { - this.transactionLock.writeLock().lock(); - } - - private void writeUnlock() { - this.transactionLock.writeLock().unlock(); - } - - // for add/update/delete TransactionState - private void unprotectUpsertTransactionState(TransactionState transactionState) { - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE - || transactionState.getSourceType() == LoadJobSourceType.FRONTEND) { - // if this is a prepare txn, and load source type is not FRONTEND - // no need to persist it. if prepare txn lost, the following commit will just be failed. - // user only need to retry this txn. 
- // The FRONTEND type txn is committed and running asynchronously, so we have to persist it. - editLog.logInsertTransactionState(transactionState); - } - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState); - } - - private void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds, - Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends, - Database db) { - // transaction state is modified during check if the transaction could committed - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) { - return; - } - // update transaction state version - transactionState.setCommitTime(System.currentTimeMillis()); - transactionState.setTransactionStatus(TransactionStatus.COMMITTED); - transactionState.setErrorReplicas(errorReplicaIds); - for (long tableId : tableToPartition.keySet()) { - TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); - for (long partitionId : tableToPartition.get(tableId)) { - OlapTable table = (OlapTable) db.getTable(tableId); - Partition partition = table.getPartition(partitionId); - PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, - partition.getNextVersion(), - partition.getNextVersionHash()); - tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); - } - transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); - } - // persist transactionState - unprotectUpsertTransactionState(transactionState); - - // add publish version tasks. set task to null as a placeholder. - // tasks will be created when publishing version. 
- for (long backendId : totalInvolvedBackends) { - transactionState.addPublishVersionTask(backendId, null); - } - } - private boolean unprotectAbortTransaction(long transactionId, String reason) - throws UserException { - TransactionState transactionState = idToTransactionState.get(transactionId); - if (transactionState == null) { - throw new UserException("transaction not found"); - } - if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { - return false; - } - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED - || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - throw new UserException("transaction's state is already " - + transactionState.getTransactionStatus() + ", could not abort"); - } - transactionState.setFinishTime(System.currentTimeMillis()); - transactionState.setReason(reason); - transactionState.setTransactionStatus(TransactionStatus.ABORTED); - unprotectUpsertTransactionState(transactionState); - for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - return true; - } - // for replay idToTransactionState // check point also run transaction cleaner, the cleaner maybe concurrently modify id to public void replayUpsertTransactionState(TransactionState transactionState) { - writeLock(); try { - // set transaction status will call txn state change listener - transactionState.replaySetTransactionStatus(); - Database db = catalog.getDb(transactionState.getDbId()); - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { - LOG.info("replay a committed transaction {}", transactionState); - updateCatalogAfterCommitted(transactionState, db); - } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.info("replay a visible transaction {}", transactionState); - updateCatalogAfterVisible(transactionState, db); 
- } - TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId()); - idToTransactionState.put(transactionState.getTransactionId(), transactionState); - updateTxnLabels(transactionState); - updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(), - transactionState); - } finally { - writeUnlock(); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId()); + dbTransactionMgr.replayUpsertTransactionState(transactionState); + } catch (AnalysisException e) { + LOG.warn("replay upsert transaction failed", e); } + } public void replayDeleteTransactionState(TransactionState transactionState) { - writeLock(); try { - idToTransactionState.remove(transactionState.getTransactionId()); - Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel()); - txnIds.remove(transactionState.getTransactionId()); - if (txnIds.isEmpty()) { - dbIdToTxnLabels.remove(transactionState.getDbId(), transactionState.getLabel()); - } - } finally { - writeUnlock(); - } - } - - private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) { - Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - long tableId = tableCommitInfo.getTableId(); - OlapTable table = (OlapTable) db.getTable(tableId); - for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { - long partitionId = partitionCommitInfo.getPartitionId(); - Partition partition = table.getPartition(partitionId); - List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL); - for (MaterializedIndex index : allIndices) { - List<Tablet> tablets = index.getTablets(); - for (Tablet tablet : tablets) { - for (Replica replica : tablet.getReplicas()) { - if (errorReplicaIds.contains(replica.getId())) { - // should 
not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally - // should get from transaction state - replica.updateLastFailedVersion(partitionCommitInfo.getVersion(), - partitionCommitInfo.getVersionHash()); - } - } - } - } - partition.setNextVersion(partition.getNextVersion() + 1); - // Although committed version(hash) is not visible to user, - // but they need to be synchronized among Frontends. - // because we use committed version(hash) to create clone task, if the first Master FE - // send clone task with committed version hash X, and than Master changed, the new Master FE - // received the clone task report with version hash X, which not equals to it own committed - // version hash, than the clone task is failed. - partition.setNextVersionHash(Util.generateVersionHash() /* next version hash */, - partitionCommitInfo.getVersionHash() /* committed version hash*/); - } + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId()); + dbTransactionMgr.deleteTransaction(transactionState); + } catch (AnalysisException e) { + LOG.warn("replay delete transaction failed", e); } } - - private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) { - Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - long tableId = tableCommitInfo.getTableId(); - OlapTable table = (OlapTable) db.getTable(tableId); - for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { - long partitionId = partitionCommitInfo.getPartitionId(); - long newCommitVersion = partitionCommitInfo.getVersion(); - long newCommitVersionHash = partitionCommitInfo.getVersionHash(); - Partition partition = table.getPartition(partitionId); - List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL); - 
for (MaterializedIndex index : allIndices) { - for (Tablet tablet : index.getTablets()) { - for (Replica replica : tablet.getReplicas()) { - long lastFailedVersion = replica.getLastFailedVersion(); - long lastFailedVersionHash = replica.getLastFailedVersionHash(); - long newVersion = newCommitVersion; - long newVersionHash = newCommitVersionHash; - long lastSucessVersion = replica.getLastSuccessVersion(); - long lastSuccessVersionHash = replica.getLastSuccessVersionHash(); - if (!errorReplicaIds.contains(replica.getId())) { - if (replica.getLastFailedVersion() > 0) { - // if the replica is a failed replica, then not changing version and version hash - newVersion = replica.getVersion(); - newVersionHash = replica.getVersionHash(); - } else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(), - partition.getVisibleVersionHash(), true)) { - // this means the replica has error in the past, but we did not observe it - // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica - // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback - // then we will detect this and set C's last failed version to 10 and last success version to 11 - // this logic has to be replayed in checkpoint thread - lastFailedVersion = partition.getVisibleVersion(); - lastFailedVersionHash = partition.getVisibleVersionHash(); - newVersion = replica.getVersion(); - newVersionHash = replica.getVersionHash(); - } - // success version always move forward - lastSucessVersion = newCommitVersion; - lastSuccessVersionHash = newCommitVersionHash; - } else { - // for example, A,B,C 3 replicas, B,C failed during publish version, then B C will be set abnormal - // all loading will failed, B,C will have to recovery by clone, it is very inefficient and maybe lost data - // Using this method, B,C will publish failed, and fe will publish again, not update their last failed version - // if B is publish successfully in next turn, then B is normal and 
C will be set abnormal so that quorum is maintained - // and loading will go on. - newVersion = replica.getVersion(); - newVersionHash = replica.getVersionHash(); - if (newCommitVersion > lastFailedVersion) { - lastFailedVersion = newCommitVersion; - lastFailedVersionHash = newCommitVersionHash; - } - } - replica.updateVersionInfo(newVersion, newVersionHash, lastFailedVersion, lastFailedVersionHash, lastSucessVersion, lastSuccessVersionHash); - } - } - } // end for indices - long version = partitionCommitInfo.getVersion(); - long versionHash = partitionCommitInfo.getVersionHash(); - partition.updateVisibleVersionAndVersionHash(version, versionHash); - if (LOG.isDebugEnabled()) { - LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]", - transactionState, partition.getId(), version, versionHash); - } - } - } - return true; - } - - private void updateTxnLabels(TransactionState transactionState) { - Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel()); - if (txnIds == null) { - txnIds = Sets.newHashSet(); - dbIdToTxnLabels.put(transactionState.getDbId(), transactionState.getLabel(), txnIds); - } - txnIds.add(transactionState.getTransactionId()); - } - - private void updateDbRunningTxnNum(TransactionStatus preStatus, TransactionState curTxnState) { - Map<Long, Integer> txnNumMap = null; - if (curTxnState.getSourceType() == LoadJobSourceType.ROUTINE_LOAD_TASK) { - txnNumMap = runningRoutineLoadTxnNums; - } else { - txnNumMap = runningTxnNums; - } - - int txnNum = txnNumMap.getOrDefault(curTxnState.getDbId(), 0); - if (preStatus == null - && (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE - || curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) { - ++txnNum; - } else if ((preStatus == TransactionStatus.PREPARE - || preStatus == TransactionStatus.COMMITTED) - && (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE - || 
curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) { - --txnNum; - } - - if (txnNum < 1) { - txnNumMap.remove(curTxnState.getDbId()); - } else { - txnNumMap.put(curTxnState.getDbId(), txnNum); - } - } - public List<List<Comparable>> getDbInfo() { List<List<Comparable>> infos = new ArrayList<List<Comparable>>(); - readLock(); - try { - Set<Long> dbIds = new HashSet<>(); - for (TransactionState transactionState : idToTransactionState.values()) { - dbIds.add(transactionState.getDbId()); - } - for (long dbId : dbIds) { - List<Comparable> info = new ArrayList<Comparable>(); - info.add(dbId); - Database db = Catalog.getInstance().getDb(dbId); - if (db == null) { - continue; - } - info.add(db.getFullName()); - infos.add(info); + List<Long> dbIds = Lists.newArrayList(); + for (Long dbId : dbIdToDatabaseTransactionMgrs.keySet()) { + dbIds.add(dbId); + } + + for (long dbId : dbIds) { + List<Comparable> info = new ArrayList<Comparable>(); + info.add(dbId); + Database db = Catalog.getInstance().getDb(dbId); + if (db == null) { + continue; } - } finally { - readUnlock(); + info.add(db.getFullName()); + infos.add(info); } return infos; } public List<List<String>> getDbTransStateInfo(long dbId) { - List<List<String>> infos = Lists.newArrayList(); - readLock(); try { - infos.add(Lists.newArrayList("running", String.valueOf( - runningTxnNums.getOrDefault(dbId, 0) + runningRoutineLoadTxnNums.getOrDefault(dbId, 0)))); - long finishedNum = idToTransactionState.values().stream().filter( - t -> (t.getDbId() == dbId && t.getTransactionStatus().isFinalStatus())).count(); - infos.add(Lists.newArrayList("finished", String.valueOf(finishedNum))); - } finally { - readUnlock(); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return dbTransactionMgr.getDbTransStateInfo(); + } catch (AnalysisException e) { + LOG.warn("Get db transaction state info failed", e); + return Lists.newArrayList(); } - return infos; } public List<List<String>> 
getDbTransInfo(long dbId, boolean running, int limit) throws AnalysisException { - List<List<String>> infos = new ArrayList<>(); - readLock(); - try { - Database db = Catalog.getInstance().getDb(dbId); - if (db == null) { - throw new AnalysisException("Database[" + dbId + "] does not exist"); - } - - // get transaction order by txn id desc limit 'limit' - idToTransactionState.values().stream() - .filter(t -> (t.getDbId() == dbId && (running != t.getTransactionStatus().isFinalStatus()))) - .sorted(TransactionState.TXN_ID_COMPARATOR) - .limit(limit) - .forEach(t -> { - List<String> info = Lists.newArrayList(); - getTxnStateInfo(t, info); - infos.add(info); - }); - } finally { - readUnlock(); - } - return infos; + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return dbTransactionMgr.getTxnStateInfoList(running, limit); } // get show info of a specified txnId public List<List<String>> getSingleTranInfo(long dbId, long txnId) throws AnalysisException { - List<List<String>> infos = new ArrayList<List<String>>(); - readLock(); - try { - Database db = Catalog.getInstance().getDb(dbId); - if (db == null) { - throw new AnalysisException("Database[" + dbId + "] does not exist"); - } - - TransactionState txnState = idToTransactionState.get(txnId); - if (txnState == null) { - throw new AnalysisException("transaction with id " + txnId + " does not exist"); - } - - if (ConnectContext.get() != null) { - // check auth - Set<Long> tblIds = txnState.getIdToTableCommitInfos().keySet(); - for (Long tblId : tblIds) { - Table tbl = db.getTable(tblId); - if (tbl != null) { - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), db.getFullName(), - tbl.getName(), PrivPredicate.SHOW)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, - "SHOW TRANSACTION", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - tbl.getName()); - } - } - } - } - - List<String> info = 
Lists.newArrayList(); - getTxnStateInfo(txnState, info); - infos.add(info); - } finally { - readUnlock(); - } - return infos; + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return dbTransactionMgr.getSingleTranInfo(dbId, txnId); } - - private void getTxnStateInfo(TransactionState txnState, List<String> info) { - info.add(String.valueOf(txnState.getTransactionId())); - info.add(txnState.getLabel()); - info.add(txnState.getCoordinator().toString()); - info.add(txnState.getTransactionStatus().name()); - info.add(txnState.getSourceType().name()); - info.add(TimeUtils.longToTimeString(txnState.getPrepareTime())); - info.add(TimeUtils.longToTimeString(txnState.getCommitTime())); - info.add(TimeUtils.longToTimeString(txnState.getFinishTime())); - info.add(txnState.getReason()); - info.add(String.valueOf(txnState.getErrorReplicas().size())); - info.add(String.valueOf(txnState.getCallbackId())); - info.add(String.valueOf(txnState.getTimeoutMs())); - } - - public List<List<Comparable>> getTableTransInfo(long txnId) throws AnalysisException { - List<List<Comparable>> tableInfos = new ArrayList<>(); - readLock(); - try { - TransactionState transactionState = idToTransactionState.get(txnId); - if (null == transactionState) { - throw new AnalysisException("Transaction[" + txnId + "] does not exist."); - } - for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) { - List<Comparable> tableInfo = new ArrayList<>(); - tableInfo.add(entry.getKey()); - tableInfo.add(Joiner.on(", ").join(entry.getValue().getIdToPartitionCommitInfo().values().stream().map( - PartitionCommitInfo::getPartitionId).collect(Collectors.toList()))); - tableInfos.add(tableInfo); - } - } finally { - readUnlock(); - } - return tableInfos; + public List<List<Comparable>> getTableTransInfo(long dbId, long txnId) throws AnalysisException { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return 
dbTransactionMgr.getTableTransInfo(txnId); } - public List<List<Comparable>> getPartitionTransInfo(long tid, long tableId) + public List<List<Comparable>> getPartitionTransInfo(long dbId, long tid, long tableId) throws AnalysisException { - List<List<Comparable>> partitionInfos = new ArrayList<List<Comparable>>(); - readLock(); - try { - TransactionState transactionState = idToTransactionState.get(tid); - if (null == transactionState) { - throw new AnalysisException("Transaction[" + tid + "] does not exist."); - } - TableCommitInfo tableCommitInfo = transactionState.getIdToTableCommitInfos().get(tableId); - Map<Long, PartitionCommitInfo> idToPartitionCommitInfo = tableCommitInfo.getIdToPartitionCommitInfo(); - for (Map.Entry<Long, PartitionCommitInfo> entry : idToPartitionCommitInfo.entrySet()) { - List<Comparable> partitionInfo = new ArrayList<Comparable>(); - partitionInfo.add(entry.getKey()); - partitionInfo.add(entry.getValue().getVersion()); - partitionInfo.add(entry.getValue().getVersionHash()); - partitionInfos.add(partitionInfo); - } - } finally { - readUnlock(); - } - return partitionInfos; + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId); + return dbTransactionMgr.getPartitionTransInfo(tid, tableId); } - + + /** + * It is a non thread safe method, only invoked by checkpoint thread without any lock or image dump thread with db lock + */ public int getTransactionNum() { - return this.idToTransactionState.size(); + int txnNum = 0; + for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { + txnNum = txnNum + dbTransactionMgr.getTransactionNum(); Review comment: ```suggestion txnNum += dbTransactionMgr.getTransactionNum(); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org