This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-fs-spi in repository https://gitbox.apache.org/repos/asf/doris.git
commit 59884ca553c61bb38f552a6aef253e2b8178e6ae Author: morningman <[email protected]> AuthorDate: Wed Apr 1 21:58:49 2026 +0800 [refactor](fs-spi) P4.8-F: delete legacy BrokerFileSystem, S3FileSystem, AzureFileSystem and StorageTypeMapper ### What problem does this PR solve? Issue Number: N/A Problem Summary: - StorageTypeMapper was only called from FileSystemFactory.get() (deprecated), which had zero production callers — only dead @Disabled/@Ignore tests - BrokerFileSystem, S3FileSystem, AzureFileSystem (legacy fe-core versions) were only instantiated through StorageTypeMapper; all production code already routes through FileSystemFactory.getFileSystem() → SPI providers - HiveUtil.isSplittable() had a dead instanceof BrokerFileSystem branch: the fs parameter is always an SPI FileSystem from FileSystemCache, never the legacy BrokerFileSystem, so the broker-specific Thrift RPC path was unreachable Changes: - Delete StorageTypeMapper.java (legacy enum factory, no callers) - Delete BrokerFileSystem.java, S3FileSystem.java, AzureFileSystem.java (legacy) - Delete FileSystemFactory.get(StorageProperties) and get(FileSystemType, Map) deprecated methods (no production callers; StorageTypeMapper is gone) - HiveUtil.isSplittable(): remove dead instanceof BrokerFileSystem branch - IcebergHadoopCatalogTest: instantiate DFSFileSystem directly (was using deleted FileSystemFactory.get(); test is @Ignore) - PaimonDlfRestCatalogTest: remove readByDorisS3FileSystem() helper (used deleted S3FileSystem; test is @Disabled) - Delete BrokerStorageTest, S3FileSystemTest (tested deleted legacy classes) Note: GsonUtils already handles missing legacy classes via reflection + try/catch ClassNotFoundException; the BrokerFileSystem/S3FileSystem/AzureFileSystem entries in the RuntimeTypeAdapterFactory will be skipped gracefully at startup. 
### Release note None ### Check List (For Author) - Test: No need to test (deleting dead code with zero live callers; build verifies no remaining references) - Behavior changed: No (SPI path already handled all cases; broker RPC branch in isSplittable was unreachable) - Does this need documentation: No Co-authored-by: Copilot <[email protected]> --- .../org/apache/doris/datasource/hive/HiveUtil.java | 5 - .../org/apache/doris/fs/FileSystemFactory.java | 44 -- .../org/apache/doris/fs/StorageTypeMapper.java | 71 -- .../apache/doris/fs/remote/AzureFileSystem.java | 88 --- .../apache/doris/fs/remote/BrokerFileSystem.java | 712 --------------------- .../org/apache/doris/fs/remote/S3FileSystem.java | 149 ----- .../org/apache/doris/backup/BrokerStorageTest.java | 192 ------ .../metastore/PaimonDlfRestCatalogTest.java | 42 -- .../external/iceberg/IcebergHadoopCatalogTest.java | 5 +- .../org/apache/doris/fs/obj/S3FileSystemTest.java | 247 ------- 10 files changed, 3 insertions(+), 1552 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java index 26994311be2..f223d7f59f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java @@ -23,7 +23,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.statistics.CommonStatistics; import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.fs.remote.BrokerFileSystem; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; @@ -112,10 +111,6 @@ public final class HiveUtil { public static boolean isSplittable(FileSystem remoteFileSystem, String inputFormat, String location) throws UserException { - if (remoteFileSystem instanceof BrokerFileSystem) { - return 
((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat); - } - // All supported hive input format are splittable return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java index dfec107056a..b328b6a04c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java @@ -17,10 +17,8 @@ package org.apache.doris.fs; -import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.filesystem.spi.FileSystemProvider; -import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -35,16 +33,6 @@ import java.util.ServiceLoader; /** * Factory for filesystem instances. * - * <h2>Two APIs</h2> - * <ul> - * <li><b>Legacy API</b> ({@code get(StorageProperties)} / {@code get(FileSystemType, Map)}) — - * delegates to {@link StorageTypeMapper}; returns {@link RemoteFileSystem}. All existing - * fe-core callers continue to use this path unchanged.</li> - * <li><b>SPI API</b> ({@code getFileSystem(Map)} / {@code getFileSystem(StorageProperties)}) — - * delegates to {@link FileSystemPluginManager}; returns - * {@code org.apache.doris.filesystem.spi.FileSystem}. New code should use this path.</li> - * </ul> - * * <p>Call {@link #initPluginManager(FileSystemPluginManager)} at FE startup before any * {@code getFileSystem()} call. In production, providers are loaded from the plugin directory * configured via {@code filesystem_plugin_root}. 
In unit tests, providers are discovered from @@ -62,38 +50,6 @@ public final class FileSystemFactory { private FileSystemFactory() {} - // ========================================================= - // Legacy API — backward compatible, returns RemoteFileSystem - // ========================================================= - - /** - * Legacy entry point. Returns a {@link RemoteFileSystem} via {@link StorageTypeMapper}. - * - * @deprecated New code should use {@link #getFileSystem(Map)} instead. - */ - @Deprecated - public static RemoteFileSystem get(StorageProperties storageProperties) { - return StorageTypeMapper.create(storageProperties); - } - - /** - * Legacy entry point by enum type. Returns a {@link RemoteFileSystem} via - * {@link StorageTypeMapper}. - * - * @deprecated New code should use {@link #getFileSystem(Map)} instead. - */ - @Deprecated - public static RemoteFileSystem get(FileSystemType fileSystemType, Map<String, String> properties) - throws UserException { - List<StorageProperties> storagePropertiesList = StorageProperties.createAll(properties); - for (StorageProperties storageProperties : storagePropertiesList) { - if (storageProperties.getStorageName().equalsIgnoreCase(fileSystemType.name())) { - return StorageTypeMapper.create(storageProperties); - } - } - throw new RuntimeException("Unsupported file system type: " + fileSystemType); - } - // ========================================================= // SPI API — returns spi.FileSystem // ========================================================= diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/StorageTypeMapper.java b/fe/fe-core/src/main/java/org/apache/doris/fs/StorageTypeMapper.java deleted file mode 100644 index 248bd456da5..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/StorageTypeMapper.java +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs; - -import org.apache.doris.datasource.property.storage.AzureProperties; -import org.apache.doris.datasource.property.storage.BrokerProperties; -import org.apache.doris.datasource.property.storage.COSProperties; -import org.apache.doris.datasource.property.storage.GCSProperties; -import org.apache.doris.datasource.property.storage.HdfsProperties; -import org.apache.doris.datasource.property.storage.MinioProperties; -import org.apache.doris.datasource.property.storage.OBSProperties; -import org.apache.doris.datasource.property.storage.OSSHdfsProperties; -import org.apache.doris.datasource.property.storage.OSSProperties; -import org.apache.doris.datasource.property.storage.S3Properties; -import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.fs.obj.CosObjStorage; -import org.apache.doris.fs.obj.ObsObjStorage; -import org.apache.doris.fs.obj.OssObjStorage; -import org.apache.doris.fs.remote.AzureFileSystem; -import org.apache.doris.fs.remote.BrokerFileSystem; -import org.apache.doris.fs.remote.RemoteFileSystem; -import org.apache.doris.fs.remote.S3FileSystem; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; - -import java.util.Arrays; -import java.util.function.Function; - -public enum 
StorageTypeMapper { - OSS(OSSProperties.class, props -> new S3FileSystem(props, new OssObjStorage(props))), - OBS(OBSProperties.class, props -> new S3FileSystem(props, new ObsObjStorage(props))), - COS(COSProperties.class, props -> new S3FileSystem(props, new CosObjStorage(props))), - MINIO(MinioProperties.class, S3FileSystem::new), - AZURE(AzureProperties.class, AzureFileSystem::new), - S3(S3Properties.class, S3FileSystem::new), - GCS(GCSProperties.class, S3FileSystem::new), - HDFS(HdfsProperties.class, DFSFileSystem::new), - BROKER(BrokerProperties.class, BrokerFileSystem::new), - OSS_HDFS(OSSHdfsProperties.class, DFSFileSystem::new); - - private final Class<? extends StorageProperties> propClass; - private final Function<StorageProperties, RemoteFileSystem> factory; - - <T extends StorageProperties> StorageTypeMapper(Class<T> propClass, Function<T, RemoteFileSystem> factory) { - this.propClass = propClass; - this.factory = prop -> factory.apply(propClass.cast(prop)); - } - - public static RemoteFileSystem create(StorageProperties prop) { - return Arrays.stream(values()) - .filter(type -> type.propClass.isInstance(prop)) - .findFirst() - .orElseThrow(() -> new RuntimeException("Unknown storage type")) - .factory.apply(prop); - } -} - diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java deleted file mode 100644 index d85d5b13ab5..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.remote; - -import org.apache.doris.backup.Status; -import org.apache.doris.datasource.property.storage.AzureProperties; -import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.foundation.fs.FsStorageType; -import org.apache.doris.fs.obj.AzureObjStorage; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class AzureFileSystem extends ObjFileSystem { - private static final Logger LOG = LogManager.getLogger(AzureFileSystem.class); - private final AzureProperties azureProperties; - - public AzureFileSystem(AzureProperties azureProperties) { - super(FsStorageType.AZURE.name(), FsStorageType.AZURE, new AzureObjStorage(azureProperties)); - this.azureProperties = azureProperties; - this.properties.putAll(azureProperties.getOrigProps()); - } - - @Override - public Status renameDir(String origFilePath, String destFilePath) { - throw new UnsupportedOperationException("Renaming directories is not supported in Azure File System."); - } - - @Override - public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) { - AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage(); - return azureObjStorage.listFiles(remotePath, recursive, result); - } - - @Override - public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { - AzureObjStorage azureObjStorage = (AzureObjStorage) 
getObjStorage(); - return azureObjStorage.globList(remotePath, result, fileNameOnly); - } - - @Override - public Status listDirectories(String remotePath, Set<String> result) { - AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage(); - return azureObjStorage.listDirectories(remotePath, result); - } - - @Override - public StorageProperties getStorageProperties() { - return azureProperties; - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - try { - objStorage.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - public void completeMultipartUpload(String bucket, String key, String uploadId, Map<Integer, String> parts) { - AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage(); - azureObjStorage.completeMultipartUpload(bucket, key, parts); - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java deleted file mode 100644 index ea5cc86292a..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java +++ /dev/null @@ -1,712 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.remote; - -import org.apache.doris.backup.Status; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.FsBroker; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.ClientPool; -import org.apache.doris.common.Pair; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.BrokerUtil; -import org.apache.doris.datasource.property.storage.BrokerProperties; -import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.foundation.fs.FsStorageType; -import org.apache.doris.fs.operations.BrokerFileOperations; -import org.apache.doris.fs.operations.OpParams; -import org.apache.doris.service.FrontendOptions; -import org.apache.doris.thrift.TBrokerCheckPathExistRequest; -import org.apache.doris.thrift.TBrokerCheckPathExistResponse; -import org.apache.doris.thrift.TBrokerDeletePathRequest; -import org.apache.doris.thrift.TBrokerFD; -import org.apache.doris.thrift.TBrokerFileStatus; -import org.apache.doris.thrift.TBrokerIsSplittableRequest; -import org.apache.doris.thrift.TBrokerIsSplittableResponse; -import org.apache.doris.thrift.TBrokerListPathRequest; -import org.apache.doris.thrift.TBrokerListResponse; -import org.apache.doris.thrift.TBrokerOperationStatus; -import org.apache.doris.thrift.TBrokerOperationStatusCode; -import org.apache.doris.thrift.TBrokerPReadRequest; -import org.apache.doris.thrift.TBrokerPWriteRequest; -import org.apache.doris.thrift.TBrokerReadResponse; -import org.apache.doris.thrift.TBrokerRenamePathRequest; -import org.apache.doris.thrift.TBrokerVersion; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TPaloBrokerService; - -import com.google.common.base.Preconditions; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import 
org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileVisitOption; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Comparator; -import java.util.List; - -public class BrokerFileSystem extends RemoteFileSystem { - private static final Logger LOG = LogManager.getLogger(BrokerFileSystem.class); - private final BrokerFileOperations operations; - private final BrokerProperties brokerProperties; - - //todo The method parameter should use the interface type StorageProperties instead of a specific implementation. - public BrokerFileSystem(BrokerProperties brokerProperties) { - super(brokerProperties.getBrokerName(), FsStorageType.BROKER); - this.brokerProperties = brokerProperties; - this.operations = new BrokerFileOperations(name, brokerProperties.getBrokerParams()); - } - - public Pair<TPaloBrokerService.Client, TNetworkAddress> getBroker() { - Pair<TPaloBrokerService.Client, TNetworkAddress> result = Pair.of(null, null); - FsBroker broker; - try { - String localIP = FrontendOptions.getLocalHostAddress(); - broker = Env.getCurrentEnv().getBrokerMgr().getBroker(name, localIP); - } catch (AnalysisException e) { - LOG.warn("failed to get a broker address: " + e.getMessage()); - return null; - } - TNetworkAddress address = new TNetworkAddress(broker.host, broker.port); - TPaloBrokerService.Client client; - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e) { - LOG.warn("failed to get broker client: " + e.getMessage()); - return null; - } - - result.first = client; - result.second = address; - LOG.info("get broker: {}", 
BrokerUtil.printBroker(name, address)); - return result; - } - - @Override - public Status exists(String remotePath) { - // 1. get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - // check path - boolean needReturn = true; - try { - TBrokerCheckPathExistRequest req = new TBrokerCheckPathExistRequest(TBrokerVersion.VERSION_ONE, - remotePath, properties); - TBrokerCheckPathExistResponse rep = client.checkPathExist(req); - TBrokerOperationStatus opst = rep.getOpStatus(); - if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to check remote path exist: " + remotePath - + ", broker: " + BrokerUtil.printBroker(name, address) - + ". msg: " + opst.getMessage()); - } - - if (!rep.isIsPathExist()) { - return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); - } - - return Status.OK; - } catch (TException e) { - needReturn = false; - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to check remote path exist: " + remotePath - + ", broker: " + BrokerUtil.printBroker(name, address) - + ". msg: " + e.getMessage()); - } finally { - if (needReturn) { - ClientPool.brokerPool.returnObject(address, client); - } else { - ClientPool.brokerPool.invalidateObject(address, client); - } - } - } - - @Override - public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { - if (LOG.isDebugEnabled()) { - LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize); - } - - long start = System.currentTimeMillis(); - - // 1. 
get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - // 2. open file reader with broker - TBrokerFD fd = new TBrokerFD(); - Status opStatus = operations.openReader(OpParams.of(client, address, remoteFilePath, fd)); - if (!opStatus.ok()) { - return opStatus; - } - LOG.info("finished to open reader. fd: {}. download {} to {}.", - fd, remoteFilePath, localFilePath); - Preconditions.checkNotNull(fd); - // 3. delete local file if exist - File localFile = new File(localFilePath); - if (localFile.exists()) { - try { - Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS) - .sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); - } catch (IOException e) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath); - } - } - - // 4. create local file - Status status = Status.OK; - try { - if (!localFile.createNewFile()) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + localFilePath); - } - } catch (IOException e) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " - + localFilePath + ", msg: " + e.getMessage()); - } - - // 5. read remote file with broker and write to local - String lastErrMsg = null; - try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) { - final long bufSize = 1024 * 1024; // 1MB - long leftSize = fileSize; - long readOffset = 0; - while (leftSize > 0) { - long readLen = Math.min(leftSize, bufSize); - TBrokerReadResponse rep = null; - // We only retry if we encounter a timeout thrift exception. 
- int tryTimes = 0; - while (tryTimes < 3) { - try { - TBrokerPReadRequest req = new TBrokerPReadRequest(TBrokerVersion.VERSION_ONE, - fd, readOffset, readLen); - rep = client.pread(req); - if (rep.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) { - // pread return failure. - lastErrMsg = String.format("failed to read via broker %s. " - + "current read offset: %d, read length: %d," - + " file size: %d, file: %s, err code: %d, msg: %s", - BrokerUtil.printBroker(name, address), - readOffset, readLen, fileSize, - remoteFilePath, rep.getOpStatus().getStatusCode().getValue(), - rep.getOpStatus().getMessage()); - LOG.warn(lastErrMsg); - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - } - if (rep.opStatus.statusCode != TBrokerOperationStatusCode.END_OF_FILE) { - if (LOG.isDebugEnabled()) { - LOG.debug("download. readLen: {}, read data len: {}, left size:{}. total size: {}", - readLen, rep.getData().length, leftSize, fileSize); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("read eof: " + remoteFilePath); - } - } - break; - } catch (TTransportException e) { - if (e.getType() == TTransportException.TIMED_OUT) { - // we only retry when we encounter timeout exception. - lastErrMsg = String.format("failed to read via broker %s. " - + "current read offset: %d, read length: %d," - + " file size: %d, file: %s, timeout.", - BrokerUtil.printBroker(name, address), - readOffset, readLen, fileSize, - remoteFilePath); - tryTimes++; - continue; - } - - lastErrMsg = String.format("failed to read via broker %s. " - + "current read offset: %d, read length: %d," - + " file size: %d, file: %s. msg: %s", - BrokerUtil.printBroker(name, address), - readOffset, readLen, fileSize, - remoteFilePath, e.getMessage()); - LOG.warn(lastErrMsg); - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - break; - } catch (TException e) { - lastErrMsg = String.format("failed to read via broker %s. 
" - + "current read offset: %d, read length: %d," - + " file size: %d, file: %s. msg: %s", - BrokerUtil.printBroker(name, address), - readOffset, readLen, fileSize, - remoteFilePath, e.getMessage()); - LOG.warn(lastErrMsg); - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - break; - } - } // end of retry loop - - if (status.ok() && tryTimes < 3) { - // read succeed, write to local file - Preconditions.checkNotNull(rep); - // NOTICE(cmy): Sometimes the actual read length does not equal to the expected read length, - // even if the broker's read buffer size is large enough. - // I don't know why, but have to adapt to it. - if (rep.getData().length != readLen) { - LOG.warn("the actual read length does not equal to " - + "the expected read length: {} vs. {}, file: {}, broker: {}", - rep.getData().length, readLen, remoteFilePath, - BrokerUtil.printBroker(name, address)); - } - - out.write(rep.getData()); - readOffset += rep.getData().length; - leftSize -= rep.getData().length; - } else { - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - break; - } - } // end of reading remote file - } catch (IOException e) { - return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage() + ", broker: " - + BrokerUtil.printBroker(name, address)); - } finally { - // close broker reader - Status closeStatus = operations.closeReader(OpParams.of(client, address, fd)); - if (!closeStatus.ok()) { - LOG.warn(closeStatus.getErrMsg()); - if (status.ok()) { - // we return close write error only if no other error has been encountered. - status = closeStatus; - } - ClientPool.brokerPool.invalidateObject(address, client); - } else { - ClientPool.brokerPool.returnObject(address, client); - } - } - - LOG.info("finished to download from {} to {} with size: {}. 
cost {} ms", - remoteFilePath, localFilePath, fileSize, (System.currentTimeMillis() - start)); - return status; - } - - // directly upload the content to remote file - @Override - public Status directUpload(String content, String remoteFile) { - // 1. get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - TBrokerFD fd = new TBrokerFD(); - Status status = Status.OK; - try { - // 2. open file writer with broker - status = operations.openWriter(OpParams.of(client, address, remoteFile, fd)); - if (!status.ok()) { - return status; - } - - // 3. write content - try { - ByteBuffer bb = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)); - TBrokerPWriteRequest req = new TBrokerPWriteRequest(TBrokerVersion.VERSION_ONE, fd, 0, bb); - TBrokerOperationStatus opst = client.pwrite(req); - if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { - // pwrite return failure. - status = new Status(Status.ErrCode.COMMON_ERROR, "write failed: " + opst.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } - } catch (TException e) { - status = new Status(Status.ErrCode.BAD_CONNECTION, "write exception: " + e.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } - } finally { - Status closeStatus = operations.closeWriter(OpParams.of(client, address, fd)); - if (closeStatus.getErrCode() == Status.ErrCode.BAD_CONNECTION - || status.getErrCode() == Status.ErrCode.BAD_CONNECTION) { - ClientPool.brokerPool.invalidateObject(address, client); - } else { - ClientPool.brokerPool.returnObject(address, client); - } - } - - return status; - } - - @Override - public Status upload(String localPath, String remotePath) { - long start = System.currentTimeMillis(); - // 1. 
get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - // 2. open file write with broker - TBrokerFD fd = new TBrokerFD(); - Status status = operations.openWriter(OpParams.of(client, address, remotePath, fd)); - if (!status.ok()) { - return status; - } - - // 3. read local file and write to remote with broker - File localFile = new File(localPath); - long fileLength = localFile.length(); - byte[] readBuf = new byte[1024]; - try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) { - // save the last err msg - String lastErrMsg = null; - // save the current write offset of remote file - long writeOffset = 0; - // read local file, 1MB at a time - int bytesRead; - while ((bytesRead = in.read(readBuf)) != -1) { - ByteBuffer bb = ByteBuffer.wrap(readBuf, 0, bytesRead); - - // We only retry if we encounter a timeout thrift exception. - int tryTimes = 0; - while (tryTimes < 3) { - try { - TBrokerPWriteRequest req - = new TBrokerPWriteRequest(TBrokerVersion.VERSION_ONE, fd, writeOffset, bb); - TBrokerOperationStatus opst = client.pwrite(req); - if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { - // pwrite return failure. - lastErrMsg = String.format("failed to write via broker %s. " - + "current write offset: %d, write length: %d," - + " file length: %d, file: %s, err code: %d, msg: %s", - BrokerUtil.printBroker(name, address), - writeOffset, bytesRead, fileLength, - remotePath, opst.getStatusCode().getValue(), opst.getMessage()); - LOG.warn(lastErrMsg); - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - } - break; - } catch (TTransportException e) { - if (e.getType() == TTransportException.TIMED_OUT) { - // we only retry when we encounter timeout exception. 
- lastErrMsg = String.format("failed to write via broker %s. " - + "current write offset: %d, write length: %d," - + " file length: %d, file: %s. timeout", - BrokerUtil.printBroker(name, address), - writeOffset, bytesRead, fileLength, - remotePath); - tryTimes++; - continue; - } - - lastErrMsg = String.format("failed to write via broker %s. " - + "current write offset: %d, write length: %d," - + " file length: %d, file: %s. encounter TTransportException: %s", - BrokerUtil.printBroker(name, address), - writeOffset, bytesRead, fileLength, - remotePath, e.getMessage()); - LOG.warn(lastErrMsg, e); - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - break; - } catch (TException e) { - lastErrMsg = String.format("failed to write via broker %s. " - + "current write offset: %d, write length: %d," - + " file length: %d, file: %s. encounter TException: %s", - BrokerUtil.printBroker(name, address), - writeOffset, bytesRead, fileLength, - remotePath, e.getMessage()); - LOG.warn(lastErrMsg, e); - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - break; - } - } - - if (status.ok() && tryTimes < 3) { - // write succeed, update current write offset - writeOffset += bytesRead; - } else { - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - break; - } - } // end of read local file loop - } catch (FileNotFoundException e1) { - return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e1.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } catch (IOException e1) { - return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } finally { - // close write - Status closeStatus = operations.closeWriter(OpParams.of(client, address, fd)); - if (!closeStatus.ok()) { - LOG.warn(closeStatus.getErrMsg()); - if (status.ok()) { - // we return close write error only if no other error has been encountered. 
- status = closeStatus; - } - ClientPool.brokerPool.invalidateObject(address, client); - } else { - ClientPool.brokerPool.returnObject(address, client); - } - } - - if (status.ok()) { - LOG.info("finished to upload {} to remote path {}. cost: {} ms", - localPath, remotePath, (System.currentTimeMillis() - start)); - } - return status; - } - - @Override - public Status rename(String origFilePath, String destFilePath) { - long start = System.currentTimeMillis(); - // 1. get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - // 2. rename - boolean needReturn = true; - try { - TBrokerRenamePathRequest req = new TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE, - origFilePath, destFilePath, properties); - TBrokerOperationStatus ost = client.renamePath(req); - if (ost.getStatusCode() != TBrokerOperationStatusCode.OK) { - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to rename " + origFilePath + " to " + destFilePath + ", msg: " + ost.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } - } catch (TException e) { - needReturn = false; - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to rename " + origFilePath + " to " + destFilePath + ", msg: " + e.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } finally { - if (needReturn) { - ClientPool.brokerPool.returnObject(address, client); - } else { - ClientPool.brokerPool.invalidateObject(address, client); - } - } - - LOG.info("finished to rename {} to {}. 
cost: {} ms", - origFilePath, destFilePath, (System.currentTimeMillis() - start)); - return Status.OK; - } - - @Override - public Status delete(String remotePath) { - // get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - // delete - boolean needReturn = true; - try { - TBrokerDeletePathRequest req = new TBrokerDeletePathRequest(TBrokerVersion.VERSION_ONE, - remotePath, properties); - TBrokerOperationStatus opst = client.deletePath(req); - if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to delete remote path: " + remotePath + ". msg: " + opst.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } - - LOG.info("finished to delete remote path {}.", remotePath); - } catch (TException e) { - needReturn = false; - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to delete remote path: " + remotePath + ". 
msg: " + e.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } finally { - if (needReturn) { - ClientPool.brokerPool.returnObject(address, client); - } else { - ClientPool.brokerPool.invalidateObject(address, client); - } - } - - return Status.OK; - } - - @Override - public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) { - // get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - return new Status(Status.ErrCode.BAD_CONNECTION, "failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - // invoke broker 'listLocatedFiles' interface - boolean needReturn = true; - try { - TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath, - recursive, properties); - req.setOnlyFiles(true); - TBrokerListResponse response = client.listLocatedFiles(req); - TBrokerOperationStatus operationStatus = response.getOpStatus(); - if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to listLocatedFiles, remote path: " + remotePath + ". msg: " - + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address)); - } - List<TBrokerFileStatus> fileStatus = response.getFiles(); - for (TBrokerFileStatus tFile : fileStatus) { - org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tFile.path); - RemoteFile file = new RemoteFile(path.getName(), path, !tFile.isDir, tFile.isDir, tFile.size, - tFile.getBlockSize(), tFile.getModificationTime(), null /* blockLocations is null*/); - result.add(file); - } - LOG.info("finished to listLocatedFiles, remote path {}. 
get files: {}", remotePath, result); - return Status.OK; - } catch (TException e) { - needReturn = false; - return new Status(Status.ErrCode.COMMON_ERROR, "failed to listLocatedFiles, remote path: " - + remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address)); - } finally { - if (needReturn) { - ClientPool.brokerPool.returnObject(address, client); - } else { - ClientPool.brokerPool.invalidateObject(address, client); - } - } - } - - public boolean isSplittable(String remotePath, String inputFormat) throws UserException { - // get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - throw new UserException("failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - // invoke 'isSplittable' interface - boolean needReturn = true; - try { - TBrokerIsSplittableRequest req = new TBrokerIsSplittableRequest().setVersion(TBrokerVersion.VERSION_ONE) - .setPath(remotePath).setInputFormat(inputFormat).setProperties(properties); - TBrokerIsSplittableResponse response = client.isSplittable(req); - TBrokerOperationStatus operationStatus = response.getOpStatus(); - if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { - throw new UserException("failed to get path isSplittable, remote path: " + remotePath + ". msg: " - + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address)); - } - boolean result = response.isSplittable(); - LOG.info("finished to get path isSplittable, remote path {} with format {}, isSplittable: {}", - remotePath, inputFormat, result); - return result; - } catch (TException e) { - needReturn = false; - throw new UserException("failed to get path isSplittable, remote path: " - + remotePath + ". 
msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address)); - } finally { - if (needReturn) { - ClientPool.brokerPool.returnObject(address, client); - } else { - ClientPool.brokerPool.invalidateObject(address, client); - } - } - } - - // List files in remotePath - @Override - public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { - // get a proper broker - Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); - if (pair == null) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); - } - TPaloBrokerService.Client client = pair.first; - TNetworkAddress address = pair.second; - - // list - boolean needReturn = true; - try { - TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath, - false /* not recursive */, properties); - req.setFileNameOnly(fileNameOnly); - TBrokerListResponse rep = client.listPath(req); - TBrokerOperationStatus opst = rep.getOpStatus(); - if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to list remote path: " + remotePath + ". msg: " + opst.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } - - List<TBrokerFileStatus> fileStatus = rep.getFiles(); - for (TBrokerFileStatus tFile : fileStatus) { - RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, tFile.size, 0, tFile.getModificationTime()); - result.add(file); - } - LOG.info("finished to list remote path {}. get files: {}", remotePath, result); - } catch (TException e) { - needReturn = false; - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to list remote path: " + remotePath + ". 
msg: " + e.getMessage() - + ", broker: " + BrokerUtil.printBroker(name, address)); - } finally { - if (needReturn) { - ClientPool.brokerPool.returnObject(address, client); - } else { - ClientPool.brokerPool.invalidateObject(address, client); - } - } - - return Status.OK; - } - - @Override - public Status makeDir(String remotePath) { - return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented."); - } - - @Override - public StorageProperties getStorageProperties() { - return brokerProperties; - } - - @Override - public void close() throws IOException { - // do nothing - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java deleted file mode 100644 index 1992c36df44..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ /dev/null @@ -1,149 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.fs.remote; - -import org.apache.doris.backup.Status; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.S3URI; -import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties; -import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.foundation.fs.FsStorageType; -import org.apache.doris.fs.GlobListResult; -import org.apache.doris.fs.obj.S3ObjStorage; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; -import software.amazon.awssdk.services.s3.model.CompletedPart; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class S3FileSystem extends ObjFileSystem { - - private static final Logger LOG = LogManager.getLogger(S3FileSystem.class); - private final AbstractS3CompatibleProperties s3Properties; - - public S3FileSystem(AbstractS3CompatibleProperties s3Properties) { - super(FsStorageType.S3.name(), FsStorageType.S3, - new S3ObjStorage(s3Properties)); - this.s3Properties = s3Properties; - initFsProperties(); - } - - /** Constructor that accepts a pre-built ObjStorage (used for OSS/OBS/COS subclass dispatch). 
*/ - public S3FileSystem(AbstractS3CompatibleProperties s3Properties, S3ObjStorage objStorage) { - super(FsStorageType.S3.name(), FsStorageType.S3, objStorage); - this.s3Properties = s3Properties; - initFsProperties(); - } - - @Override - public StorageProperties getStorageProperties() { - return s3Properties; - } - - private void initFsProperties() { - this.properties.putAll(s3Properties.getOrigProps()); - } - - @Override - public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) { - S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; - return objStorage.listFiles(remotePath, recursive, result); - } - - // broker file pattern glob is too complex, so we use hadoop directly - @Override - public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { - S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; - return objStorage.globList(remotePath, result, fileNameOnly); - } - - @Override - public GlobListResult globListWithLimit(String remotePath, List<RemoteFile> result, String startFile, - long fileSizeLimit, long fileNumLimit) { - S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; - return objStorage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit); - } - - @Override - public Status listDirectories(String remotePath, Set<String> result) { - S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; - return objStorage.listDirectories(remotePath, result); - } - - @Override - public boolean connectivityTest(List<String> filePaths) throws UserException { - if (filePaths == null || filePaths.isEmpty()) { - throw new UserException("File paths cannot be null or empty for connectivity test."); - } - S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; - try { - S3Client s3Client = objStorage.getClient(); - Set<String> bucketNames = new HashSet<>(); - boolean usePathStyle = Boolean.parseBoolean(s3Properties.getUsePathStyle()); - boolean forceParsingByStandardUri = 
Boolean.parseBoolean(s3Properties.getForceParsingByStandardUrl()); - for (String filePath : filePaths) { - S3URI s3uri; - s3uri = S3URI.create(filePath, usePathStyle, forceParsingByStandardUri); - bucketNames.add(s3uri.getBucket()); - } - bucketNames.forEach(bucketName -> s3Client.headBucket(b -> b.bucket(bucketName))); - return true; - } catch (Exception e) { - LOG.warn("S3 connectivityTest error: {}", e.getMessage(), e); - } - return false; - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - try { - objStorage.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - public void completeMultipartUpload(String bucket, String key, String uploadId, Map<Integer, String> parts) { - S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; - List<CompletedPart> completedParts = new ArrayList<>(); - for (Map.Entry<Integer, String> entry : parts.entrySet()) { - completedParts.add(CompletedPart.builder() - .partNumber(entry.getKey()) - .eTag(entry.getValue()) - .build()); - } - - objStorage.getClient().completeMultipartUpload(CompleteMultipartUploadRequest.builder() - .bucket(bucket) - .key(key) - .uploadId(uploadId) - .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build()) - .build()); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java deleted file mode 100644 index 4946e63642d..00000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java +++ /dev/null @@ -1,192 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.backup; - -import org.apache.doris.common.ClientPool; -import org.apache.doris.common.GenericPool; -import org.apache.doris.common.Pair; -import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.datasource.property.storage.BrokerProperties; -import org.apache.doris.fs.remote.BrokerFileSystem; -import org.apache.doris.fs.remote.RemoteFile; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TPaloBrokerService; - -import mockit.Expectations; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; -import mockit.Tested; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.thrift.TServiceClient; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; - -@Ignore -public class BrokerStorageTest { - private static String 
basePath; - private final String bucket = "bos://yang-repo/"; - private final String brokerHost = "your_host"; - private Map<String, String> properties; - - @Tested - private BrokerFileSystem fileSystem; - private String testFile; - private String content; - private Pair<TPaloBrokerService.Client, TNetworkAddress> pair; - @Mocked - GenericPool pool; - - @BeforeClass - public static void init() { - basePath = "broker/" + UUID.randomUUID().toString(); - } - - @Before - public void setUp() throws Exception { - pair = Pair.of(null, null); - TTransport transport = new TSocket(brokerHost, 8111); - transport.open(); - TProtocol protocol = new TBinaryProtocol(transport); - pair.first = new TPaloBrokerService.Client(protocol); - pair.second = new TNetworkAddress(brokerHost, 8111); - properties = new HashMap<>(); - properties.put("bos_accesskey", System.getenv().getOrDefault("AWS_AK", "")); - properties.put("bos_secret_accesskey", System.getenv().getOrDefault("AWS_SK", "")); - properties.put("bos_endpoint", "http://bj.bcebos.com"); - BrokerProperties brokerProperties = BrokerProperties.of("bos_broker", properties); - fileSystem = new BrokerFileSystem(brokerProperties); - testFile = bucket + basePath + "/Ode_to_the_West_Wind"; - content = - "O wild West Wind, thou breath of Autumn's being\n" - + "Thou, from whose unseen presence the leaves dead\n" - + "Are driven, like ghosts from an enchanter fleeing,\n" - + "Yellow, and black, and pale, and hectic red,\n" - + "Pestilence-stricken multitudes:O thou\n" - + "Who chariotest to their dark wintry bed\n" - + "The winged seeds, where they lie cold and low,\n" - + "Each like a corpse within its grave, until\n" - + "Thine azure sister of the Spring shall blow\n" - + "Her clarion o'er the dreaming earth, and fill\n" - + "(Driving sweet buds like flocks to feed in air)\n" - + "With living hues and odors plain and hill:\n" - + "Wild Spirit, which art moving everywhere;\n" - + "Destroyer and preserver; hear, oh, hear!"; - new 
MockUp<BrokerFileSystem>() { - @Mock - public Pair<TPaloBrokerService.Client, TNetworkAddress> getBroker() { - return pair; - } - }; - new Expectations() { - { - pool.returnObject(withInstanceOf(TNetworkAddress.class), withInstanceOf(TServiceClient.class)); - minTimes = 0; - } - }; - Deencapsulation.setField(ClientPool.class, "brokerPool", pool); - Assert.assertEquals(Status.OK, fileSystem.directUpload(content, testFile)); - } - - @Test - public void downloadWithFileSize() throws IOException { - File localFile = File.createTempFile("brokerunittest", ".dat"); - localFile.deleteOnExit(); - Status status = fileSystem.downloadWithFileSize(testFile, localFile.getAbsolutePath(), content.getBytes().length); - Assert.assertEquals(Status.OK, status); - Assert.assertEquals(DigestUtils.md5Hex(content.getBytes()), DigestUtils.md5Hex(new FileInputStream(localFile))); - status = fileSystem.downloadWithFileSize(bucket + basePath + "/Ode_to_the_West_Wind", localFile.getAbsolutePath(), content.getBytes().length + 1); - Assert.assertNotEquals(Status.OK, status); - } - - @Test - public void upload() throws IOException { - File localFile = File.createTempFile("brokerunittest", ".dat"); - localFile.deleteOnExit(); - OutputStream os = new FileOutputStream(localFile); - byte[] buf = new byte[1024 * 1024]; - Random r = new Random(); - r.nextBytes(buf); - os.write(buf); - os.close(); - String remote = bucket + basePath + "/" + localFile.getName(); - Status status = fileSystem.upload(localFile.getAbsolutePath(), remote); - Assert.assertEquals(Status.OK, status); - File localFile2 = File.createTempFile("brokerunittest", ".dat"); - localFile2.deleteOnExit(); - status = fileSystem.downloadWithFileSize(remote, localFile2.getAbsolutePath(), 1024 * 1024); - Assert.assertEquals(Status.OK, status); - Assert.assertEquals(DigestUtils.md5Hex(new FileInputStream(localFile)), - DigestUtils.md5Hex(new FileInputStream(localFile2))); - } - - @Test - public void rename() { - Assert.assertEquals(Status.OK, 
fileSystem.directUpload(content, testFile + ".bak")); - fileSystem.rename(testFile + ".bak", testFile + ".bak1"); - Assert.assertEquals(Status.OK, fileSystem.exists(testFile + ".bak1")); - } - - @Test - public void delete() { - String deleteFile = testFile + ".to_be_delete"; - Assert.assertEquals(Status.OK, fileSystem.delete(deleteFile + "xxxx")); - Assert.assertEquals(Status.OK, fileSystem.directUpload(content, deleteFile)); - Assert.assertEquals(Status.OK, fileSystem.delete(deleteFile)); - Assert.assertEquals(Status.ErrCode.NOT_FOUND, fileSystem.exists(deleteFile).getErrCode()); - Assert.assertEquals(Status.OK, fileSystem.delete(deleteFile + "xxxx")); - } - - @Test - public void list() { - List<RemoteFile> result = new ArrayList<>(); - String listPath = bucket + basePath + "_list" + "/Ode_to_the_West_Wind"; - Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".1")); - Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".2")); - Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".3")); - Assert.assertEquals(Status.OK, fileSystem.globList(bucket + basePath + "_list/*", result)); - Assert.assertEquals(3, result.size()); - } - - @Test - public void exists() { - Status status = fileSystem.exists(testFile); - Assert.assertEquals(Status.OK, status); - status = fileSystem.exists(testFile + ".NOT_EXIST"); - Assert.assertEquals(Status.ErrCode.NOT_FOUND, status.getErrCode()); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonDlfRestCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonDlfRestCatalogTest.java index 99a7f931e3d..ce317382606 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonDlfRestCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonDlfRestCatalogTest.java @@ -17,13 +17,9 @@ package 
org.apache.doris.datasource.property.metastore; -import org.apache.doris.backup.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.util.S3URI; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.property.storage.S3Properties; -import org.apache.doris.fs.StorageTypeMapper; -import org.apache.doris.fs.remote.S3FileSystem; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSStaticCredentialsProvider; @@ -33,7 +29,6 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3Object; -import com.google.common.collect.Maps; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.paimon.catalog.Catalog.DatabaseNotExistException; import org.apache.paimon.catalog.Catalog.TableNotExistException; @@ -59,13 +54,11 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; import java.io.BufferedReader; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.util.List; import java.util.Map; import java.util.Optional; @@ -121,8 +114,6 @@ public class PaimonDlfRestCatalogTest { if (rawFiles.isPresent()) { for (RawFile rawFile : rawFiles.get()) { System.out.println("test debug get raw file: " + rawFile.path()); - readByDorisS3FileSystem(rawFile.path(), tmpAk, tmpSk, stsToken, endpoint, - "oss-cn-beijing"); readByAwsSdkV1(rawFile.path(), tmpAk, tmpSk, stsToken, endpoint, "oss-cn-beijing"); readByAwsSdkV2(rawFile.path(), tmpAk, tmpSk, stsToken, endpoint, "oss-cn-beijing"); } @@ -167,39 +158,6 @@ public class PaimonDlfRestCatalogTest { return CatalogFactory.createCatalog(catalogContext); } - private void readByDorisS3FileSystem(String path, String 
tmpAk, String tmpSk, String stsToken, String endpoint, - String region) { - // replace "oss://" with "s3://" - String finalPath = path.startsWith("oss://") ? path.replace("oss://", "s3://") : path; - System.out.println("test debug final path: " + finalPath); - Map<String, String> props = Maps.newHashMap(); - props.put("s3.endpoint", endpoint); - props.put("s3.region", region); - props.put("s3.access_key", tmpAk); - props.put("s3.secret_key", tmpSk); - props.put("s3.session_token", stsToken); - S3Properties s3Properties = S3Properties.of(props); - S3FileSystem s3Fs = (S3FileSystem) StorageTypeMapper.create(s3Properties); - File localFile = new File("/tmp/s3/" + System.currentTimeMillis() + ".data"); - if (localFile.exists()) { - try { - Files.delete(localFile.toPath()); - } catch (IOException e) { - System.err.println("Failed to delete existing local file: " + localFile.getAbsolutePath()); - e.printStackTrace(); - } - } else { - localFile.getParentFile().mkdirs(); // Ensure parent directories exist - } - Status st = s3Fs.getObjStorage().getObject(finalPath, localFile); - System.out.println(st); - if (st.ok()) { - System.out.println("test debug local path: " + localFile.getAbsolutePath()); - } else { - Assertions.fail(st.toString()); - } - } - private void readByAwsSdkV1(String filePath, String accessKeyId, String secretAccessKey, String sessionToken, String endpoint, String region) throws UserException { BasicSessionCredentials sessionCredentials = new BasicSessionCredentials( diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java index f371e83af47..fcf001dd079 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java @@ -18,8 +18,8 @@ package org.apache.doris.external.iceberg; import 
org.apache.doris.common.UserException; +import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import com.google.common.collect.Maps; @@ -49,7 +49,8 @@ public class IcebergHadoopCatalogTest { properties.put("cos.endpoint", "cos.ap-beijing.myqcloud.com"); properties.put("cos.region", "ap-beijing"); String pathStr = "cosn://bucket1/namespace"; - DFSFileSystem fs = (DFSFileSystem) FileSystemFactory.get(StorageProperties.createPrimary(properties)); + DFSFileSystem fs = new DFSFileSystem( + (HdfsCompatibleProperties) StorageProperties.createPrimary(properties)); nativeFs = fs.nativeFileSystem(new Path(pathStr)); RemoteIterator<FileStatus> it = nativeFs.listStatusIterator(new Path(pathStr)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java deleted file mode 100644 index c761236cbfa..00000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java +++ /dev/null @@ -1,247 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.obj; - -import org.apache.doris.backup.Repository; -import org.apache.doris.backup.Status; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.S3URI; -import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties; -import org.apache.doris.datasource.property.storage.S3Properties; -import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.fs.FileSystemFactory; -import org.apache.doris.fs.remote.RemoteFile; -import org.apache.doris.fs.remote.S3FileSystem; - -import mockit.Mock; -import mockit.MockUp; -import org.apache.commons.codec.digest.DigestUtils; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.S3Object; - -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; - -public class S3FileSystemTest { - private static String basePath; - private final String bucket = "s3://doris-test/"; - private Map<String, String> properties; - private S3FileSystem fileSystem; - private MockedS3Client mockedClient; - private String testFile; - private String content; - // we use mocked s3 client to test s3 file system by default. 
- private boolean injectMockedClient = true; - - @BeforeAll - public static void init() { - basePath = "s3/" + UUID.randomUUID(); - } - - @BeforeEach - public void setUp() throws Exception { - properties = new HashMap<>(); - properties.put("AWS_ACCESS_KEY", System.getenv().getOrDefault("AWS_AK", "")); - properties.put("AWS_SECRET_KEY", System.getenv().getOrDefault("AWS_SK", "")); - properties.put("AWS_ENDPOINT", "http://s3.ap-northeast-1.amazonaws.com"); - properties.put(S3Properties.USE_PATH_STYLE, "false"); - properties.put("AWS_REGION", "bj"); - content = - "O wild West Wind, thou breath of Autumn's being\n" - + "Thou, from whose unseen presence the leaves dead\n" - + "Are driven, like ghosts from an enchanter fleeing,\n" - + "Yellow, and black, and pale, and hectic red,\n" - + "Pestilence-stricken multitudes:O thou\n" - + "Who chariotest to their dark wintry bed\n" - + "The winged seeds, where they lie cold and low,\n" - + "Each like a corpse within its grave, until\n" - + "Thine azure sister of the Spring shall blow\n" - + "Her clarion o'er the dreaming earth, and fill\n" - + "(Driving sweet buds like flocks to feed in air)\n" - + "With living hues and odors plain and hill:\n" - + "Wild Spirit, which art moving everywhere;\n" - + "Destroyer and preserver; hear, oh, hear!"; - - if (injectMockedClient) { - properties.put("AWS_ACCESS_KEY", "ak"); - properties.put("AWS_SECRET_KEY", "sk"); - // create storage - mockedClient = new MockedS3Client(); - mockedClient.setCanMakeData(true); - mockedClient.setMockedData(content.getBytes()); - new MockUp<S3ObjStorage>(S3ObjStorage.class) { - @Mock - public S3Client getClient() throws UserException { - return mockedClient; - } - }; - AbstractS3CompatibleProperties s3CompatibleProperties = - (AbstractS3CompatibleProperties) StorageProperties.createPrimary(properties); - S3ObjStorage mockedStorage = new S3ObjStorage(s3CompatibleProperties); - Assertions.assertTrue(mockedStorage.getClient() instanceof MockedS3Client); - // 
inject storage to file system. - fileSystem = (S3FileSystem) FileSystemFactory.get(s3CompatibleProperties); - new MockUp<S3FileSystem>(S3FileSystem.class) { - @Mock - public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { - try { - S3URI uri = S3URI.create(remotePath, false); - ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket()); - ListObjectsV2Response response = mockedClient.listObjectsV2(requestBuilder.build()); - for (S3Object c : response.contents()) { - result.add(new RemoteFile(c.key(), true, c.size(), 0)); - } - } catch (UserException e) { - throw new RuntimeException(e); - } - return Status.OK; - } - }; - } else { - // can also real file system to test. - fileSystem = (S3FileSystem) FileSystemFactory.get(StorageProperties.createPrimary(properties)); - } - testFile = bucket + basePath + "/Ode_to_the_West_Wind"; - Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, testFile)); - } - - @Test - public void downloadWithFileSize() throws IOException { - File localFile = File.createTempFile("s3unittest", ".dat"); - localFile.deleteOnExit(); - Status status = fileSystem.downloadWithFileSize(testFile, localFile.getAbsolutePath(), content.getBytes().length); - Assertions.assertEquals(Status.OK, status); - Assertions.assertEquals(DigestUtils.md5Hex(content.getBytes()), - DigestUtils.md5Hex(Files.newInputStream(localFile.toPath()))); - status = fileSystem.downloadWithFileSize(bucket + basePath + "/Ode_to_the_West_Wind", localFile.getAbsolutePath(), content.getBytes().length + 1); - Assertions.assertNotEquals(Status.OK, status); - } - - @Test - public void upload() throws IOException { - File localFile = File.createTempFile("s3unittest", ".dat"); - localFile.deleteOnExit(); - OutputStream os = Files.newOutputStream(localFile.toPath()); - byte[] buf = new byte[1024 * 1024]; - Random r = new Random(); - r.nextBytes(buf); - os.write(buf); - os.close(); - if 
(injectMockedClient) { - mockedClient.setMockedData(buf); - } - String remote = bucket + basePath + "/" + localFile.getName(); - Status status = fileSystem.upload(localFile.getAbsolutePath(), remote); - Assertions.assertEquals(Status.OK, status); - File localFile2 = File.createTempFile("s3unittest", ".dat"); - localFile2.deleteOnExit(); - status = fileSystem.downloadWithFileSize(remote, localFile2.getAbsolutePath(), 1024 * 1024); - Assertions.assertEquals(Status.OK, status); - Assertions.assertEquals(DigestUtils.md5Hex(Files.newInputStream(localFile.toPath())), - DigestUtils.md5Hex(Files.newInputStream(localFile2.toPath()))); - } - - @Test - public void testRepositoryUpload() throws IOException { - Repository repo = new Repository(10000, "repo", false, bucket + basePath, - StorageProperties.createPrimary(properties)); - File localFile = File.createTempFile("s3unittest", ".dat"); - localFile.deleteOnExit(); - String remote = bucket + basePath + "/" + localFile.getName(); - Status status = repo.upload(localFile.getAbsolutePath(), remote); - Assertions.assertEquals(Status.OK, status); - } - - @Test - public void copy() { - Assertions.assertEquals(Status.OK, fileSystem.copy(testFile, testFile + ".bak")); - Assertions.assertEquals(Status.OK, fileSystem.exists(testFile + ".bak")); - if (!injectMockedClient) { - Assertions.assertNotEquals(Status.OK, fileSystem.copy(testFile + ".bakxxx", testFile + ".bak")); - } - } - - @Test - public void rename() { - Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, testFile + ".bak")); - fileSystem.rename(testFile + ".bak", testFile + ".bak1"); - if (!injectMockedClient) { - Assertions.assertEquals(Status.ErrCode.NOT_FOUND, fileSystem.exists(testFile + ".bak").getErrCode()); - } - Assertions.assertEquals(Status.OK, fileSystem.exists(testFile + ".bak1")); - } - - @Test - public void checkPathExist() { - Status status = fileSystem.exists(testFile); - Assertions.assertEquals(Status.OK, status); - status = 
fileSystem.exists(testFile + ".NOT_EXIST"); - if (!injectMockedClient) { - Assertions.assertEquals(Status.ErrCode.NOT_FOUND, status.getErrCode()); - } - } - - @Test - public void makeDir() { - String path = bucket + basePath + "/test_path"; - Assertions.assertEquals(Status.OK, fileSystem.makeDir(path)); - if (!injectMockedClient) { - Assertions.assertNotEquals(Status.OK, fileSystem.exists(path)); - } - String path1 = bucket + basePath + "/test_path1/"; - Assertions.assertEquals(Status.OK, fileSystem.makeDir(path1)); - Assertions.assertEquals(Status.OK, fileSystem.exists(path1)); - } - - @Test - public void list() { - List<RemoteFile> result = new ArrayList<>(); - String listPath = bucket + basePath + "_list" + "/Ode_to_the_West_Wind"; - Assertions.assertEquals(Status.OK, fileSystem.delete(testFile)); - Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".1")); - Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".2")); - Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".3")); - Assertions.assertEquals(Status.OK, fileSystem.globList(bucket + basePath + "_list/*", result)); - Assertions.assertEquals(3, result.size()); - } - - @Test - public void delete() { - String deleteFile = testFile + ".to_be_delete"; - Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, deleteFile)); - Assertions.assertEquals(Status.OK, fileSystem.delete(deleteFile)); - if (!injectMockedClient) { - Assertions.assertEquals(Status.ErrCode.NOT_FOUND, fileSystem.exists(deleteFile).getErrCode()); - } - Assertions.assertEquals(Status.OK, fileSystem.delete(deleteFile + "xxxx")); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
