This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 31a7060dbdb88c8e81be00aada4a2df35eca625c
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Wed Apr 10 15:09:01 2024 +0800

    [testcase](hive) add exception test for hive txn (#33278)

    Issue #31442 #32726

    1. add LocalDfsFileSystem to manipulate local files.
    2. add HMSCachedClientTest to mock the HMS service.
    3. add tests for commit rollback.
---
 .../doris/datasource/hive/HMSTransaction.java      |  91 +++--
 .../doris/datasource/hive/HiveMetadataOps.java     |  14 +-
 .../org/apache/doris/datasource/hive/HiveUtil.java |  65 +++-
 .../datasource/hive/ThriftHMSCachedClient.java     |  76 +----
 .../main/java/org/apache/doris/fs/FileSystem.java  |   9 +
 .../org/apache/doris/fs/LocalDfsFileSystem.java    | 245 ++++++++++++++
 .../java/org/apache/doris/fs/LocalFileSystem.java  |  76 -----
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  45 +++
 .../doris/datasource/HMSCachedClientTest.java      | 328 ++++++++++++++++++
 .../doris/datasource/hive/HmsCommitTest.java       | 374 ++++++++++++++++-----
 10 files changed, 1033 insertions(+), 290 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java index c3e8d00c5d1..84221b74e7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -23,8 +23,8 @@ package org.apache.doris.datasource.hive; import org.apache.doris.backup.Status; import org.apache.doris.common.Pair; +import org.apache.doris.fs.FileSystem; import org.apache.doris.fs.remote.RemoteFile; -import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TUpdateMode; import org.apache.doris.transaction.Transaction; @@ -47,11 +47,13 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Queue; +import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -63,7 +65,7 @@ import java.util.stream.Collectors; public class HMSTransaction implements Transaction { private static final Logger LOG = LogManager.getLogger(HMSTransaction.class); private final HiveMetadataOps hiveOps; - private final RemoteFileSystem fs; + private final FileSystem fs; private String dbName; private String tbName; @@ -115,8 +117,8 @@ public class HMSTransaction implements Transaction { } public void finishInsertTable(String dbName, String tbName) { - this.tbName = tbName; this.dbName = dbName; + this.tbName = tbName; List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates); Table table = getTable(dbName, tbName); List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>(); @@ -226,17 +228,10 @@ public class HMSTransaction implements Transaction { } } - hmsCommitter.waitForAsyncFileSystemTasks(); - hmsCommitter.doAddPartitionsTask(); - hmsCommitter.doUpdateStatisticsTasks(); + hmsCommitter.doCommit(); } catch (Throwable t) { LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName); - hmsCommitter.cancelUnStartedAsyncFileSystemTask(); - hmsCommitter.undoUpdateStatisticsTasks(); - hmsCommitter.undoAddPartitionsTask(); - hmsCommitter.waitForAsyncFileSystemTaskSuppressThrowable(); -
hmsCommitter.runDirectoryClearUpTasksForAbort(); - hmsCommitter.runRenameDirTasksForAbort(); + hmsCommitter.rollback(); throw t; } finally { hmsCommitter.runClearPathsForFinish(); @@ -354,7 +349,7 @@ public class HMSTransaction implements Transaction { } } - private static class UpdateStatisticsTask { + public static class UpdateStatisticsTask { private final String dbName; private final String tableName; private final Optional<String> partitionName; @@ -442,7 +437,6 @@ public class HMSTransaction implements Transaction { throw t; } } - partitions.clear(); } public List<List<String>> rollback(HiveMetadataOps hiveOps) { @@ -548,7 +542,7 @@ public class HMSTransaction implements Transaction { private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { try { - if (!fs.exists(directory.getName()).ok()) { + if (!fs.exists(directory.toString()).ok()) { return new DeleteRecursivelyResult(true, ImmutableList.of()); } } catch (Exception e) { @@ -561,57 +555,53 @@ public class HMSTransaction implements Transaction { } private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { - List<RemoteFile> remoteFiles = new ArrayList<>(); - - Status status = fs.list(directory.getName(), remoteFiles); - if (!status.ok()) { + List<RemoteFile> allFiles = new ArrayList<>(); + Set<String> allDirs = new HashSet<>(); + Status statusFile = fs.listFiles(directory.toString(), allFiles); + Status statusDir = fs.listDirectories(directory.toString(), allDirs); + if (!statusFile.ok() || !statusDir.ok()) { ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder(); notDeletedEligibleItems.add(directory + "/*"); return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); } - boolean isEmptyDir = true; - List<String> notDeletedEligibleItems = new ArrayList<>(); - for (RemoteFile file : remoteFiles) { - if (file.isFile()) { - Path filePath = file.getPath(); - isEmptyDir = false; - // TODO Check if this file was created by this query - if (!deleteIfExists(filePath)) { - notDeletedEligibleItems.add(filePath.toString()); - } - } else if (file.isDirectory()) { - DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir); - if (!subResult.dirNotExists()) { - isEmptyDir = false; - } - if (!subResult.getNotDeletedEligibleItems().isEmpty()) { - notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems()); - } - } else { - isEmptyDir = false; - notDeletedEligibleItems.add(file.getPath().toString()); + boolean allDescendentsDeleted = true; + ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder(); + for (RemoteFile file : allFiles) { + String fileName = file.getName(); + if (!deleteIfExists(new Path(fileName))) { + allDescendentsDeleted = false; + notDeletedEligibleItems.add(fileName); } } - if (isEmptyDir && deleteEmptyDir) { - Verify.verify(notDeletedEligibleItems.isEmpty()); + for (String dir : allDirs) { + DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new Path(dir), deleteEmptyDir); + if (!subResult.dirNotExists()) { + allDescendentsDeleted = false; + } + if (!subResult.getNotDeletedEligibleItems().isEmpty()) { + notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems()); + } + } + + if (allDescendentsDeleted && deleteEmptyDir) { + Verify.verify(notDeletedEligibleItems.build().isEmpty()); if (!deleteIfExists(directory)) { return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/")); } // all items of the location have 
been deleted. return new DeleteRecursivelyResult(true, ImmutableList.of()); } - - return new DeleteRecursivelyResult(false, notDeletedEligibleItems); + return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); } public boolean deleteIfExists(Path path) { - Status status = fs.delete(path.getName()); + Status status = fs.delete(path.toString()); if (status.ok()) { return true; } - return !fs.exists(path.getName()).ok(); + return !fs.exists(path.toString()).ok(); } public static class DatabaseTableName { @@ -1039,9 +1029,6 @@ public class HMSTransaction implements Transaction { } private void undoAddPartitionsTask() { - if (addPartitionsTask.isEmpty()) { - return; - } HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition(); String dbName = firstPartition.getDbName(); @@ -1304,10 +1291,16 @@ public class HMSTransaction implements Transaction { } } + public void doNothing() { + // do nothing + // only for regression test and unit test to throw exception + } + public void doCommit() { waitForAsyncFileSystemTasks(); doAddPartitionsTask(); doUpdateStatisticsTasks(); + doNothing(); } public void rollback() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index f3556d13a57..5cf362508e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -34,7 +34,7 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.operations.ExternalMetadataOps; -import org.apache.doris.fs.remote.RemoteFileSystem; +import org.apache.doris.fs.FileSystem; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import com.google.common.annotations.VisibleForTesting; @@ -58,7 +58,7 @@ public class HiveMetadataOps implements ExternalMetadataOps { private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class); private static final int MIN_CLIENT_POOL_SIZE = 8; private final HMSCachedClient client; - private final RemoteFileSystem fs; + private final FileSystem fs; private final HMSExternalCatalog catalog; public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) { @@ -75,11 +75,19 @@ public class HiveMetadataOps implements ExternalMetadataOps { this.fs = new DFSFileSystem(catalog.getProperties()); } + // for test + public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client, FileSystem fs) { + this.catalog = catalog; + this.client = client; + this.fs = fs; + } + + public HMSCachedClient getClient() { return client; } - public RemoteFileSystem getFs() { + public FileSystem getFs() { return fs; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java index 0dc7eb5a386..dfbfe786985 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java @@ -25,10 +25,12 @@ import org.apache.doris.fs.remote.BrokerFileSystem; import org.apache.doris.fs.remote.RemoteFileSystem; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; @@ -167,7 +169,9 @@ public final class HiveUtil { ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder(); for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) { Partition partition = partitionValuesToPartition.get(entry.getValue()); - resultBuilder.put(entry.getKey(), partition); + if (partition != null) { + resultBuilder.put(entry.getKey(), partition); + } } return resultBuilder.build(); } @@ -267,4 +271,63 @@ public final class HiveUtil { database.setDescription(hiveDb.getComment()); return database; } + + public static Map<String, String> updateStatisticsParameters( + Map<String, String> parameters, + HiveCommonStatistics statistics) { + HashMap<String, String> result = new HashMap<>(parameters); + + result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount())); + result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount())); + result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes())); + + // CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set + // https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231 + if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) { + result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730"); + } + + return result; + } + + public static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) { + long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1")); + long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1")); + long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1")); + return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize); + } + + public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) { + Partition partition = + toMetastoreApiPartition(partitionWithStatistics.getPartition()); + partition.setParameters(updateStatisticsParameters( + partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics())); + return partition; + } + + public static Partition toMetastoreApiPartition(HivePartition hivePartition) { + Partition result = new Partition(); + result.setDbName(hivePartition.getDbName()); + result.setTableName(hivePartition.getTblName()); + result.setValues(hivePartition.getPartitionValues()); + result.setSd(makeStorageDescriptorFromHivePartition(hivePartition)); + result.setParameters(hivePartition.getParameters()); + return result; + } + + public static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) { + SerDeInfo serdeInfo = new SerDeInfo(); + serdeInfo.setName(partition.getTblName()); + serdeInfo.setSerializationLib(partition.getSerde()); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation(Strings.emptyToNull(partition.getPath())); + sd.setCols(partition.getColumns()); + sd.setSerdeInfo(serdeInfo); + sd.setInputFormat(partition.getInputFormat()); + 
sd.setOutputFormat(partition.getOutputFormat()); + sd.setParameters(ImmutableMap.of()); + + return sd; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index 00fc0d03fa8..9fae854645b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -27,11 +27,8 @@ import org.apache.doris.datasource.property.constants.HMSProperties; import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient; import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -56,8 +53,6 @@ import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -67,7 +62,6 @@ import org.apache.logging.log4j.Logger; import java.security.PrivilegedExceptionAction; import java.util.BitSet; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -605,11 +599,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient { Table originTable = getTable(dbName, tableName); Map<String, String> originParams = originTable.getParameters(); - HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams)); + HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams)); Table newTable = originTable.deepCopy(); Map<String, String> newParams = - updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); + HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); newTable.setParameters(newParams); client.client.alter_table(dbName, tableName, newTable); @@ -633,11 +627,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient { Partition originPartition = partitions.get(0); Map<String, String> originParams = originPartition.getParameters(); - HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams)); + HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams)); Partition modifiedPartition = originPartition.deepCopy(); Map<String, String> newParams = - updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); + HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); newParams.put("transient_lastDdlTime", 
String.valueOf(System.currentTimeMillis() / 1000)); modifiedPartition.setParameters(newParams); client.client.alter_partition(dbName, tableName, modifiedPartition); @@ -650,7 +644,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) { try (ThriftHMSClient client = getClient()) { List<Partition> hivePartitions = partitions.stream() - .map(ThriftHMSCachedClient::toMetastoreApiPartition) + .map(HiveUtil::toMetastoreApiPartition) .collect(Collectors.toList()); client.client.add_partitions(hivePartitions); } catch (Exception e) { @@ -666,64 +660,4 @@ public class ThriftHMSCachedClient implements HMSCachedClient { throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName); } } - - private static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) { - long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1")); - long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1")); - long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1")); - return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize); - } - - private static Map<String, String> updateStatisticsParameters( - Map<String, String> parameters, - HiveCommonStatistics statistics) { - HashMap<String, String> result = new HashMap<>(parameters); - - result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount())); - result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount())); - result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes())); - - // CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set - // https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231 - if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) { - result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730"); - } - - return result; - } - - public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) { - Partition partition = - toMetastoreApiPartition(partitionWithStatistics.getPartition()); - partition.setParameters(updateStatisticsParameters( - partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics())); - return partition; - } - - public static Partition toMetastoreApiPartition(HivePartition hivePartition) { - Partition result = new Partition(); - result.setDbName(hivePartition.getDbName()); - result.setTableName(hivePartition.getTblName()); - result.setValues(hivePartition.getPartitionValues()); - result.setSd(makeStorageDescriptorFromHivePartition(hivePartition)); - result.setParameters(hivePartition.getParameters()); - return result; - } - - private static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) { - SerDeInfo serdeInfo = new SerDeInfo(); - serdeInfo.setName(partition.getTblName()); - serdeInfo.setSerializationLib(partition.getSerde()); - - StorageDescriptor sd = new StorageDescriptor(); - sd.setLocation(Strings.emptyToNull(partition.getPath())); - sd.setCols(partition.getColumns()); - sd.setSerdeInfo(serdeInfo); - sd.setInputFormat(partition.getInputFormat()); - sd.setOutputFormat(partition.getOutputFormat()); - sd.setParameters(ImmutableMap.of()); - - return sd; - } - } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index 0470d8b3714..369fc917d77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -23,6 +23,7 @@ import org.apache.doris.fs.remote.RemoteFile; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,4 +87,12 @@ public interface FileSystem { } Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly); + + default Status listFiles(String remotePath, List<RemoteFile> result) { + throw new UnsupportedOperationException("Unsupported operation list files on current file system."); + } + + default Status listDirectories(String remotePath, Set<String> result) { + throw new UnsupportedOperationException("Unsupported operation list directores on current file system."); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java new file mode 100644 index 00000000000..0faf1916db0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.fs; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.UserException; +import org.apache.doris.fs.remote.RemoteFile; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +public class LocalDfsFileSystem implements FileSystem { + + public LocalFileSystem fs = LocalFileSystem.getLocal(new Configuration()); + + public LocalDfsFileSystem() throws IOException { + } + + @Override + public Map<String, String> getProperties() { + return null; + } + + @Override + public Status exists(String remotePath) { + boolean exists = false; + try { + exists = fs.exists(new Path(remotePath)); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (exists) { + return Status.OK; + } else { + return new Status(Status.ErrCode.NOT_FOUND, ""); + } + } + + @Override + public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { + return null; + } + + @Override + public Status upload(String localPath, String remotePath) { + return null; + } + + @Override + public Status directUpload(String content, String remoteFile) { + return null; + } + + @Override + public Status rename(String origFilePath, String destFilePath) { + try { + fs.rename(new Path(origFilePath), new Path(destFilePath)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return Status.OK; + } + + @Override + public Status renameDir(String origFilePath, String destFilePath, Runnable runWhenPathNotExist) { + Status status = exists(destFilePath); + if (status.ok()) { + throw new RuntimeException("Destination directory already exists: " + destFilePath); + } + String targetParent = new Path(destFilePath).getParent().toString(); + status = exists(targetParent); + if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) { + status = makeDir(targetParent); + } + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + + runWhenPathNotExist.run(); + + return rename(origFilePath, destFilePath); + } + + @Override + public void asyncRename(Executor executor, + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + List<String> fileNames) { + for (String fileName : fileNames) { + Path source = new Path(origFilePath, fileName); + Path target = new Path(destFilePath, fileName); + renameFileFutures.add(CompletableFuture.runAsync(() -> { + if (cancelled.get()) { + return; + } + Status status = rename(source.toString(), target.toString()); + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + }, executor)); + } + } + + @Override + public void asyncRenameDir(Executor executor, + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + renameFileFutures.add(CompletableFuture.runAsync(() -> { + if (cancelled.get()) { + return; + } + Status status = renameDir(origFilePath, destFilePath, 
runWhenPathNotExist); + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + }, executor)); + } + + @Override + public Status delete(String remotePath) { + try { + fs.delete(new Path(remotePath), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + return Status.OK; + } + + @Override + public Status makeDir(String remotePath) { + try { + fs.mkdirs(new Path(remotePath)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return Status.OK; + } + + @Override + public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException { + return null; + } + + @Override + public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { + try { + FileStatus[] locatedFileStatusRemoteIterator = fs.listStatus(new Path(remotePath)); + if (locatedFileStatusRemoteIterator == null) { + return Status.OK; + } + for (FileStatus fileStatus : locatedFileStatusRemoteIterator) { + RemoteFile remoteFile = new RemoteFile( + fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), + !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(), + fileStatus.getBlockSize(), fileStatus.getModificationTime()); + result.add(remoteFile); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return Status.OK; + } + + @Override + public Status listFiles(String remotePath, List<RemoteFile> result) { + RemoteIterator<LocatedFileStatus> iterator; + try { + Path dirPath = new Path(remotePath); + iterator = fs.listFiles(dirPath, true); + while (iterator.hasNext()) { + LocatedFileStatus next = iterator.next(); + String location = next.getPath().toString(); + String child = location.substring(dirPath.toString().length()); + while (child.startsWith("/")) { + child = child.substring(1); + } + if (!child.contains("/")) { + result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize())); + } + } + } catch (IOException e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + return Status.OK; + } + + @Override + public Status listDirectories(String remotePath, Set<String> result) { + try { + FileStatus[] fileStatuses = fs.listStatus(new Path(remotePath)); + result.addAll( + Arrays.stream(fileStatuses) + .filter(FileStatus::isDirectory) + .map(file -> file.getPath().toString() + "/") + .collect(ImmutableSet.toImmutableSet())); + } catch (IOException e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + return Status.OK; + } + + public void createFile(String path) throws IOException { + Path path1 = new Path(path); + if (!exists(path1.getParent().toString()).ok()) { + makeDir(path1.getParent().toString()); + } + FSDataOutputStream build = fs.createFile(path1).build(); + build.close(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java deleted file mode 100644 index 1baaf9bd2f7..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs; - -import org.apache.doris.backup.Status; -import org.apache.doris.fs.remote.RemoteFile; - -import java.util.List; -import java.util.Map; - -public class LocalFileSystem implements FileSystem { - @Override - public Status exists(String remotePath) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public Status upload(String localPath, String remotePath) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public Status directUpload(String content, String remoteFile) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public Status rename(String origFilePath, String destFilePath) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public Status delete(String remotePath) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public Status makeDir(String remotePath) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } - - @Override - public Map<String, String> getProperties() { - throw new UnsupportedOperationException("Unsupported operation on local file system."); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index fabc341389e..7e3032ca807 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -31,12 +31,15 @@ import org.apache.doris.fs.remote.RemoteFileSystem; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,9 +54,11 @@ import java.nio.ByteBuffer; import java.nio.file.FileVisitOption; import 
java.nio.file.Files; import java.nio.file.Paths; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -523,4 +528,44 @@ public class DFSFileSystem extends RemoteFileSystem { } return Status.OK; } + + @Override + public Status listFiles(String remotePath, List<RemoteFile> result) { + RemoteIterator<LocatedFileStatus> iterator; + try { + FileSystem fileSystem = nativeFileSystem(remotePath); + Path dirPath = new Path(remotePath); + iterator = fileSystem.listFiles(dirPath, true); + while (iterator.hasNext()) { + LocatedFileStatus next = iterator.next(); + String location = next.getPath().toString(); + String child = location.substring(dirPath.toString().length()); + while (child.startsWith("/")) { + child = child.substring(1); + } + if (!child.contains("/")) { + result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize())); + } + } + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + return Status.OK; + } + + @Override + public Status listDirectories(String remotePath, Set<String> result) { + try { + FileSystem fileSystem = nativeFileSystem(remotePath); + FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath)); + result.addAll( + Arrays.stream(fileStatuses) + .filter(FileStatus::isDirectory) + .map(file -> file.getPath().toString() + "/") + .collect(ImmutableSet.toImmutableSet())); + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + return Status.OK; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java new file mode 100644 index 00000000000..126df780dbf --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.analysis.TableName; +import org.apache.doris.datasource.hive.HMSCachedClient; +import org.apache.doris.datasource.hive.HMSTransaction; +import org.apache.doris.datasource.hive.HiveDatabaseMetadata; +import org.apache.doris.datasource.hive.HivePartitionStatistics; +import org.apache.doris.datasource.hive.HivePartitionWithStatistics; +import org.apache.doris.datasource.hive.HiveTableMetadata; +import org.apache.doris.datasource.hive.HiveUtil; +import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class HMSCachedClientTest implements HMSCachedClient { + + public Map<HMSTransaction.DatabaseTableName, List<Partition>> partitions = new ConcurrentHashMap<>(); + public Map<String, List<Table>> tables = new HashMap<>(); + public List<Database> dbs = new ArrayList<>(); + + @Override + public Database getDatabase(String dbName) { + for (Database db : this.dbs) { + if (db.getName().equals(dbName)) { + return db; + } + } + throw new RuntimeException("can't found database: " + dbName); + } + + @Override + public List<String> getAllDatabases() { + return null; + } + + @Override + public List<String> getAllTables(String dbName) { + return null; + } + + @Override + public boolean tableExists(String dbName, String tblName) { + List<Table> tablesList = getTableList(dbName); + for (Table table : tablesList) { + if (table.getTableName().equals(tblName)) { + return true; + } + } + return false; + } + + @Override + public List<String> listPartitionNames(String dbName, String tblName) { + List<Partition> partitionList = getPartitionList(dbName, tblName); + ArrayList<String> ret = new ArrayList<>(); + for (Partition partition : partitionList) { + StringBuilder names = new StringBuilder(); + List<String> values = partition.getValues(); + for (int i = 0; i < values.size(); i++) { + names.append(values.get(i)); + if (i < values.size() - 1) { + names.append("/"); + } + } + ret.add(names.toString()); + } + return ret; + } + + @Override + public List<Partition> listPartitions(String dbName, String tblName) { + return getPartitionList(dbName, tblName); + } + + @Override + public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) { + return listPartitionNames(dbName, tblName); + } + + @Override + public Partition getPartition(String dbName, String tblName, List<String> partitionValues) { + synchronized (this) { + List<Partition> partitionList = getPartitionList(dbName, tblName); + for (Partition partition : partitionList) { + if (partition.getValues().equals(partitionValues)) { + return partition; + } + } + throw new RuntimeException("can't found partition"); + } + } + + 
@Override + public List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames) { + synchronized (this) { + List<Partition> partitionList = getPartitionList(dbName, tblName); + ArrayList<Partition> ret = new ArrayList<>(); + List<List<String>> partitionValuesList = + partitionNames + .stream() + .map(HiveUtil::toPartitionValues) + .collect(Collectors.toList()); + partitionValuesList.forEach(values -> { + for (Partition partition : partitionList) { + if (partition.getValues().equals(values)) { + ret.add(partition); + break; + } + } + }); + return ret; + } + } + + @Override + public Table getTable(String dbName, String tblName) { + List<Table> tablesList = getTableList(dbName); + for (Table table : tablesList) { + if (table.getTableName().equals(tblName)) { + return table; + } + } + throw new RuntimeException("can't found table: " + tblName); + } + + @Override + public List<FieldSchema> getSchema(String dbName, String tblName) { + return null; + } + + @Override + public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) { + return null; + } + + @Override + public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(String dbName, String tblName, List<String> partNames, List<String> columns) { + return null; + } + + @Override + public CurrentNotificationEventId getCurrentNotificationEventId() { + return null; + } + + @Override + public NotificationEventResponse getNextNotification(long lastEventId, int maxEvents, IMetaStoreClient.NotificationFilter filter) throws MetastoreNotificationFetchException { + return null; + } + + @Override + public long openTxn(String user) { + return 0; + } + + @Override + public void commitTxn(long txnId) { + + } + + @Override + public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { + return null; + } + + @Override + public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List<String> partitionNames, long timeoutMs) { + + } + + @Override + public String getCatalogLocation(String catalogName) { + return null; + } + + @Override + public void createDatabase(DatabaseMetadata db) { + dbs.add(HiveUtil.toHiveDatabase((HiveDatabaseMetadata) db)); + tables.put(db.getDbName(), new ArrayList<>()); + } + + @Override + public void dropDatabase(String dbName) { + Database db = getDatabase(dbName); + this.dbs.remove(db); + } + + @Override + public void dropTable(String dbName, String tableName) { + Table table = getTable(dbName, tableName); + this.tables.get(dbName).remove(table); + this.partitions.remove(new HMSTransaction.DatabaseTableName(dbName, tableName)); + } + + @Override + public void createTable(TableMetadata tbl, boolean ignoreIfExists) { + String dbName = tbl.getDbName(); + String tbName = tbl.getTableName(); + if (tableExists(dbName, tbName)) { + throw new RuntimeException("Table '" + tbName + "' has existed in '" + dbName + "'."); + } + + List<Table> tableList = getTableList(tbl.getDbName()); + tableList.add(HiveUtil.toHiveTable((HiveTableMetadata) tbl)); + HMSTransaction.DatabaseTableName key = new HMSTransaction.DatabaseTableName(dbName, tbName); + partitions.put(key, new ArrayList<>()); + } + + @Override + public void updateTableStatistics(String dbName, String tableName, Function<HivePartitionStatistics, HivePartitionStatistics> update) { + synchronized (this) { + Table originTable = getTable(dbName, tableName); + Map<String, String> originParams = originTable.getParameters(); + 
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams)); + + Table newTable = originTable.deepCopy(); + Map<String, String> newParams = + HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); + newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); + newTable.setParameters(newParams); + List<Table> tableList = getTableList(dbName); + tableList.remove(originTable); + tableList.add(newTable); + } + } + + @Override + public void updatePartitionStatistics(String dbName, String tableName, String partitionName, Function<HivePartitionStatistics, HivePartitionStatistics> update) { + + synchronized (this) { + List<Partition> partitions = getPartitions(dbName, tableName, ImmutableList.of(partitionName)); + if (partitions.size() != 1) { + throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName); + } + + Partition originPartition = partitions.get(0); + Map<String, String> originParams = originPartition.getParameters(); + HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams)); + + Partition modifiedPartition = originPartition.deepCopy(); + Map<String, String> newParams = + HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); + newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); + modifiedPartition.setParameters(newParams); + + List<Partition> partitionList = getPartitionList(dbName, tableName); + partitionList.remove(originPartition); + partitionList.add(modifiedPartition); + } + } + + @Override + public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) { + synchronized (this) { + List<Partition> partitionList = getPartitionList(dbName, tableName); + List<Partition> hivePartitions = partitions.stream() + .map(HiveUtil::toMetastoreApiPartition) + .collect(Collectors.toList()); + partitionList.addAll(hivePartitions); + } + } + + @Override + public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) { + synchronized (this) { + List<Partition> partitionList = getPartitionList(dbName, tableName); + for (int j = 0; j < partitionList.size(); j++) { + Partition partition = partitionList.get(j); + if (partition.getValues().equals(partitionValues)) { + partitionList.remove(partition); + return; + } + } + throw new RuntimeException("can't found the partition"); + } + } + + public List<Partition> getPartitionList(String dbName, String tableName) { + HMSTransaction.DatabaseTableName key = new HMSTransaction.DatabaseTableName(dbName, tableName); + List<Partition> partitionList = this.partitions.get(key); + if (partitionList == null) { + throw new RuntimeException("can't found table: " + key); + } + return partitionList; + } + + public List<Table> getTableList(String dbName) { + List<Table> tablesList = this.tables.get(dbName); + if (tablesList == null) { + throw new RuntimeException("can't found database: " + dbName); + } + return tablesList; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index fc939625ea9..4ec6ca84c52 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -17,10 +17,10 @@ package 
org.apache.doris.datasource.hive; -import org.apache.doris.backup.Status; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; +import org.apache.doris.datasource.HMSCachedClientTest; +import org.apache.doris.fs.LocalDfsFileSystem; import org.apache.doris.thrift.THiveLocationParams; import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TUpdateMode; @@ -28,6 +28,7 @@ import org.apache.doris.thrift.TUpdateMode; import com.google.common.collect.Lists; import mockit.Mock; import mockit.MockUp; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.After; @@ -35,59 +36,57 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -@Ignore public class HmsCommitTest { - private static HMSExternalCatalog hmsCatalog; private static HiveMetadataOps hmsOps; private static HMSCachedClient hmsClient; private static final String dbName = "test_db"; private static final String tbWithPartition = "test_tb_with_partition"; private static final String tbWithoutPartition = "test_tb_without_partition"; - private static Path warehousePath; + private static LocalDfsFileSystem fs; static String dbLocation; - private String fileFormat = "orc"; + static String writeLocation; + static String uri = "thrift://127.0.0.1:9083"; + static boolean hasRealHmsService = false; @BeforeClass public static void beforeClass() throws Throwable { - warehousePath = Files.createTempDirectory("test_warehouse_"); + Path warehousePath = Files.createTempDirectory("test_warehouse_"); + Path writePath = Files.createTempDirectory("test_write_"); dbLocation = "file://" + warehousePath.toAbsolutePath() + "/"; + writeLocation = "file://" + writePath.toAbsolutePath() + "/"; createTestHiveCatalog(); createTestHiveDatabase(); - mockFs(); } @AfterClass public static void afterClass() { - hmsClient.dropTable(dbName, tbWithPartition); - hmsClient.dropTable(dbName, tbWithoutPartition); hmsClient.dropDatabase(dbName); } - public static void createTestHiveCatalog() { - Map<String, String> props = new HashMap<>(); - props.put("type", "hms"); - props.put("hive.metastore.uris", "thrift://127.0.0.1:9083"); - props.put("hadoop.username", "hadoop"); - hmsCatalog = new HMSExternalCatalog(1, "hive_catalog", null, props, "comment"); - hmsCatalog.setInitialized(); - hmsCatalog.initLocalObjectsImpl(); - hmsOps = (HiveMetadataOps) hmsCatalog.getMetadataOps(); - hmsClient = hmsOps.getClient(); + public static void createTestHiveCatalog() throws IOException { + fs = new LocalDfsFileSystem(); + + if (hasRealHmsService) { + // If you have a real HMS service, then you can use this client to create real connections for testing + HiveConf entries = new HiveConf(); + entries.set("hive.metastore.uris", uri); + hmsClient = new ThriftHMSCachedClient(entries, 2); + } else { + hmsClient = new HMSCachedClientTest(); + } + hmsOps = new HiveMetadataOps(null, hmsClient, fs); } public 
static void createTestHiveDatabase() { @@ -98,53 +97,31 @@ public class HmsCommitTest { hmsClient.createDatabase(dbMetadata); } - public static void mockFs() { - - new MockUp<DFSFileSystem>(DFSFileSystem.class) { - @Mock - public void asyncRenameDir(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - Runnable runWhenPathNotExist) { - } - - @Mock - public void asyncRename(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - List<String> fileNames) { - } - - @Mock - public Status renameDir(String origFilePath, - String destFilePath, - Runnable runWhenPathNotExist) { - return Status.OK; - } - }; - } - @Before public void before() { - // create table + // create table for tbWithPartition List<Column> columns = new ArrayList<>(); columns.add(new Column("c1", PrimitiveType.INT, true)); columns.add(new Column("c2", PrimitiveType.STRING, true)); columns.add(new Column("c3", PrimitiveType.STRING, false)); List<String> partitionKeys = new ArrayList<>(); partitionKeys.add("c3"); + String fileFormat = "orc"; + HashMap<String, String> params = new HashMap<String, String>() {{ + put("location_uri", dbLocation + tbWithPartition); + }}; HiveTableMetadata tableMetadata = new HiveTableMetadata( dbName, tbWithPartition, columns, partitionKeys, - new HashMap<>(), fileFormat); + params, fileFormat); hmsClient.createTable(tableMetadata, true); + // create table for tbWithoutPartition + HashMap<String, String> params2 = new HashMap<String, String>() {{ + put("location_uri", dbLocation + tbWithPartition); + }}; HiveTableMetadata tableMetadata2 = new HiveTableMetadata( - dbName, tbWithoutPartition, columns, new ArrayList<>(), - new HashMap<>(), fileFormat); + dbName, tbWithoutPartition, columns, new ArrayList<>(), + params2, fileFormat); hmsClient.createTable(tableMetadata2, true); } @@ -156,45 +133,45 @@ public class HmsCommitTest { } @Test - public void testNewPartitionForUnPartitionedTable() { + public void testNewPartitionForUnPartitionedTable() throws IOException { List<THivePartitionUpdate> pus = new ArrayList<>(); - pus.add(createRandomNew("a")); + pus.add(createRandomNew(null)); Assert.assertThrows(Exception.class, () -> commit(dbName, tbWithoutPartition, pus)); } @Test - public void testAppendPartitionForUnPartitionedTable() { + public void testAppendPartitionForUnPartitionedTable() throws IOException { List<THivePartitionUpdate> pus = new ArrayList<>(); - pus.add(createRandomAppend("")); - pus.add(createRandomAppend("")); - pus.add(createRandomAppend("")); + pus.add(createRandomAppend(null)); + pus.add(createRandomAppend(null)); + pus.add(createRandomAppend(null)); commit(dbName, tbWithoutPartition, pus); Table table = hmsClient.getTable(dbName, tbWithoutPartition); assertNumRows(3, table); List<THivePartitionUpdate> pus2 = new ArrayList<>(); - pus2.add(createRandomAppend("")); - pus2.add(createRandomAppend("")); - pus2.add(createRandomAppend("")); + pus2.add(createRandomAppend(null)); + pus2.add(createRandomAppend(null)); + pus2.add(createRandomAppend(null)); commit(dbName, tbWithoutPartition, pus2); table = hmsClient.getTable(dbName, tbWithoutPartition); assertNumRows(6, table); } @Test - public void testOverwritePartitionForUnPartitionedTable() { + public void testOverwritePartitionForUnPartitionedTable() throws IOException { testAppendPartitionForUnPartitionedTable(); List<THivePartitionUpdate> pus = new ArrayList<>(); - 
pus.add(createRandomOverwrite("")); - pus.add(createRandomOverwrite("")); - pus.add(createRandomOverwrite("")); + pus.add(createRandomOverwrite(null)); + pus.add(createRandomOverwrite(null)); + pus.add(createRandomOverwrite(null)); commit(dbName, tbWithoutPartition, pus); Table table = hmsClient.getTable(dbName, tbWithoutPartition); assertNumRows(3, table); } @Test - public void testNewPartitionForPartitionedTable() { + public void testNewPartitionForPartitionedTable() throws IOException { List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomNew("a")); pus.add(createRandomNew("a")); @@ -213,7 +190,7 @@ public class HmsCommitTest { } @Test - public void testAppendPartitionForPartitionedTable() { + public void testAppendPartitionForPartitionedTable() throws IOException { testNewPartitionForPartitionedTable(); List<THivePartitionUpdate> pus = new ArrayList<>(); @@ -234,7 +211,7 @@ public class HmsCommitTest { } @Test - public void testOverwritePartitionForPartitionedTable() { + public void testOverwritePartitionForPartitionedTable() throws IOException { testAppendPartitionForPartitionedTable(); List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomOverwrite("a")); @@ -251,7 +228,7 @@ public class HmsCommitTest { } @Test - public void testNewManyPartitionForPartitionedTable() { + public void testNewManyPartitionForPartitionedTable() throws IOException { List<THivePartitionUpdate> pus = new ArrayList<>(); int nums = 150; for (int i = 0; i < nums; i++) { @@ -265,12 +242,30 @@ public class HmsCommitTest { } try { - commit(dbName, tbWithPartition, pus); + commit(dbName, tbWithPartition, Collections.singletonList(createRandomNew("1"))); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("failed to add partitions")); } } + @Test + public void testErrorPartitionTypeFromHmsCheck() throws IOException { + // first add three partition: a,b,c + testNewPartitionForPartitionedTable(); + + // second append two partition: a,x + // but there is no 'x' partition in the previous table, so when verifying based on HMS, + // it will throw exception + List<THivePartitionUpdate> pus = new ArrayList<>(); + pus.add(createRandomAppend("a")); + pus.add(createRandomAppend("x")); + + Assert.assertThrows( + Exception.class, + () -> commit(dbName, tbWithPartition, pus) + ); + } + public void assertNumRows(long expected, Partition p) { Assert.assertEquals(expected, Long.parseLong(p.getParameters().get("numRows"))); } @@ -279,40 +274,62 @@ public class HmsCommitTest { Assert.assertEquals(expected, Long.parseLong(t.getParameters().get("numRows"))); } - public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) { + public THivePartitionUpdate genOnePartitionUpdate(TUpdateMode mode) throws IOException { + return genOnePartitionUpdate("", mode); + } + + public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) throws IOException { String uuid = UUID.randomUUID().toString(); THiveLocationParams location = new THiveLocationParams(); - String targetPath = dbLocation + uuid; + String targetPath = dbLocation + uuid + "/" + partitionValue; + location.setTargetPath(targetPath); - location.setWritePath(targetPath); + location.setWritePath(writeLocation + partitionValue); THivePartitionUpdate pu = new THivePartitionUpdate(); - pu.setName(partitionValue); + if (partitionValue != null) { + pu.setName(partitionValue); + } pu.setUpdateMode(mode); pu.setRowCount(1); pu.setFileSize(1); pu.setLocation(location); + String f1 = uuid 
+ "f1"; + String f2 = uuid + "f2"; + String f3 = uuid + "f3"; + pu.setFileNames(new ArrayList<String>() { { - add(targetPath + "/f1"); - add(targetPath + "/f2"); - add(targetPath + "/f3"); + add(f1); + add(f2); + add(f3); } }); + + if (mode != TUpdateMode.NEW) { + fs.makeDir(targetPath); + } + + fs.createFile(writeLocation + partitionValue + "/" + f1); + fs.createFile(writeLocation + partitionValue + "/" + f2); + fs.createFile(writeLocation + partitionValue + "/" + f3); return pu; } - public THivePartitionUpdate createRandomNew(String partition) { - return genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW); + public THivePartitionUpdate createRandomNew(String partition) throws IOException { + return partition == null ? genOnePartitionUpdate(TUpdateMode.NEW) : + genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW); } - public THivePartitionUpdate createRandomAppend(String partition) { - return genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND); + public THivePartitionUpdate createRandomAppend(String partition) throws IOException { + return partition == null ? genOnePartitionUpdate(TUpdateMode.APPEND) : + genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND); } - public THivePartitionUpdate createRandomOverwrite(String partition) { - return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE); + public THivePartitionUpdate createRandomOverwrite(String partition) throws IOException { + return partition == null ? genOnePartitionUpdate(TUpdateMode.OVERWRITE) : + genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE); } public void commit(String dbName, @@ -323,4 +340,181 @@ public class HmsCommitTest { hmsTransaction.finishInsertTable(dbName, tableName); hmsTransaction.commit(); } + + public void mockAddPartitionTaskException(Runnable runnable) { + new MockUp<HMSTransaction.AddPartitionsTask>(HMSTransaction.AddPartitionsTask.class) { + @Mock + private void run(HiveMetadataOps hiveOps) { + runnable.run(); + throw new RuntimeException("failed to add partition"); + } + }; + } + + public void mockDoOther(Runnable runnable) { + new MockUp<HMSTransaction.HmsCommitter>(HMSTransaction.HmsCommitter.class) { + @Mock + private void doNothing() { + runnable.run(); + throw new RuntimeException("failed to do nothing"); + } + }; + } + + public void mockUpdateStatisticsTaskException(Runnable runnable) { + new MockUp<HMSTransaction.UpdateStatisticsTask>(HMSTransaction.UpdateStatisticsTask.class) { + @Mock + private void run(HiveMetadataOps hiveOps) { + runnable.run(); + throw new RuntimeException("failed to update partition"); + } + }; + } + + @Test + public void testRollbackNewPartitionForPartitionedTableForFilesystem() throws IOException { + List<THivePartitionUpdate> pus = new ArrayList<>(); + pus.add(createRandomNew("a")); + + THiveLocationParams location = pus.get(0).getLocation(); + + // For new partition, there should be no target path + Assert.assertFalse(fs.exists(location.getTargetPath()).ok()); + Assert.assertTrue(fs.exists(location.getWritePath()).ok()); + + mockAddPartitionTaskException(() -> { + // When the commit is completed, these files should be renamed successfully + String targetPath = location.getTargetPath(); + Assert.assertTrue(fs.exists(targetPath).ok()); + for (String file : pus.get(0).getFileNames()) { + Assert.assertTrue(fs.exists(targetPath + "/" + file).ok()); + } + }); + + try { + commit(dbName, tbWithPartition, pus); + Assert.assertTrue(false); + } catch (Exception e) { + // ignore + } + + // After rollback, these files will be 
+        String targetPath = location.getTargetPath();
+        Assert.assertFalse(fs.exists(targetPath).ok());
+        for (String file : pus.get(0).getFileNames()) {
+            Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
+        }
+    }
+
+
+    @Test
+    public void testRollbackNewPartitionForPartitionedTableWithNewPartition() throws IOException {
+        // first create three partitions: a, b, c
+        testNewPartitionForPartitionedTable();
+
+        // second, add a 'new partition' for 'x' and an 'append partition' for 'a';
+        // during 'doCommit', the 'new partition' is executed before the 'append partition',
+        // so during 'rollback' the 'x' partition is first added and then deleted
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomNew("x"));
+        pus.add(createRandomAppend("a"));
+
+        THiveLocationParams location = pus.get(0).getLocation();
+
+        // For a new partition, the target path should not exist yet
+        Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
+        Assert.assertTrue(fs.exists(location.getWritePath()).ok());
+
+        mockUpdateStatisticsTaskException(() -> {
+            // When the commit step completes, these files should have been renamed successfully
+            String targetPath = location.getTargetPath();
+            Assert.assertTrue(fs.exists(targetPath).ok());
+            for (String file : pus.get(0).getFileNames()) {
+                Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
+            }
+            // the new partition is executed before the append partition,
+            // so the new partition is already visible here
+            Partition px = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"));
+            assertNumRows(1, px);
+        });
+
+        try {
+            commit(dbName, tbWithPartition, pus);
+            Assert.assertTrue(false);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // After rollback, these files will be deleted
+        String targetPath = location.getTargetPath();
+        Assert.assertFalse(fs.exists(targetPath).ok());
+        for (String file : pus.get(0).getFileNames()) {
+            Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
+        }
+        // the 'x' partition will be deleted as well
+        Assert.assertThrows(
+                "the 'x' partition should be deleted",
+                Exception.class,
+                () -> hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"))
+        );
+    }
+
+    @Test
+    public void testRollbackNewPartitionForPartitionedTableWithNewAppendPartition() throws IOException {
+        // first create three partitions: a, b, c
+        testNewPartitionForPartitionedTable();
+
+        // second, add a 'new partition' for 'x' and an 'append partition' for 'a'
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomNew("x"));
+        pus.add(createRandomAppend("a"));
+
+        THiveLocationParams location = pus.get(0).getLocation();
+
+        // For a new partition, the target path should not exist yet
+        Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
+        Assert.assertTrue(fs.exists(location.getWritePath()).ok());
+
+        mockDoOther(() -> {
+            // When the commit step completes, these files should have been renamed successfully
+            String targetPath = location.getTargetPath();
+            Assert.assertTrue(fs.exists(targetPath).ok());
+            for (String file : pus.get(0).getFileNames()) {
+                Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
+            }
+            // the new partition task has run, so the new partition can be fetched
+            Partition px = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"));
+            assertNumRows(1, px);
+            // the append partition task has run, so the updated partition can be fetched
+            Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
+            assertNumRows(4, pa);
+        });
+
+        try {
+            commit(dbName, tbWithPartition, pus);
+            Assert.assertTrue(false);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // After rollback, these files will be deleted
+        String targetPath = location.getTargetPath();
+        Assert.assertFalse(fs.exists(targetPath).ok());
+        for (String file : pus.get(0).getFileNames()) {
+            Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
+        }
+        // the 'x' partition will be deleted
+        Assert.assertThrows(
+                "the 'x' partition should be deleted",
+                Exception.class,
+                () -> hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"))
+        );
+        // the 'a' partition should be rolled back to its previous row count
+        Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
+        assertNumRows(3, pa);
+    }
 }
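
Note on the fault-injection pattern used by the rollback tests above: each test registers a JMockit fake, where new MockUp<T>() with an @Mock method replaces the real method body, so a failure can be injected at one exact step of the commit and the rollback path exercised. The following is a minimal, self-contained sketch of that pattern only; Task, doCommit, and MockUpPatternSketch are illustrative names, not part of this commit, while mockit.MockUp and mockit.Mock are the real JMockit API (assuming JMockit is initialized on the test JVM, as in the FE test suite).

import mockit.Mock;
import mockit.MockUp;
import org.junit.Assert;
import org.junit.Test;

public class MockUpPatternSketch {
    // Stand-in for a commit step such as HMSTransaction.AddPartitionsTask (illustrative).
    static class Task {
        void run() {
            // the real work would happen here
        }
    }

    // Stand-in commit flow: runs the task and maps a thrown exception to a rollback.
    static String doCommit() {
        try {
            new Task().run(); // the @Mock method below intercepts this call
            return "committed";
        } catch (RuntimeException e) {
            return "rolled back: " + e.getMessage();
        }
    }

    @Test
    public void failureInjectedIntoCommitStep() {
        // Redefines Task.run() for the duration of this test; JMockit restores
        // the original class automatically when the test finishes.
        new MockUp<Task>() {
            @Mock
            void run() {
                throw new RuntimeException("injected failure");
            }
        };
        Assert.assertEquals("rolled back: injected failure", doCommit());
    }
}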