This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 32c4fc6 Support determine isPreviousLoadFinished for some alter jobs in table level (#3196) 32c4fc6 is described below commit 32c4fc691cd79ccf47023ed02160485b1906579b Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Fri Mar 27 07:16:23 2020 -0500 Support determine isPreviousLoadFinished for some alter jobs in table level (#3196) This PR reduces the time spent waiting for transactions in the same db to complete by filtering the running transactions at the table level. NOTICE: Update FE meta version to 79 --- .../main/java/org/apache/doris/alter/AlterJob.java | 3 +- .../java/org/apache/doris/alter/RollupJobV2.java | 2 +- .../org/apache/doris/alter/SchemaChangeJobV2.java | 2 +- .../java/org/apache/doris/analysis/InsertStmt.java | 3 +- .../org/apache/doris/clone/TabletScheduler.java | 2 +- .../org/apache/doris/common/FeMetaVersion.java | 5 ++- fe/src/main/java/org/apache/doris/load/Load.java | 3 +- .../main/java/org/apache/doris/load/LoadJob.java | 6 +++ .../apache/doris/load/loadv2/BrokerLoadJob.java | 3 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 1 - .../org/apache/doris/load/loadv2/LoadManager.java | 4 +- .../org/apache/doris/load/loadv2/MiniLoadJob.java | 8 +++- .../load/routineload/RoutineLoadTaskInfo.java | 3 +- .../apache/doris/service/FrontendServiceImpl.java | 3 +- .../org/apache/doris/task/LoadPendingTask.java | 4 +- .../doris/transaction/GlobalTransactionMgr.java | 51 ++++++++++++++-------- .../apache/doris/transaction/TransactionState.java | 27 +++++++++++- .../org/apache/doris/load/loadv2/LoadJobTest.java | 3 +- .../transaction/GlobalTransactionMgrTest.java | 24 +++++----- 19 files changed, 108 insertions(+), 49 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJob.java b/fe/src/main/java/org/apache/doris/alter/AlterJob.java index d0aaaa4..ab927cc 100644 --- a/fe/src/main/java/org/apache/doris/alter/AlterJob.java +++ 
b/fe/src/main/java/org/apache/doris/alter/AlterJob.java @@ -17,6 +17,7 @@ package org.apache.doris.alter; +import com.google.common.collect.Lists; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; @@ -291,7 +292,7 @@ public abstract class AlterJob implements Writable { return true; } else { isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( - transactionId, dbId); + transactionId, dbId, Lists.newArrayList(tableId)); return isPreviousLoadFinished; } } diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java index 031483c..877d955 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -498,7 +498,7 @@ public class RollupJobV2 extends AlterJobV2 { // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished. protected boolean isPreviousLoadFinished() { - return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId); + return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId)); } public static RollupJobV2 read(DataInput in) throws IOException { diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 96025fd..e18e6f9 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -617,7 +617,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished. 
protected boolean isPreviousLoadFinished() { - return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId); + return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId)); } public static SchemaChangeJobV2 read(DataInput in) throws IOException { diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java index 2351cb1..50b94e2 100644 --- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -292,7 +292,8 @@ public class InsertStmt extends DdlStmt { if (targetTable instanceof OlapTable) { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), - label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond); + Lists.newArrayList(targetTable.getId()), label, "FE: " + FrontendOptions.getLocalHostAddress(), + sourceType, timeoutSecond); } isTransactionBegin = true; } diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 0e3e194..036fe6e 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -880,7 +880,7 @@ public class TabletScheduler extends MasterDaemon { } else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) { long watermarkTxnId = replica.getWatermarkTxnId(); if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId, - tabletCtx.getDbId())) { + tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) { throw new SchedException(Status.SCHEDULE_FAILED, "wait txn before " + watermarkTxnId + " to be finished"); } } diff --git 
a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index ed32274..19b2f63 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -167,6 +167,9 @@ public final class FeMetaVersion { public static final int VERSION_77 = 77; // plugin support public static final int VERSION_78 = 78; + // for transaction state in table level + public static final int VERSION_79 = 79; + // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_78; + public static final int VERSION_CURRENT = VERSION_79; } diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 2990a09..8b5e8b8 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -3326,7 +3326,8 @@ public class Load { } loadDeleteJob.setIdToTabletLoadInfo(idToTabletLoadInfo); loadDeleteJob.setState(JobState.LOADING); - long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), jobLabel, + long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), + Lists.newArrayList(table.getId()), jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); loadDeleteJob.setTransactionId(transactionId); diff --git a/fe/src/main/java/org/apache/doris/load/LoadJob.java b/fe/src/main/java/org/apache/doris/load/LoadJob.java index 450dfdb..ef0e26c 100644 --- a/fe/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/LoadJob.java @@ -78,6 +78,7 @@ public class LoadJob implements Writable { private long id; private long dbId; + private long tableId; private String label; // when this job is a real time load job, the job is 
attach with a transaction private long transactionId = -1; @@ -144,6 +145,7 @@ public class LoadJob implements Writable { DeleteInfo deleteInfo) { this.id = id; this.dbId = dbId; + this.tableId = tableId; this.label = label; this.transactionId = -1; this.timestamp = -1; @@ -243,6 +245,10 @@ public class LoadJob implements Writable { return dbId; } + public long getTableId() { + return tableId; + } + public void setDbId(long dbId) { this.dbId = dbId; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 62c10b5..eb7938e 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -208,7 +208,8 @@ public class BrokerLoadJob extends LoadJob { public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { transactionId = Catalog.getCurrentGlobalTransactionMgr() - .beginTransaction(dbId, label, null, "FE: " + FrontendOptions.getLocalHostAddress(), + .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, + "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id, timeoutSecond); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 8f68309..08efaeb 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -222,7 +222,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements public long getDbId() { return dbId; } - public String getLabel() { return label; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 4490a1b..5104094 100644 --- 
a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Table; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -137,11 +138,12 @@ public class LoadManager implements Writable{ cluster = request.getCluster(); } Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb())); + Table table = database.getTable(request.tbl); checkTable(database, request.getTbl()); LoadJob loadJob = null; writeLock(); try { - loadJob = new MiniLoadJob(database.getId(), request); + loadJob = new MiniLoadJob(database.getId(), table.getId(), request); // call unprotectedExecute before adding load job. so that if job is not started ok, no need to add. // NOTICE(cmy): this order is only for Mini Load, because mini load's unprotectedExecute() only do beginTxn(). // for other kind of load job, execute the job after adding job. 
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 94d4707..68def55 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import com.google.common.collect.Lists; import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -47,14 +48,17 @@ public class MiniLoadJob extends LoadJob { private String tableName; + private long tableId; + // only for log replay public MiniLoadJob() { super(); this.jobType = EtlJobType.MINI; } - public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFoundException { + public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException { super(dbId, request.getLabel()); + this.tableId = tableId; this.jobType = EtlJobType.MINI; this.tableName = request.getTbl(); if (request.isSetTimeout_second()) { @@ -93,7 +97,7 @@ public class MiniLoadJob extends LoadJob { public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { transactionId = Catalog.getCurrentGlobalTransactionMgr() - .beginTransaction(dbId, label, requestId, "FE: " + FrontendOptions.getLocalHostAddress(), + .beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId, "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.BACKEND_STREAMING, id, timeoutSecond); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 3af9fb4..3a7af6d 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -164,7 +164,8 @@ public abstract class RoutineLoadTaskInfo { RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); try { txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( - routineLoadJob.getDbId(), DebugUtil.printId(id), null, "FE: " + FrontendOptions.getLocalHostAddress(), + routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null, + "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(), timeoutMs / 1000); } catch (DuplicatedRequestException e) { diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9a6075f..78b2138 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -670,6 +670,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { Catalog catalog = Catalog.getInstance(); String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb()); Database db = catalog.getDb(fullDbName); + Table table = db.getTable(request.tbl); if (db == null) { String dbName = fullDbName; if (Strings.isNullOrEmpty(request.getCluster())) { @@ -681,7 +682,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // begin long timeoutSecond = request.isSetTimeout() ? 
request.getTimeout() : Config.stream_load_default_timeout_second; return Catalog.getCurrentGlobalTransactionMgr().beginTransaction( - db.getId(), request.getLabel(), request.getRequest_id(), "BE: " + clientIp, + db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequest_id(), "BE: " + clientIp, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); } diff --git a/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java b/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java index ef13da8..b9fcc87 100644 --- a/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java @@ -17,6 +17,7 @@ package org.apache.doris.task; +import com.google.common.collect.Lists; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.util.DebugUtil; @@ -67,6 +68,7 @@ public abstract class LoadPendingTask extends MasterTask { // get db long dbId = job.getDbId(); + long tableId = job.getTableId(); db = Catalog.getInstance().getDb(dbId); if (db == null) { load.cancelLoadJob(job, CancelType.ETL_SUBMIT_FAIL, "db does not exist. 
id: " + dbId); @@ -78,7 +80,7 @@ public abstract class LoadPendingTask extends MasterTask { // create etl request and make some guarantee for schema change and rollup if (job.getTransactionId() < 0) { long transactionId = Catalog.getCurrentGlobalTransactionMgr() - .beginTransaction(dbId, DebugUtil.printId(UUID.randomUUID()), + .beginTransaction(dbId, Lists.newArrayList(tableId), DebugUtil.printId(UUID.randomUUID()), "FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND, job.getTimeoutSecond()); job.setTransactionId(transactionId); diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 2279ffc..e201f17 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -17,6 +17,7 @@ package org.apache.doris.transaction; +import org.apache.commons.collections.CollectionUtils; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; @@ -121,10 +122,10 @@ public class GlobalTransactionMgr implements Writable { return callbackFactory; } - public long beginTransaction(long dbId, String label, String coordinator, LoadJobSourceType sourceType, + public long beginTransaction(long dbId, List<Long> tableIdList, String label, String coordinator, LoadJobSourceType sourceType, long timeoutSecond) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException { - return beginTransaction(dbId, label, null, coordinator, sourceType, -1, timeoutSecond); + return beginTransaction(dbId, tableIdList, label, null, coordinator, sourceType, -1, timeoutSecond); } /** @@ -140,7 +141,7 @@ public class GlobalTransactionMgr implements Writable { * @throws DuplicatedRequestException * @throws IllegalTransactionParameterException */ - public long 
beginTransaction(long dbId, String label, TUniqueId requestId, + public long beginTransaction(long dbId, List<Long> tableIdList, String label, TUniqueId requestId, String coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException { @@ -196,7 +197,7 @@ public class GlobalTransactionMgr implements Writable { long tid = idGenerator.getNextTransactionId(); LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator); - TransactionState transactionState = new TransactionState(dbId, tid, label, requestId, sourceType, + TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000); transactionState.setPrepareTime(System.currentTimeMillis()); unprotectUpsertTransactionState(transactionState); @@ -802,25 +803,37 @@ public class GlobalTransactionMgr implements Writable { } // check if there exists a load job before the endTransactionId have all finished - // load job maybe started but could not know the affected table id, so that we not check by table - public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId) { - readLock(); - try { - for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) { - if (entry.getValue().getDbId() != dbId || !entry.getValue().isRunning()) { - continue; - } - if (entry.getKey() <= endTransactionId) { - LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}", - entry.getKey(), dbId, endTransactionId); - return false; - } + public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List<Long> tableIdList) { + for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) { + if (entry.getValue().getDbId() != dbId || !isIntersectionNotEmpty(entry.getValue().getTableIdList(), + 
tableIdList) || !entry.getValue().isRunning()) { + continue; + } + if (entry.getKey() <= endTransactionId) { + LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}", + entry.getKey(), dbId, endTransactionId); + return false; } - } finally { - readUnlock(); } return true; } + + // check if there exists a intersection between the source tableId list and target tableId list + // if one of them is null or empty, that means that we don't know related tables in tableList, + // we think the two lists may have intersection for right ordered txns + public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> targetTableIdList) { + if (CollectionUtils.isEmpty(sourceTableIdList) || CollectionUtils.isEmpty(targetTableIdList)) { + return true; + } + for (int i = 0; i < sourceTableIdList.size(); i++) { + for (int j = 0; j < targetTableIdList.size(); j++) { + if (sourceTableIdList.get(i).equals(targetTableIdList.get(j))) { + return true; + } + } + } + return false; + } /* * The txn cleaner will run at a fixed interval and try to delete expired and timeout txns: diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 0a967ac..e261f7f 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -17,6 +17,8 @@ package org.apache.doris.transaction; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.Config; @@ -40,6 +42,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -126,6 +129,7 @@ public class TransactionState 
implements Writable { } private long dbId; + private List<Long> tableIdList; private long transactionId; private String label; // requsetId is used to judge whether a begin request is a internal retry request. @@ -168,6 +172,7 @@ public class TransactionState implements Writable { public TransactionState() { this.dbId = -1; + this.tableIdList = Lists.newArrayList(); this.transactionId = -1; this.label = ""; this.idToTableCommitInfos = Maps.newHashMap(); @@ -184,9 +189,10 @@ public class TransactionState implements Writable { this.latch = new CountDownLatch(1); } - public TransactionState(long dbId, long transactionId, String label, TUniqueId requsetId, + public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requsetId, LoadJobSourceType sourceType, String coordinator, long callbackId, long timeoutMs) { this.dbId = dbId; + this.tableIdList = (tableIdList == null ? Lists.newArrayList() : tableIdList); this.transactionId = transactionId; this.label = label; this.requsetId = requsetId; @@ -408,7 +414,11 @@ public class TransactionState implements Writable { public long getDbId() { return dbId; } - + + public List<Long> getTableIdList() { + return tableIdList; + } + public Map<Long, TableCommitInfo> getIdToTableCommitInfos() { return idToTableCommitInfos; } @@ -467,6 +477,7 @@ public class TransactionState implements Writable { sb.append("transaction id: ").append(transactionId); sb.append(", label: ").append(label); sb.append(", db id: ").append(dbId); + sb.append(", table id list: ").append(StringUtils.join(tableIdList, ",")); sb.append(", callback id: ").append(callbackId); sb.append(", coordinator: ").append(coordinator); sb.append(", transaction status: ").append(transactionStatus); @@ -533,6 +544,10 @@ public class TransactionState implements Writable { } out.writeLong(callbackId); out.writeLong(timeoutMs); + out.writeInt(tableIdList.size()); + for (int i = 0; i < tableIdList.size(); i++) { + 
out.writeLong(tableIdList.get(i)); + } } public void readFields(DataInput in) throws IOException { @@ -564,5 +579,13 @@ public class TransactionState implements Writable { callbackId = in.readLong(); timeoutMs = in.readLong(); } + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_79) { + tableIdList = Lists.newArrayList(); + int tableListSize = in.readInt(); + for (int i = 0; i < tableListSize; i++) { + tableIdList.add(in.readLong()); + } + } } } diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java index ec4533f..2feecfc 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; +import com.google.common.collect.Lists; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; @@ -106,7 +107,7 @@ public class LoadJobTest { LoadJob loadJob = new BrokerLoadJob(); new Expectations() { { - globalTransactionMgr.beginTransaction(anyLong, anyString, (TUniqueId) any, anyString, + globalTransactionMgr.beginTransaction(anyLong, Lists.newArrayList(), anyString, (TUniqueId) any, anyString, (TransactionState.LoadJobSourceType) any, anyLong, anyLong); minTimes = 0; result = 1; diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index e3a5f3f..860553c 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -106,7 +106,7 @@ public class GlobalTransactionMgrTest { public void testBeginTransaction() throws LabelAlreadyUsedException, AnalysisException, BeginTransactionException, DuplicatedRequestException { 
FakeCatalog.setCatalog(masterCatalog); - long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -124,7 +124,7 @@ public class GlobalTransactionMgrTest { FakeCatalog.setCatalog(masterCatalog); long transactionId = 0; try { - transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -141,7 +141,7 @@ public class GlobalTransactionMgrTest { assertEquals(transactionSource, transactionState.getCoordinator()); try { - transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -154,7 +154,7 @@ public class GlobalTransactionMgrTest { @Test public void testCommitTransaction1() throws UserException { FakeCatalog.setCatalog(masterCatalog); - long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -195,7 +195,7 @@ public class GlobalTransactionMgrTest { public void testCommitTransactionWithOneFailed() throws UserException { TransactionState transactionState = null; 
FakeCatalog.setCatalog(masterCatalog); - long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -217,7 +217,7 @@ public class GlobalTransactionMgrTest { FakeCatalog.setCatalog(masterCatalog); // commit another transaction with 1,3 success - long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable2, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -320,7 +320,7 @@ public class GlobalTransactionMgrTest { partitionIdToOffset); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); - TransactionState transactionState = new TransactionState(1L, 1L, "label", null, + TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); @@ -387,7 +387,7 @@ public class GlobalTransactionMgrTest { partitionIdToOffset); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); - TransactionState transactionState = new TransactionState(1L, 1L, "label", null, + TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); @@ -431,7 +431,7 @@ 
public class GlobalTransactionMgrTest { } public void testFinishTransaction() throws UserException { - long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -477,7 +477,7 @@ public class GlobalTransactionMgrTest { .getPartition(CatalogTestUtil.testPartition1); Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1); FakeCatalog.setCatalog(masterCatalog); - long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable1, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -531,7 +531,7 @@ public class GlobalTransactionMgrTest { FakeCatalog.setCatalog(masterCatalog); // commit another transaction with 1,3 success - long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable2, transactionSource, LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); @@ -603,7 +603,7 @@ public class GlobalTransactionMgrTest { public void testDeleteTransaction() throws LabelAlreadyUsedException, AnalysisException, BeginTransactionException, DuplicatedRequestException { - long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, + long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLable1, transactionSource, 
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org