This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new 184ca99803 [enhancement](stream load pipe) using queryid or load id to identify stream load pipe instead of fragment instance id (#17439) 184ca99803 is described below commit 184ca99803b954bfc755732a1ee3de07a0729683 Author: yiguolei <676222...@qq.com> AuthorDate: Mon Mar 6 11:10:54 2023 +0800 [enhancement](stream load pipe) using queryid or load id to identify stream load pipe instead of fragment instance id (#17439) cherry-pick part of #17362 This PR does not affect load behavior, just add load id to Protobuf Message. So user could upgrade to 2.x from 1.2.3 smoothly. --- .../org/apache/doris/qe/InsertStreamTxnExecutor.java | 16 +++++++++++++--- .../src/main/java/org/apache/doris/qe/StmtExecutor.java | 6 +++--- .../java/org/apache/doris/rpc/BackendServiceProxy.java | 14 +++++++++----- .../org/apache/doris/transaction/TransactionEntry.java | 10 ++++++++++ .../apache/doris/load/sync/canal/CanalSyncDataTest.java | 13 +++++++------ gensrc/proto/internal_service.proto | 3 +++ 6 files changed, 45 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 033d013ecf..cd50fa6feb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -63,9 +63,11 @@ public class InsertStreamTxnExecutor { public void beginTransaction(TStreamLoadPutRequest request) throws UserException, TException, TimeoutException, InterruptedException, ExecutionException { TTxnParams txnConf = txnEntry.getTxnConf(); + // StreamLoadTask's id == request's load_id StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); StreamLoadPlanner planner = new StreamLoadPlanner( txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask); + // Will using load id as query id in fragment TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName()) .needLoadAvailable().needQueryAvailable().build(); @@ -90,6 +92,10 @@ public class InsertStreamTxnExecutor { } } txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id); + this.loadId = request.getLoadId(); + this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder() + .setHi(loadId.getHi()) + .setLo(loadId.getLo()).build()); Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0)); txnConf.setUserIp(backend.getHost()); @@ -117,11 +123,12 @@ public class InsertStreamTxnExecutor { .setHi(txnConf.getFragmentInstanceId().getHi()) .setLo(txnConf.getFragmentInstanceId().getLo()).build(); + Backend backend = txnEntry.getBackend(); TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); try { Future<InternalService.PCommitResult> future = BackendServiceProxy - .getInstance().commit(address, fragmentInstanceId); + .getInstance().commit(address, fragmentInstanceId, this.txnEntry.getpLoadId()); InternalService.PCommitResult result = future.get(5, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { @@ -143,7 +150,7 @@ public class InsertStreamTxnExecutor { TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); try { Future<InternalService.PRollbackResult> future = BackendServiceProxy.getInstance().rollback(address, - fragmentInstanceId); + fragmentInstanceId, this.txnEntry.getpLoadId()); InternalService.PRollbackResult result = future.get(5, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { @@ -169,7 +176,7 @@ public class InsertStreamTxnExecutor { TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); try { Future<InternalService.PSendDataResult> future = BackendServiceProxy.getInstance().sendData( - address, fragmentInstanceId, txnEntry.getDataToSend()); + address, fragmentInstanceId, this.txnEntry.getpLoadId(), txnEntry.getDataToSend()); InternalService.PSendDataResult result = future.get(5, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { @@ -188,6 +195,9 @@ public class InsertStreamTxnExecutor { public void setLoadId(TUniqueId loadId) { this.loadId = loadId; + this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder() + .setHi(loadId.getHi()) + .setLo(loadId.getLo()).build()); } public long getTxnId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 70d3fe0848..2f53b3ccb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -586,7 +586,7 @@ public class StmtExecutor implements ProfileWriter { throw e; } catch (UserException e) { // analysis exception only print message, not print the stack - LOG.warn("execute Exception. {}, {}", context.getQueryIdentifier(), e.getMessage()); + LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e); context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR); } catch (Exception e) { @@ -1253,8 +1253,7 @@ public class StmtExecutor implements ProfileWriter { if (context.getTxnEntry() == null) { context.setTxnEntry(new TransactionEntry()); } - TransactionEntry txnEntry = context.getTxnEntry(); - txnEntry.setTxnConf(txnParams); + context.getTxnEntry().setTxnConf(txnParams); StringBuilder sb = new StringBuilder(); sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") .append(TransactionStatus.PREPARE.name()); @@ -1300,6 +1299,7 @@ public class StmtExecutor implements ProfileWriter { .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}"); context.getState().setOk(0, 0, sb.toString()); } catch (Exception e) { + LOG.warn("Txn commit failed", e); throw new AnalysisException(e.getMessage()); } finally { context.setTxnEntry(null); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index a7c2df6e28..5911558e02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -249,11 +249,13 @@ public class BackendServiceProxy { } public Future<InternalService.PSendDataResult> sendData( - TNetworkAddress address, Types.PUniqueId fragmentInstanceId, List<InternalService.PDataRow> data) + TNetworkAddress address, Types.PUniqueId fragmentInstanceId, + Types.PUniqueId loadId, List<InternalService.PDataRow> data) throws RpcException { final InternalService.PSendDataRequest.Builder pRequest = InternalService.PSendDataRequest.newBuilder(); pRequest.setFragmentInstanceId(fragmentInstanceId); + pRequest.setLoadId(loadId); pRequest.addAllData(data); try { final BackendServiceClient client = getProxy(address); @@ -264,10 +266,11 @@ public class BackendServiceProxy { } } - public Future<InternalService.PRollbackResult> rollback(TNetworkAddress address, Types.PUniqueId fragmentInstanceId) + public Future<InternalService.PRollbackResult> rollback(TNetworkAddress address, + Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId) throws RpcException { final InternalService.PRollbackRequest pRequest = InternalService.PRollbackRequest.newBuilder() - .setFragmentInstanceId(fragmentInstanceId).build(); + .setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build(); try { final BackendServiceClient client = getProxy(address); return client.rollback(pRequest); @@ -277,10 +280,11 @@ public class BackendServiceProxy { } } - public Future<InternalService.PCommitResult> commit(TNetworkAddress address, Types.PUniqueId fragmentInstanceId) + public Future<InternalService.PCommitResult> commit(TNetworkAddress address, + Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId) throws RpcException { final InternalService.PCommitRequest pRequest = InternalService.PCommitRequest.newBuilder() - .setFragmentInstanceId(fragmentInstanceId).build(); + .setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build(); try { final BackendServiceClient client = getProxy(address); return client.commit(pRequest); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index 4db596dc55..7136871579 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -20,6 +20,7 @@ package org.apache.doris.transaction; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TTxnParams; @@ -35,6 +36,7 @@ public class TransactionEntry { private TTxnParams txnConf; private List<InternalService.PDataRow> dataToSend = new ArrayList<>(); private long rowsInTransaction = 0; + private Types.PUniqueId pLoadId; public TransactionEntry() { } @@ -116,4 +118,12 @@ public class TransactionEntry { public void setRowsInTransaction(long rowsInTransaction) { this.rowsInTransaction = rowsInTransaction; } + + public Types.PUniqueId getpLoadId() { + return pLoadId; + } + + public void setpLoadId(Types.PUniqueId pLoadId) { + this.pLoadId = pLoadId; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 430d8d204e..a1f31b0b34 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -263,11 +263,11 @@ public class CanalSyncDataTest { minTimes = 0; result = execFuture; - backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any); + backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any); minTimes = 0; result = commitFuture; - backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, + backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any, (List<InternalService.PDataRow>) any); minTimes = 0; result = sendDataFuture; @@ -336,7 +336,7 @@ public class CanalSyncDataTest { minTimes = 0; result = execFuture; - backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any); + backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any); minTimes = 0; result = abortFuture; @@ -403,15 +403,16 @@ public class CanalSyncDataTest { minTimes = 0; result = execFuture; - backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any); + backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any); minTimes = 0; result = commitFuture; - backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any); + backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any); minTimes = 0; result = abortFuture; - backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List<InternalService.PDataRow>) any); + backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, + (Types.PUniqueId) any, (List<InternalService.PDataRow>) any); minTimes = 0; result = sendDataFuture; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 0396e8944b..95e1887b26 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -351,6 +351,7 @@ message PDataRow { message PSendDataRequest { required PUniqueId fragment_instance_id = 1; repeated PDataRow data = 2; + optional PUniqueId load_id = 3; // load_id == query_id in fragment exec } message PSendDataResult { @@ -359,6 +360,7 @@ message PSendDataResult { message PCommitRequest { required PUniqueId fragment_instance_id = 1; + optional PUniqueId load_id = 2; } message PCommitResult { @@ -367,6 +369,7 @@ message PCommitResult { message PRollbackRequest { required PUniqueId fragment_instance_id = 1; + optional PUniqueId load_id = 2; } message PRollbackResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org