dataroaring commented on code in PR #17881: URL: https://github.com/apache/doris/pull/17881#discussion_r1177295418
########## gensrc/thrift/FrontendService.thrift: ########## @@ -847,6 +913,10 @@ service FrontendService { TLoadTxnCommitResult loadTxnCommit(1: TLoadTxnCommitRequest request) TLoadTxnRollbackResult loadTxnRollback(1: TLoadTxnRollbackRequest request) + TBeginTxnResult beginTxn(1: TBeginTxnRequest request) Review Comment: Should we delete shouldTxnBegin in the future? beginTxn and loadTxnBegin are similar. ########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -983,6 +998,95 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin return result; } + @Override + public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive txn begin request: {}, backend: {}", request, clientAddr); + + TBeginTxnResult result = new TBeginTxnResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + TBeginTxnResult tmpRes = beginTxnImpl(request, clientAddr); + result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId()); + } catch (DuplicatedRequestException e) { + // this is a duplicate request, just return previous txn id + LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), + e.getTxnId()); + result.setTxnId(e.getTxnId()); + } catch (LabelAlreadyUsedException e) { + status.setStatusCode(TStatusCode.LABEL_ALREADY_EXISTS); + status.addToErrorMsgs(e.getMessage()); + result.setJobStatus(e.getJobStatus()); + } catch (UserException e) { + LOG.warn("failed to begin: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; Review Comment: We can remove this return statement, as in the catch blocks above. 
########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -983,6 +998,95 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin return result; } + @Override + public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive txn begin request: {}, backend: {}", request, clientAddr); + + TBeginTxnResult result = new TBeginTxnResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + TBeginTxnResult tmpRes = beginTxnImpl(request, clientAddr); + result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId()); + } catch (DuplicatedRequestException e) { + // this is a duplicate request, just return previous txn id + LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), + e.getTxnId()); + result.setTxnId(e.getTxnId()); + } catch (LabelAlreadyUsedException e) { + status.setStatusCode(TStatusCode.LABEL_ALREADY_EXISTS); + status.addToErrorMsgs(e.getMessage()); + result.setJobStatus(e.getJobStatus()); + } catch (UserException e) { + LOG.warn("failed to begin: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + return result; + } + + private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String clientIp) throws UserException { + String cluster = request.getCluster(); + if (Strings.isNullOrEmpty(cluster)) { + cluster = SystemInfoService.DEFAULT_CLUSTER; + } + + // step 1: check auth + if (Strings.isNullOrEmpty(request.getToken())) { + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTables(), + request.getUserIp(), 
PrivPredicate.LOAD); + } + + // step 2: check label + if (Strings.isNullOrEmpty(request.getLabel())) { + throw new UserException("empty label in begin request"); + } + + // step 3: check database + Env env = Env.getCurrentEnv(); + String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb()); + Database db = env.getInternalCatalog().getDbNullable(fullDbName); + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } + + // step 4: fetch all tableIds + // lookup tables && convert into tableIdList + List<Long> tableIdList = Lists.newArrayList(); + for (String tblName : request.getTables()) { + String fullTblName = ClusterNamespace.getFullName(cluster, tblName); + Table table = db.getTableOrMetaException(fullTblName, TableType.OLAP); + if (table == null) { + throw new UserException("unknown table, table=" + fullTblName); + } + tableIdList.add(table.getId()); + } + + // step 5: get timeout + long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + + // step 6: begin transaction + long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( + db.getId(), tableIdList, request.getLabel(), request.getRequestId(), + new TxnCoordinator(TxnSourceType.BE, clientIp), + TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); + + // step 7: return result + TBeginTxnResult result = new TBeginTxnResult(); + result.setTxnId(txnId).setDbId(db.getId()); + return result; + } Review Comment: Very similar to loadTxnBegin. 
########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -983,6 +998,95 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin return result; } + @Override + public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive txn begin request: {}, backend: {}", request, clientAddr); Review Comment: The client may not be a backend, so the log message should not label it as one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org