dataroaring commented on code in PR #17881:
URL: https://github.com/apache/doris/pull/17881#discussion_r1177295418


##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -847,6 +913,10 @@ service FrontendService {
     TLoadTxnCommitResult loadTxnCommit(1: TLoadTxnCommitRequest request)
     TLoadTxnRollbackResult loadTxnRollback(1: TLoadTxnRollbackRequest request)
 
+    TBeginTxnResult beginTxn(1: TBeginTxnRequest request)

Review Comment:
   Should we delete shouldTxnBegin in the future? beginTxn and loadTxnBegin are 
similare.



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -983,6 +998,95 @@ private TLoadTxnBeginResult 
loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin
         return result;
     }
 
+    @Override
+    public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws 
TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.debug("receive txn begin request: {}, backend: {}", request, 
clientAddr);
+
+        TBeginTxnResult result = new TBeginTxnResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            TBeginTxnResult tmpRes = beginTxnImpl(request, clientAddr);
+            result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
+        } catch (DuplicatedRequestException e) {
+            // this is a duplicate request, just return previous txn id
+            LOG.warn("duplicate request for stream load. request id: {}, txn: 
{}", e.getDuplicatedRequestId(),
+                    e.getTxnId());
+            result.setTxnId(e.getTxnId());
+        } catch (LabelAlreadyUsedException e) {
+            status.setStatusCode(TStatusCode.LABEL_ALREADY_EXISTS);
+            status.addToErrorMsgs(e.getMessage());
+            result.setJobStatus(e.getJobStatus());
+        } catch (UserException e) {
+            LOG.warn("failed to begin: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;

Review Comment:
   We can remove this return stmt as above.



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -983,6 +998,95 @@ private TLoadTxnBeginResult 
loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin
         return result;
     }
 
+    @Override
+    public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws 
TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.debug("receive txn begin request: {}, backend: {}", request, 
clientAddr);
+
+        TBeginTxnResult result = new TBeginTxnResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            TBeginTxnResult tmpRes = beginTxnImpl(request, clientAddr);
+            result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
+        } catch (DuplicatedRequestException e) {
+            // this is a duplicate request, just return previous txn id
+            LOG.warn("duplicate request for stream load. request id: {}, txn: 
{}", e.getDuplicatedRequestId(),
+                    e.getTxnId());
+            result.setTxnId(e.getTxnId());
+        } catch (LabelAlreadyUsedException e) {
+            status.setStatusCode(TStatusCode.LABEL_ALREADY_EXISTS);
+            status.addToErrorMsgs(e.getMessage());
+            result.setJobStatus(e.getJobStatus());
+        } catch (UserException e) {
+            LOG.warn("failed to begin: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+        return result;
+    }
+
+    private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String 
clientIp) throws UserException {
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+
+        // step 1: check auth
+        if (Strings.isNullOrEmpty(request.getToken())) {
+            checkPasswordAndPrivs(cluster, request.getUser(), 
request.getPasswd(), request.getDb(), request.getTables(),
+                    request.getUserIp(), PrivPredicate.LOAD);
+        }
+
+        // step 2: check label
+        if (Strings.isNullOrEmpty(request.getLabel())) {
+            throw new UserException("empty label in begin request");
+        }
+
+        // step 3: check database
+        Env env = Env.getCurrentEnv();
+        String fullDbName = ClusterNamespace.getFullName(cluster, 
request.getDb());
+        Database db = env.getInternalCatalog().getDbNullable(fullDbName);
+        if (db == null) {
+            String dbName = fullDbName;
+            if (Strings.isNullOrEmpty(request.getCluster())) {
+                dbName = request.getDb();
+            }
+            throw new UserException("unknown database, database=" + dbName);
+        }
+
+        // step 4: fetch all tableIds
+        // lookup tables && convert into tableIdList
+        List<Long> tableIdList = Lists.newArrayList();
+        for (String tblName : request.getTables()) {
+            String fullTblName = ClusterNamespace.getFullName(cluster, 
tblName);
+            Table table = db.getTableOrMetaException(fullTblName, 
TableType.OLAP);
+            if (table == null) {
+                throw new UserException("unknown table, table=" + fullTblName);
+            }
+            tableIdList.add(table.getId());
+        }
+
+        // step 5: get timeout
+        long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : 
Config.stream_load_default_timeout_second;
+
+        // step 6: begin transaction
+        long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
+                db.getId(), tableIdList, request.getLabel(), 
request.getRequestId(),
+                new TxnCoordinator(TxnSourceType.BE, clientIp),
+                TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, 
timeoutSecond);
+
+        // step 7: return result
+        TBeginTxnResult result = new TBeginTxnResult();
+        result.setTxnId(txnId).setDbId(db.getId());
+        return result;
+    }

Review Comment:
   Very similar to loadTxnBegin.



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -983,6 +998,95 @@ private TLoadTxnBeginResult 
loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin
         return result;
     }
 
+    @Override
+    public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws 
TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.debug("receive txn begin request: {}, backend: {}", request, 
clientAddr);

Review Comment:
   maybe client is not a backend.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to