BePPPower commented on code in PR #18325: URL: https://github.com/apache/doris/pull/18325#discussion_r1165096930
########## fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java: ########## @@ -281,67 +199,78 @@ private void registerProfile() { ProfileManager.getInstance().pushProfile(profile); } - private Status moveTmpFiles() { - FsBroker broker = null; - try { - String localIP = FrontendOptions.getLocalHostAddress(); - broker = Env.getCurrentEnv().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), localIP); - } catch (AnalysisException e) { - String failMsg = "get broker failed. export job: " + job.getId() + ". msg: " + e.getMessage(); - LOG.warn(failMsg); - return new Status(TStatusCode.CANCELLED, failMsg); + private void handlePendingState() { + long dbId = job.getDbId(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist"); + return; } - TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); - TPaloBrokerService.Client client = null; - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e) { - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e1) { - String failMsg = "create connection to broker(" + address + ") failed"; - LOG.warn(failMsg); - return new Status(TStatusCode.CANCELLED, failMsg); - } + + if (job.isReplayed()) { + // If the job is created from replay thread, all plan info will be lost. + // so the job has to be cancelled. + String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled."; + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); + return; + } + + // make snapshots + Status snapshotStatus = makeSnapshots(); + if (!snapshotStatus.ok()) { + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg()); + return; + } + + if (job.updateState(ExportJob.JobState.EXPORTING)) { + LOG.info("Exchange pending status to exporting status success. job: {}", job); + return; } - boolean failed = false; - Set<String> exportedFiles = job.getExportedFiles(); - List<String> newFiles = Lists.newArrayList(); - String exportPath = job.getExportPath(); - for (String exportedFile : exportedFiles) { - // move exportPath/__doris_tmp/file to exportPath/file - String file = exportedFile.substring(exportedFile.lastIndexOf("/") + 1); - String destPath = exportPath + "/" + file; - LOG.debug("rename {} to {}, export job: {}", exportedFile, destPath, job.getId()); - String failMsg = ""; - try { - TBrokerRenamePathRequest request = new TBrokerRenamePathRequest( - TBrokerVersion.VERSION_ONE, exportedFile, destPath, job.getBrokerDesc().getProperties()); - TBrokerOperationStatus tBrokerOperationStatus = null; - tBrokerOperationStatus = client.renamePath(request); - if (tBrokerOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { - failed = true; - failMsg = "Broker renamePath failed. srcPath=" + exportedFile + ", destPath=" + destPath - + ", broker=" + address + ", msg=" + tBrokerOperationStatus.getMessage(); - return new Status(TStatusCode.CANCELLED, failMsg); - } else { - newFiles.add(destPath); + } + + private Status makeSnapshots() { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org