This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 841f9cd  [Bug][SparkLoad] Divide the upload in spark repository into two steps (#4195)
841f9cd is described below

commit 841f9cd07b3a85e77c8d29b756900d66cb58f368
Author: xy720 <22125576+xy...@users.noreply.github.com>
AuthorDate: Tue Jul 28 16:24:07 2020 +0800

    [Bug][SparkLoad] Divide the upload in spark repository into two steps (#4195)
    
    When the FE uploads the Spark archive, the broker may fail to write the
    file, which results in a bad file being left in the repository.
    
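    Therefore, to prevent Spark from reading bad files, the upload is divided
    into two steps. The first step uploads the file under a name that does not
    contain the MD5 value; the second step renames the file to the name that
    contains its MD5 value. Library files whose name carries no MD5 value are
    skipped when the repository contents are read.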
---
 .../org/apache/doris/common/util/BrokerUtil.java   | 25 +++++++++++++++++
 .../apache/doris/load/loadv2/SparkRepository.java  | 31 +++++++++++++++++++---
 .../doris/load/loadv2/SparkRepositoryTest.java     | 10 +++++--
 3 files changed, 60 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 75c82f4..9e62c1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -45,6 +45,7 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
 import org.apache.doris.thrift.TBrokerPReadRequest;
 import org.apache.doris.thrift.TBrokerPWriteRequest;
 import org.apache.doris.thrift.TBrokerReadResponse;
+import org.apache.doris.thrift.TBrokerRenamePathRequest;
 import org.apache.doris.thrift.TBrokerVersion;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloBrokerService;
@@ -374,6 +375,30 @@ public class BrokerUtil {
         }
     }
 
+    public static void rename(String origFilePath, String destFilePath, 
BrokerDesc brokerDesc) throws UserException {
+        Pair<TPaloBrokerService.Client, TNetworkAddress> pair = 
getBrokerAddressAndClient(brokerDesc);
+        TPaloBrokerService.Client client = pair.first;
+        TNetworkAddress address = pair.second;
+        boolean failed = true;
+        try {
+            TBrokerRenamePathRequest req = new 
TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE, origFilePath,
+                    destFilePath, brokerDesc.getProperties());
+            TBrokerOperationStatus rep = client.renamePath(req);
+            if (rep.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("failed to rename " + origFilePath + " 
to " + destFilePath +
+                        ", msg: " + rep.getMessage() + ", broker: " + address);
+            }
+            failed = false;
+        } catch (TException e) {
+            LOG.warn("Broker rename file failed, origin path={}, dest path={}, 
address={}, exception={}",
+                    origFilePath, destFilePath, address, e);
+            throw new UserException("Broker rename file exception. origin 
path=" + origFilePath +
+                    ", dest path=" + destFilePath + ", broker=" + address);
+        } finally {
+            returnClient(client, address, failed);
+        }
+    }
+
     public static Pair<TPaloBrokerService.Client, TNetworkAddress> 
getBrokerAddressAndClient(BrokerDesc brokerDesc) throws UserException {
         Pair<TPaloBrokerService.Client, TNetworkAddress> pair = new 
Pair<TPaloBrokerService.Client, TNetworkAddress>(null, null);
         TNetworkAddress address = getAddress(brokerDesc);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
index 1062092..9073c15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
@@ -172,24 +172,34 @@ public class SparkRepository {
             String srcFilePath = null;
             // upload dpp
             {
+                // 1. upload dpp
                 srcFilePath = localDppPath;
+                String fileName = getFileName(PATH_DELIMITER, srcFilePath);
+                String origFilePath = remoteArchivePath + PATH_DELIMITER +
+                        assemblyFileName(PREFIX_LIB, "", fileName, "");
+                upload(srcFilePath, origFilePath);
+                // 2. rename dpp
                 String md5sum = getMd5String(srcFilePath);
                 long size = getFileSize(srcFilePath);
-                String fileName = getFileName(PATH_DELIMITER, srcFilePath);
                 String destFilePath = remoteArchivePath + PATH_DELIMITER +
                         assemblyFileName(PREFIX_LIB, md5sum, fileName, "");
-                upload(srcFilePath, destFilePath);
+                rename(origFilePath, destFilePath);
                 currentArchive.libraries.add(new SparkLibrary(destFilePath, 
md5sum, SparkLibrary.LibType.DPP, size));
             }
             // upload spark2x
             {
+                // 1. upload spark2x
                 srcFilePath = localSpark2xPath;
+                String fileName = getFileName(PATH_DELIMITER, srcFilePath);
+                String origFilePath = remoteArchivePath + PATH_DELIMITER +
+                        assemblyFileName(PREFIX_LIB, "", fileName, "");
+                upload(srcFilePath, origFilePath);
+                // 2. rename spark2x
                 String md5sum = getMd5String(srcFilePath);
                 long size = getFileSize(srcFilePath);
-                String fileName = getFileName(PATH_DELIMITER, srcFilePath);
                 String destFilePath = remoteArchivePath + PATH_DELIMITER +
                         assemblyFileName(PREFIX_LIB, md5sum, fileName, "");
-                upload(srcFilePath, destFilePath);
+                rename(origFilePath, destFilePath);
                 currentArchive.libraries.add(new SparkLibrary(destFilePath, 
md5sum, SparkLibrary.LibType.SPARK2X, size));
             }
             LOG.info("finished to upload archive to repository, 
currentDppVersion={}, path={}",
@@ -217,6 +227,9 @@ public class SparkRepository {
                 continue;
             }
             String md5sum = lib_arg[0];
+            if (Strings.isNullOrEmpty(md5sum)) {
+                continue;
+            }
             String type = lib_arg[1];
             SparkLibrary.LibType libType = null;
             switch (type) {
@@ -271,6 +284,16 @@ public class SparkRepository {
         }
     }
 
+    private void rename(String origFilePath, String destFilePath) throws 
LoadException {
+        try {
+            BrokerUtil.rename(origFilePath, destFilePath, brokerDesc);
+            LOG.info("finished to rename file, originPath={}, destPath={}", 
origFilePath, destFilePath);
+        } catch (UserException e) {
+            throw new LoadException("failed to rename file from " + 
origFilePath + " to " + destFilePath +
+                    ", message=" + e.getMessage());
+        }
+    }
+
     public SparkArchive getCurrentArchive() {
         return currentArchive;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java
index 70e5cf9..2bba7c7 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java
@@ -119,7 +119,10 @@ public class SparkRepositoryTest {
                     throws UserException { return false; }
             @Mock
             void writeFile(String srcFilePath, String destFilePath, BrokerDesc 
brokerDesc)
-                    throws UserException { return;}
+                    throws UserException { return; }
+            @Mock
+            void rename(String origFilePath, String destFilePath, BrokerDesc 
brokerDesc)
+                    throws UserException { return; }
         };
 
         BrokerDesc brokerDesc = new BrokerDesc("broker", Maps.newHashMap());
@@ -179,7 +182,10 @@ public class SparkRepositoryTest {
                     throws UserException { return; }
             @Mock
             void writeFile(String srcFilePath, String destFilePath, BrokerDesc 
brokerDesc)
-                    throws UserException { return;}
+                    throws UserException { return; }
+            @Mock
+            void rename(String origFilePath, String destFilePath, BrokerDesc 
brokerDesc)
+                    throws UserException { return; }
         };
 
         // new md5dum of local library


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to