This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 187461e2fd7 [Fix](Export) Export deletes existing files multiple times when the `delete_existing_files` property is specified (#39304)
187461e2fd7 is described below

commit 187461e2fd74db2ba61fe417e195489f9ca4a6ef
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Tue Aug 13 22:26:02 2024 +0800

    [Fix](Export) Export deletes existing files multiple times when the `delete_existing_files` property is specified (#39304)

    bp: #38400

    When the `Export` statement specifies the `delete_existing_files` property,
    each `Outfile` statement generated by the `Export` carries this property.
    This causes every `Outfile` statement to delete the existing files, so only
    the result of the last `Outfile` statement is retained.

    Therefore, we add an RPC method that can delete existing files for the
    `Export` statement, and the `Outfile` statements generated by the `Export`
    no longer carry the `delete_existing_files` property.

    ## Proposed changes

    Issue Number: close #xxx
---
 be/src/service/internal_service.cpp                |  6 +++---
 .../org/apache/doris/common/util/BrokerUtil.java   | 12 ++++++++++-
 .../main/java/org/apache/doris/load/ExportJob.java |  4 +---
 .../main/java/org/apache/doris/load/ExportMgr.java | 10 ++++++++++
 .../doris/load/loadv2/SparkEtlJobHandler.java      |  2 +-
 .../apache/doris/load/loadv2/SparkRepository.java  |  2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 23 +++++-----------------
 .../apache/doris/common/util/BrokerUtilTest.java   |  2 +-
 .../doris/load/loadv2/SparkEtlJobHandlerTest.java  |  2 +-
 gensrc/proto/internal_service.proto                |  3 ---
 10 files changed, 34 insertions(+), 32 deletions(-)
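The core of the change is the one-shot deletion added in ExportMgr.java together with the
new BrokerUtil.deleteDirectoryWithFileSystem() helper. A condensed sketch of the resulting
FE-side flow is shown here; the wrapper method name is illustrative only, while the
identifiers inside it are taken from the hunks that follow:

    // Sketch: delete the export directory once when the Export job is submitted,
    // instead of letting every generated OUTFILE statement delete it.
    // (Wrapper name is hypothetical; inner identifiers follow the ExportMgr.java hunk.)
    private static void deleteExistingFilesOnce(ExportJob job) throws UserException {
        if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) {
            if (job.getBrokerDesc() == null) {
                throw new AnalysisException("Local file system does not support delete existing files");
            }
            String fullPath = job.getExportPath();
            // Remove the parent directory of the export path exactly once, up front.
            BrokerUtil.deleteDirectoryWithFileSystem(
                    fullPath.substring(0, fullPath.lastIndexOf('/') + 1), job.getBrokerDesc());
        }
        // The OUTFILE statements generated by the Export no longer carry
        // delete_existing_files, so later OUTFILEs cannot delete earlier results.
    }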
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 3be1d2eefdf..29d9e9ad363 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -678,7 +678,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
         uint32_t len = request->result_file_sink().size();
         st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
         if (!st.ok()) {
-            LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+            LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
@@ -697,7 +697,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
         bool exists = true;
         st = io::global_local_filesystem()->exists(file_name, &exists);
         if (!st.ok()) {
-            LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+            LOG(WARNING) << "outfile write success filefailed, errmsg = " << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
@@ -705,7 +705,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
             st = Status::InternalError("File already exists: {}", file_name);
         }
         if (!st.ok()) {
-            LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+            LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index d32a04331b5..c5a2803b848 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -107,6 +107,16 @@ public class BrokerUtil {
         }
     }
 
+    public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException {
+        RemoteFileSystem fileSystem = FileSystemFactory.get(
+                brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
+        Status st = fileSystem.deleteDirectory(path);
+        if (!st.ok()) {
+            throw new UserException(brokerDesc.getName() + " delete directory exception. path="
+                    + path + ", err: " + st.getErrMsg());
+        }
+    }
+
     public static String printBroker(String brokerName, TNetworkAddress address) {
         return brokerName + "[" + address.toString() + "]";
     }
@@ -358,7 +368,7 @@ public class BrokerUtil {
      * @param brokerDesc
      * @throws UserException if broker op failed
      */
-    public static void deletePath(String path, BrokerDesc brokerDesc) throws UserException {
+    public static void deletePathWithBroker(String path, BrokerDesc brokerDesc) throws UserException {
         TNetworkAddress address = getAddress(brokerDesc);
         TPaloBrokerService.Client client = borrowClient(address);
         boolean failed = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 4b5f1087b04..303887875eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -634,9 +634,7 @@ public class ExportJob implements Writable {
         if (!maxFileSize.isEmpty()) {
             outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
         }
-        if (!deleteExistingFiles.isEmpty()) {
-            outfileProperties.put(OutFileClause.PROP_DELETE_EXISTING_FILES, deleteExistingFiles);
-        }
+
         outfileProperties.put(OutFileClause.PROP_WITH_BOM, withBom);
 
         // broker properties
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 7c2351fba5a..7439fd89aa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.OrderByPair;
 import org.apache.doris.common.util.TimeUtils;
@@ -103,6 +104,15 @@ public class ExportMgr {
             throw new LabelAlreadyUsedException(job.getLabel());
         }
         unprotectAddJob(job);
+        // delete existing files
+        if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) {
+            if (job.getBrokerDesc() == null) {
+                throw new AnalysisException("Local file system does not support delete existing files");
+            }
+            String fullPath = job.getExportPath();
+            BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
+                    job.getBrokerDesc());
+        }
         job.getTaskExecutors().forEach(executor -> {
             Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
             job.getTaskIdToExecutor().put(taskId, executor);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 75e3a6e1718..69a41bd1283 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -365,7 +365,7 @@ public class SparkEtlJobHandler {
 
     public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
         try {
-            BrokerUtil.deletePath(outputPath, brokerDesc);
+            BrokerUtil.deletePathWithBroker(outputPath, brokerDesc);
             LOG.info("delete path success. path: {}", outputPath);
         } catch (UserException e) {
             LOG.warn("delete path failed. path: {}", outputPath, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
index 4efd2d17279..19b21ff11fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
@@ -166,7 +166,7 @@ public class SparkRepository {
         try {
             String remoteArchivePath = getRemoteArchivePath(currentDppVersion);
             if (isReplace) {
-                BrokerUtil.deletePath(remoteArchivePath, brokerDesc);
+                BrokerUtil.deletePathWithBroker(remoteArchivePath, brokerDesc);
                 currentArchive.libraries.clear();
             }
             String srcFilePath = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f1ccfb0563e..2cdb3313294 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -153,16 +153,12 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTable
 import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
-import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OriginalPlanner;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.Planner;
-import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
@@ -172,7 +168,6 @@ import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
 import org.apache.doris.qe.ConnectContext.ConnectType;
-import org.apache.doris.qe.Coordinator.FragmentExecParams;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
@@ -1892,26 +1887,18 @@ public class StmtExecutor {
         TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions();
 
         // 2. set brokerNetAddress
-        List<PlanFragment> fragments = coord.getFragments();
-        Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = coord.getFragmentExecParamsMap();
-        PlanFragmentId topId = fragments.get(0).getFragmentId();
-        FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
-        DataSink topDataSink = topParams.fragment.getSink();
-        TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
-        if (topDataSink instanceof ResultFileSink
-                && ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
+        StorageType storageType = outFileClause.getBrokerDesc() == null
+                ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
+        if (storageType == StorageType.BROKER) {
             // set the broker address for OUTFILE sink
-            ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
-            FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
-                    .getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
+            String brokerName = outFileClause.getBrokerDesc().getName();
+            FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName);
             sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port)));
         }
 
         // 3. set TResultFileSink properties
         TResultFileSink sink = new TResultFileSink();
         sink.setFileOptions(sinkOptions);
-        StorageType storageType = outFileClause.getBrokerDesc() == null
-                ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
         sink.setStorageBackendType(storageType.toThrift());
 
         // 4. get BE
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
index ceae283a2fd..13bbd66fab1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
@@ -318,7 +318,7 @@ public class BrokerUtilTest {
 
         try {
             BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
-            BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
+            BrokerUtil.deletePathWithBroker("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index 5ecfa2e2d64..d1b6c786441 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -418,7 +418,7 @@ public class SparkEtlJobHandlerTest {
     public void testDeleteEtlOutputPath(@Mocked BrokerUtil brokerUtil) throws UserException {
         new Expectations() {
             {
-                BrokerUtil.deletePath(etlOutputPath, (BrokerDesc) any);
+                BrokerUtil.deletePathWithBroker(etlOutputPath, (BrokerDesc) any);
                 times = 1;
             }
         };
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 9b2f106a1f4..1d68d60aa61 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -688,9 +688,6 @@ message PFetchTableSchemaResult {
 }
 
 message POutfileWriteSuccessRequest {
-    // optional string file_path = 1;
-    // optional string success_file_name = 2;
-    // map<string, string> broker_properties = 4; // only for remote file
     optional bytes result_file_sink = 1;
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org