This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 841f9cd [Bug][SparkLoad] Divide the upload in spark repository into two steps (#4195) 841f9cd is described below commit 841f9cd07b3a85e77c8d29b756900d66cb58f368 Author: xy720 <22125576+xy...@users.noreply.github.com> AuthorDate: Tue Jul 28 16:24:07 2020 +0800 [Bug][SparkLoad] Divide the upload in spark repository into two steps (#4195) When the FE uploads the Spark archive, the broker may fail to write the file, which can leave a corrupt file in the repository. To prevent Spark from reading corrupt files, the upload is therefore divided into two steps. First, the file is uploaded under a name that does not contain an MD5 value. Second, the file is renamed to its final name, which includes its MD5 value. Library entries whose names lack an MD5 value are skipped when the repository is parsed. --- .../org/apache/doris/common/util/BrokerUtil.java | 25 +++++++++++++++++ .../apache/doris/load/loadv2/SparkRepository.java | 31 +++++++++++++++++++--- .../doris/load/loadv2/SparkRepositoryTest.java | 10 +++++-- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 75c82f4..9e62c1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -45,6 +45,7 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerPReadRequest; import org.apache.doris.thrift.TBrokerPWriteRequest; import org.apache.doris.thrift.TBrokerReadResponse; +import org.apache.doris.thrift.TBrokerRenamePathRequest; import org.apache.doris.thrift.TBrokerVersion; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; @@ -374,6 +375,30 @@ public class BrokerUtil { } } + public static void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) throws UserException { + Pair<TPaloBrokerService.Client, 
TNetworkAddress> pair = getBrokerAddressAndClient(brokerDesc); + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + boolean failed = true; + try { + TBrokerRenamePathRequest req = new TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE, origFilePath, + destFilePath, brokerDesc.getProperties()); + TBrokerOperationStatus rep = client.renamePath(req); + if (rep.getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("failed to rename " + origFilePath + " to " + destFilePath + + ", msg: " + rep.getMessage() + ", broker: " + address); + } + failed = false; + } catch (TException e) { + LOG.warn("Broker rename file failed, origin path={}, dest path={}, address={}, exception={}", + origFilePath, destFilePath, address, e); + throw new UserException("Broker rename file exception. origin path=" + origFilePath + + ", dest path=" + destFilePath + ", broker=" + address); + } finally { + returnClient(client, address, failed); + } + } + public static Pair<TPaloBrokerService.Client, TNetworkAddress> getBrokerAddressAndClient(BrokerDesc brokerDesc) throws UserException { Pair<TPaloBrokerService.Client, TNetworkAddress> pair = new Pair<TPaloBrokerService.Client, TNetworkAddress>(null, null); TNetworkAddress address = getAddress(brokerDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java index 1062092..9073c15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java @@ -172,24 +172,34 @@ public class SparkRepository { String srcFilePath = null; // upload dpp { + // 1. 
upload dpp srcFilePath = localDppPath; + String fileName = getFileName(PATH_DELIMITER, srcFilePath); + String origFilePath = remoteArchivePath + PATH_DELIMITER + + assemblyFileName(PREFIX_LIB, "", fileName, ""); + upload(srcFilePath, origFilePath); + // 2. rename dpp String md5sum = getMd5String(srcFilePath); long size = getFileSize(srcFilePath); - String fileName = getFileName(PATH_DELIMITER, srcFilePath); String destFilePath = remoteArchivePath + PATH_DELIMITER + assemblyFileName(PREFIX_LIB, md5sum, fileName, ""); - upload(srcFilePath, destFilePath); + rename(origFilePath, destFilePath); currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.DPP, size)); } // upload spark2x { + // 1. upload spark2x srcFilePath = localSpark2xPath; + String fileName = getFileName(PATH_DELIMITER, srcFilePath); + String origFilePath = remoteArchivePath + PATH_DELIMITER + + assemblyFileName(PREFIX_LIB, "", fileName, ""); + upload(srcFilePath, origFilePath); + // 2. rename spark2x String md5sum = getMd5String(srcFilePath); long size = getFileSize(srcFilePath); - String fileName = getFileName(PATH_DELIMITER, srcFilePath); String destFilePath = remoteArchivePath + PATH_DELIMITER + assemblyFileName(PREFIX_LIB, md5sum, fileName, ""); - upload(srcFilePath, destFilePath); + rename(origFilePath, destFilePath); currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.SPARK2X, size)); } LOG.info("finished to upload archive to repository, currentDppVersion={}, path={}", @@ -217,6 +227,9 @@ public class SparkRepository { continue; } String md5sum = lib_arg[0]; + if (Strings.isNullOrEmpty(md5sum)) { + continue; + } String type = lib_arg[1]; SparkLibrary.LibType libType = null; switch (type) { @@ -271,6 +284,16 @@ public class SparkRepository { } } + private void rename(String origFilePath, String destFilePath) throws LoadException { + try { + BrokerUtil.rename(origFilePath, destFilePath, brokerDesc); + LOG.info("finished to rename 
file, originPath={}, destPath={}", origFilePath, destFilePath); + } catch (UserException e) { + throw new LoadException("failed to rename file from " + origFilePath + " to " + destFilePath + + ", message=" + e.getMessage()); + } + } + public SparkArchive getCurrentArchive() { return currentArchive; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java index 70e5cf9..2bba7c7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java @@ -119,7 +119,10 @@ public class SparkRepositoryTest { throws UserException { return false; } @Mock void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc) - throws UserException { return;} + throws UserException { return; } + @Mock + void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) + throws UserException { return; } }; BrokerDesc brokerDesc = new BrokerDesc("broker", Maps.newHashMap()); @@ -179,7 +182,10 @@ public class SparkRepositoryTest { throws UserException { return; } @Mock void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc) - throws UserException { return;} + throws UserException { return; } + @Mock + void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) + throws UserException { return; } }; // new md5dum of local library --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org