This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 187461e2fd7 [Fix](Export) Export deletes existing files multiple times when
the `delete_existing_files` property is specified (#39304)
187461e2fd7 is described below

commit 187461e2fd74db2ba61fe417e195489f9ca4a6ef
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Tue Aug 13 22:26:02 2024 +0800

    [Fix](Export) Export deletes existing files multiple times when the
    `delete_existing_files` property is specified (#39304)
    
    bp: #38400
    
    When an `Export` statement specifies the `delete_existing_files`
    property, every `Outfile` statement generated by that `Export` also
    carries the property. As a result, each `Outfile` statement deletes the
    existing files in turn, so only the output of the last `Outfile`
    statement is retained.
    
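    To fix this, the existing files are now deleted exactly once for the
    whole `Export` job, when the job is added in `ExportMgr` (via the new
    `BrokerUtil.deleteDirectoryWithFileSystem` method). The `Outfile`
    statements generated by the `Export` no longer carry the
    `delete_existing_files` property.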
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/service/internal_service.cpp                |  6 +++---
 .../org/apache/doris/common/util/BrokerUtil.java   | 12 ++++++++++-
 .../main/java/org/apache/doris/load/ExportJob.java |  4 +---
 .../main/java/org/apache/doris/load/ExportMgr.java | 10 ++++++++++
 .../doris/load/loadv2/SparkEtlJobHandler.java      |  2 +-
 .../apache/doris/load/loadv2/SparkRepository.java  |  2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 23 +++++-----------------
 .../apache/doris/common/util/BrokerUtilTest.java   |  2 +-
 .../doris/load/loadv2/SparkEtlJobHandlerTest.java  |  2 +-
 gensrc/proto/internal_service.proto                |  3 ---
 10 files changed, 34 insertions(+), 32 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 3be1d2eefdf..29d9e9ad363 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -678,7 +678,7 @@ void 
PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
             uint32_t len = request->result_file_sink().size();
             st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
             if (!st.ok()) {
-                LOG(WARNING) << "outfile write success filefailed, errmsg=" << 
st;
+                LOG(WARNING) << "outfile write success file failed, errmsg = " 
<< st;
                 st.to_protobuf(result->mutable_status());
                 return;
             }
@@ -697,7 +697,7 @@ void 
PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
             bool exists = true;
             st = io::global_local_filesystem()->exists(file_name, &exists);
             if (!st.ok()) {
-                LOG(WARNING) << "outfile write success filefailed, errmsg=" << 
st;
+                LOG(WARNING) << "outfile write success filefailed, errmsg = " 
<< st;
                 st.to_protobuf(result->mutable_status());
                 return;
             }
@@ -705,7 +705,7 @@ void 
PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
                 st = Status::InternalError("File already exists: {}", 
file_name);
             }
             if (!st.ok()) {
-                LOG(WARNING) << "outfile write success filefailed, errmsg=" << 
st;
+                LOG(WARNING) << "outfile write success file failed, errmsg = " 
<< st;
                 st.to_protobuf(result->mutable_status());
                 return;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index d32a04331b5..c5a2803b848 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -107,6 +107,16 @@ public class BrokerUtil {
         }
     }
 
+    public static void deleteDirectoryWithFileSystem(String path, BrokerDesc 
brokerDesc) throws UserException {
+        RemoteFileSystem fileSystem = FileSystemFactory.get(
+                brokerDesc.getName(), brokerDesc.getStorageType(), 
brokerDesc.getProperties());
+        Status st = fileSystem.deleteDirectory(path);
+        if (!st.ok()) {
+            throw new UserException(brokerDesc.getName() +  " delete directory 
exception. path="
+                    + path + ", err: " + st.getErrMsg());
+        }
+    }
+
     public static String printBroker(String brokerName, TNetworkAddress 
address) {
         return brokerName + "[" + address.toString() + "]";
     }
@@ -358,7 +368,7 @@ public class BrokerUtil {
      * @param brokerDesc
      * @throws UserException if broker op failed
      */
-    public static void deletePath(String path, BrokerDesc brokerDesc) throws 
UserException {
+    public static void deletePathWithBroker(String path, BrokerDesc 
brokerDesc) throws UserException {
         TNetworkAddress address = getAddress(brokerDesc);
         TPaloBrokerService.Client client = borrowClient(address);
         boolean failed = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 4b5f1087b04..303887875eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -634,9 +634,7 @@ public class ExportJob implements Writable {
         if (!maxFileSize.isEmpty()) {
             outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, 
maxFileSize);
         }
-        if (!deleteExistingFiles.isEmpty()) {
-            outfileProperties.put(OutFileClause.PROP_DELETE_EXISTING_FILES, 
deleteExistingFiles);
-        }
+
         outfileProperties.put(OutFileClause.PROP_WITH_BOM, withBom);
 
         // broker properties
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 7c2351fba5a..7439fd89aa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.OrderByPair;
 import org.apache.doris.common.util.TimeUtils;
@@ -103,6 +104,15 @@ public class ExportMgr {
                 throw new LabelAlreadyUsedException(job.getLabel());
             }
             unprotectAddJob(job);
+            // delete existing files
+            if (Config.enable_delete_existing_files && 
Boolean.parseBoolean(job.getDeleteExistingFiles())) {
+                if (job.getBrokerDesc() == null) {
+                    throw new AnalysisException("Local file system does not 
support delete existing files");
+                }
+                String fullPath = job.getExportPath();
+                BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, 
fullPath.lastIndexOf('/') + 1),
+                        job.getBrokerDesc());
+            }
             job.getTaskExecutors().forEach(executor -> {
                 Long taskId = 
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
                 job.getTaskIdToExecutor().put(taskId, executor);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 75e3a6e1718..69a41bd1283 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -365,7 +365,7 @@ public class SparkEtlJobHandler {
 
     public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
         try {
-            BrokerUtil.deletePath(outputPath, brokerDesc);
+            BrokerUtil.deletePathWithBroker(outputPath, brokerDesc);
             LOG.info("delete path success. path: {}", outputPath);
         } catch (UserException e) {
             LOG.warn("delete path failed. path: {}", outputPath, e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
index 4efd2d17279..19b21ff11fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
@@ -166,7 +166,7 @@ public class SparkRepository {
         try {
             String remoteArchivePath = getRemoteArchivePath(currentDppVersion);
             if (isReplace) {
-                BrokerUtil.deletePath(remoteArchivePath, brokerDesc);
+                BrokerUtil.deletePathWithBroker(remoteArchivePath, brokerDesc);
                 currentArchive.libraries.clear();
             }
             String srcFilePath = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f1ccfb0563e..2cdb3313294 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -153,16 +153,12 @@ import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTable
 import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
-import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OriginalPlanner;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.Planner;
-import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
@@ -172,7 +168,6 @@ import 
org.apache.doris.proto.InternalService.POutfileWriteSuccessResult;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
 import org.apache.doris.qe.ConnectContext.ConnectType;
-import org.apache.doris.qe.Coordinator.FragmentExecParams;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
@@ -1892,26 +1887,18 @@ public class StmtExecutor {
         TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions();
 
         // 2. set brokerNetAddress
-        List<PlanFragment> fragments = coord.getFragments();
-        Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = 
coord.getFragmentExecParamsMap();
-        PlanFragmentId topId = fragments.get(0).getFragmentId();
-        FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
-        DataSink topDataSink = topParams.fragment.getSink();
-        TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
-        if (topDataSink instanceof ResultFileSink
-                && ((ResultFileSink) topDataSink).getStorageType() == 
StorageBackend.StorageType.BROKER) {
+        StorageType storageType = outFileClause.getBrokerDesc() == null
+                ? StorageBackend.StorageType.LOCAL : 
outFileClause.getBrokerDesc().getStorageType();
+        if (storageType == StorageType.BROKER) {
             // set the broker address for OUTFILE sink
-            ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
-            FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
-                    .getBroker(topResultFileSink.getBrokerName(), 
execBeAddr.getHostname());
+            String brokerName = outFileClause.getBrokerDesc().getName();
+            FsBroker broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName);
             sinkOptions.setBrokerAddresses(Lists.newArrayList(new 
TNetworkAddress(broker.host, broker.port)));
         }
 
         // 3. set TResultFileSink properties
         TResultFileSink sink = new TResultFileSink();
         sink.setFileOptions(sinkOptions);
-        StorageType storageType = outFileClause.getBrokerDesc() == null
-                ? StorageBackend.StorageType.LOCAL : 
outFileClause.getBrokerDesc().getStorageType();
         sink.setStorageBackendType(storageType.toThrift());
 
         // 4. get BE
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
index ceae283a2fd..13bbd66fab1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
@@ -318,7 +318,7 @@ public class BrokerUtilTest {
 
         try {
             BrokerDesc brokerDesc = new BrokerDesc("broker0", 
Maps.newHashMap());
-            
BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", 
brokerDesc);
+            
BrokerUtil.deletePathWithBroker("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", 
brokerDesc);
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index 5ecfa2e2d64..d1b6c786441 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -418,7 +418,7 @@ public class SparkEtlJobHandlerTest {
     public void testDeleteEtlOutputPath(@Mocked BrokerUtil brokerUtil) throws 
UserException {
         new Expectations() {
             {
-                BrokerUtil.deletePath(etlOutputPath, (BrokerDesc) any);
+                BrokerUtil.deletePathWithBroker(etlOutputPath, (BrokerDesc) 
any);
                 times = 1;
             }
         };
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 9b2f106a1f4..1d68d60aa61 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -688,9 +688,6 @@ message PFetchTableSchemaResult {
 }
 
 message POutfileWriteSuccessRequest {
-    // optional string file_path = 1;
-    // optional string success_file_name = 2;
-    // map<string, string> broker_properties = 4; // only for remote file
     optional bytes result_file_sink = 1;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to