This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 184ca99803 [enhancement](stream load pipe) using queryid or load id to 
identify stream load pipe instead of fragment instance id (#17439)
184ca99803 is described below

commit 184ca99803b954bfc755732a1ee3de07a0729683
Author: yiguolei <676222...@qq.com>
AuthorDate: Mon Mar 6 11:10:54 2023 +0800

    [enhancement](stream load pipe) using queryid or load id to identify stream 
load pipe instead of fragment instance id (#17439)
    
    cherry-pick part of #17362
    
    This PR does not affect load behavior, just add load id to Protobuf Message.
    So user could upgrade to 2.x from 1.2.3 smoothly.
---
 .../org/apache/doris/qe/InsertStreamTxnExecutor.java     | 16 +++++++++++++---
 .../src/main/java/org/apache/doris/qe/StmtExecutor.java  |  6 +++---
 .../java/org/apache/doris/rpc/BackendServiceProxy.java   | 14 +++++++++-----
 .../org/apache/doris/transaction/TransactionEntry.java   | 10 ++++++++++
 .../apache/doris/load/sync/canal/CanalSyncDataTest.java  | 13 +++++++------
 gensrc/proto/internal_service.proto                      |  3 +++
 6 files changed, 45 insertions(+), 17 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 033d013ecf..cd50fa6feb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -63,9 +63,11 @@ public class InsertStreamTxnExecutor {
     public void beginTransaction(TStreamLoadPutRequest request) throws 
UserException, TException, TimeoutException,
             InterruptedException, ExecutionException {
         TTxnParams txnConf = txnEntry.getTxnConf();
+        // StreamLoadTask's id == request's load_id
         StreamLoadTask streamLoadTask = 
StreamLoadTask.fromTStreamLoadPutRequest(request);
         StreamLoadPlanner planner = new StreamLoadPlanner(
                 txnEntry.getDb(), (OlapTable) txnEntry.getTable(), 
streamLoadTask);
+        // Will using load id as query id in fragment
         TExecPlanFragmentParams tRequest = 
planner.plan(streamLoadTask.getId());
         BeSelectionPolicy policy = new 
BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName())
                 .needLoadAvailable().needQueryAvailable().build();
@@ -90,6 +92,10 @@ public class InsertStreamTxnExecutor {
             }
         }
         txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+        this.loadId = request.getLoadId();
+        this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                .setHi(loadId.getHi())
+                .setLo(loadId.getLo()).build());
 
         Backend backend = 
Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
         txnConf.setUserIp(backend.getHost());
@@ -117,11 +123,12 @@ public class InsertStreamTxnExecutor {
                 .setHi(txnConf.getFragmentInstanceId().getHi())
                 .setLo(txnConf.getFragmentInstanceId().getLo()).build();
 
+
         Backend backend = txnEntry.getBackend();
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
         try {
             Future<InternalService.PCommitResult> future = BackendServiceProxy
-                    .getInstance().commit(address, fragmentInstanceId);
+                    .getInstance().commit(address, fragmentInstanceId, 
this.txnEntry.getpLoadId());
             InternalService.PCommitResult result = future.get(5, 
TimeUnit.SECONDS);
             TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
@@ -143,7 +150,7 @@ public class InsertStreamTxnExecutor {
         TNetworkAddress address = new TNetworkAddress(be.getHost(), 
be.getBrpcPort());
         try {
             Future<InternalService.PRollbackResult> future = 
BackendServiceProxy.getInstance().rollback(address,
-                    fragmentInstanceId);
+                    fragmentInstanceId, this.txnEntry.getpLoadId());
             InternalService.PRollbackResult result = future.get(5, 
TimeUnit.SECONDS);
             TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
@@ -169,7 +176,7 @@ public class InsertStreamTxnExecutor {
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
         try {
             Future<InternalService.PSendDataResult> future = 
BackendServiceProxy.getInstance().sendData(
-                    address, fragmentInstanceId, txnEntry.getDataToSend());
+                    address, fragmentInstanceId, this.txnEntry.getpLoadId(), 
txnEntry.getDataToSend());
             InternalService.PSendDataResult result = future.get(5, 
TimeUnit.SECONDS);
             TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
@@ -188,6 +195,9 @@ public class InsertStreamTxnExecutor {
 
     public void setLoadId(TUniqueId loadId) {
         this.loadId = loadId;
+        this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                .setHi(loadId.getHi())
+                .setLo(loadId.getLo()).build());
     }
 
     public long getTxnId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 70d3fe0848..2f53b3ccb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -586,7 +586,7 @@ public class StmtExecutor implements ProfileWriter {
             throw e;
         } catch (UserException e) {
             // analysis exception only print message, not print the stack
-            LOG.warn("execute Exception. {}, {}", 
context.getQueryIdentifier(), e.getMessage());
+            LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e);
             context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
             context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
         } catch (Exception e) {
@@ -1253,8 +1253,7 @@ public class StmtExecutor implements ProfileWriter {
             if (context.getTxnEntry() == null) {
                 context.setTxnEntry(new TransactionEntry());
             }
-            TransactionEntry txnEntry = context.getTxnEntry();
-            txnEntry.setTxnConf(txnParams);
+            context.getTxnEntry().setTxnConf(txnParams);
             StringBuilder sb = new StringBuilder();
             
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 
'status':'")
                     .append(TransactionStatus.PREPARE.name());
@@ -1300,6 +1299,7 @@ public class StmtExecutor implements ProfileWriter {
                         
.append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
                 context.getState().setOk(0, 0, sb.toString());
             } catch (Exception e) {
+                LOG.warn("Txn commit failed", e);
                 throw new AnalysisException(e.getMessage());
             } finally {
                 context.setTxnEntry(null);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index a7c2df6e28..5911558e02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -249,11 +249,13 @@ public class BackendServiceProxy {
     }
 
     public Future<InternalService.PSendDataResult> sendData(
-            TNetworkAddress address, Types.PUniqueId fragmentInstanceId, 
List<InternalService.PDataRow> data)
+            TNetworkAddress address, Types.PUniqueId fragmentInstanceId,
+            Types.PUniqueId loadId, List<InternalService.PDataRow> data)
             throws RpcException {
 
         final InternalService.PSendDataRequest.Builder pRequest = 
InternalService.PSendDataRequest.newBuilder();
         pRequest.setFragmentInstanceId(fragmentInstanceId);
+        pRequest.setLoadId(loadId);
         pRequest.addAllData(data);
         try {
             final BackendServiceClient client = getProxy(address);
@@ -264,10 +266,11 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<InternalService.PRollbackResult> rollback(TNetworkAddress 
address, Types.PUniqueId fragmentInstanceId)
+    public Future<InternalService.PRollbackResult> rollback(TNetworkAddress 
address,
+            Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId)
             throws RpcException {
         final InternalService.PRollbackRequest pRequest = 
InternalService.PRollbackRequest.newBuilder()
-                .setFragmentInstanceId(fragmentInstanceId).build();
+                
.setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build();
         try {
             final BackendServiceClient client = getProxy(address);
             return client.rollback(pRequest);
@@ -277,10 +280,11 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<InternalService.PCommitResult> commit(TNetworkAddress 
address, Types.PUniqueId fragmentInstanceId)
+    public Future<InternalService.PCommitResult> commit(TNetworkAddress 
address,
+            Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId)
             throws RpcException {
         final InternalService.PCommitRequest pRequest = 
InternalService.PCommitRequest.newBuilder()
-                .setFragmentInstanceId(fragmentInstanceId).build();
+                
.setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build();
         try {
             final BackendServiceClient client = getProxy(address);
             return client.commit(pRequest);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 4db596dc55..7136871579 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -20,6 +20,7 @@ package org.apache.doris.transaction;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TTxnParams;
 
@@ -35,6 +36,7 @@ public class TransactionEntry {
     private TTxnParams txnConf;
     private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
     private long rowsInTransaction = 0;
+    private Types.PUniqueId pLoadId;
 
     public TransactionEntry() {
     }
@@ -116,4 +118,12 @@ public class TransactionEntry {
     public void setRowsInTransaction(long rowsInTransaction) {
         this.rowsInTransaction = rowsInTransaction;
     }
+
+    public Types.PUniqueId getpLoadId() {
+        return pLoadId;
+    }
+
+    public void setpLoadId(Types.PUniqueId pLoadId) {
+        this.pLoadId = pLoadId;
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index 430d8d204e..a1f31b0b34 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -263,11 +263,11 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = execFuture;
 
-                backendServiceProxy.commit((TNetworkAddress) any, 
(Types.PUniqueId) any);
+                backendServiceProxy.commit((TNetworkAddress) any, 
(Types.PUniqueId) any, (Types.PUniqueId) any);
                 minTimes = 0;
                 result = commitFuture;
 
-                backendServiceProxy.sendData((TNetworkAddress) any, 
(Types.PUniqueId) any,
+                backendServiceProxy.sendData((TNetworkAddress) any, 
(Types.PUniqueId) any, (Types.PUniqueId) any,
                         (List<InternalService.PDataRow>) any);
                 minTimes = 0;
                 result = sendDataFuture;
@@ -336,7 +336,7 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = execFuture;
 
-                backendServiceProxy.rollback((TNetworkAddress) any, 
(Types.PUniqueId) any);
+                backendServiceProxy.rollback((TNetworkAddress) any, 
(Types.PUniqueId) any, (Types.PUniqueId) any);
                 minTimes = 0;
                 result = abortFuture;
 
@@ -403,15 +403,16 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = execFuture;
 
-                backendServiceProxy.commit((TNetworkAddress) any, 
(Types.PUniqueId) any);
+                backendServiceProxy.commit((TNetworkAddress) any, 
(Types.PUniqueId) any, (Types.PUniqueId) any);
                 minTimes = 0;
                 result = commitFuture;
 
-                backendServiceProxy.rollback((TNetworkAddress) any, 
(Types.PUniqueId) any);
+                backendServiceProxy.rollback((TNetworkAddress) any, 
(Types.PUniqueId) any, (Types.PUniqueId) any);
                 minTimes = 0;
                 result = abortFuture;
 
-                backendServiceProxy.sendData((TNetworkAddress) any, 
(Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
+                backendServiceProxy.sendData((TNetworkAddress) any, 
(Types.PUniqueId) any,
+                        (Types.PUniqueId) any, 
(List<InternalService.PDataRow>) any);
                 minTimes = 0;
                 result = sendDataFuture;
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 0396e8944b..95e1887b26 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -351,6 +351,7 @@ message PDataRow {
 message PSendDataRequest {
     required PUniqueId fragment_instance_id = 1;
     repeated PDataRow data = 2;
+    optional PUniqueId load_id = 3; // load_id == query_id in fragment exec
 }
 
 message PSendDataResult {
@@ -359,6 +360,7 @@ message PSendDataResult {
 
 message PCommitRequest {
     required PUniqueId fragment_instance_id = 1;
+    optional PUniqueId load_id = 2;
 }
 
 message PCommitResult {
@@ -367,6 +369,7 @@ message PCommitResult {
 
 message PRollbackRequest {
     required PUniqueId fragment_instance_id = 1;
+    optional PUniqueId load_id = 2;
 }
 
 message PRollbackResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to