This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 097bd53f3993cc501b1b20d975d8ee8c2199f2cb
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Fri Apr 7 00:07:23 2023 +0800

    [Improvement](multi catalog) Cache files for Hive tables instead of caching file splits. (#18419)
    
    Currently, the session variable for split size does not take effect once
    the file splits are cached.
    1. Cache the file list for a Hive table instead of caching the file
    splits, and re-split the files on every read using the current split
    size (a sketch of this pattern follows below).
    2. Use the self splitter by default (see the condensed check below).
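    
    A minimal, self-contained sketch of the pattern in point 1 (illustrative
    names only -- FileThenSplitSketch, CachedFile, SplitRange are not the
    Doris classes): cache immutable per-file facts once, then derive split
    ranges from them on every read, so the current split size always applies.
    
        // Sketch only: cache file facts, compute splits per read.
        import com.google.common.cache.CacheBuilder;
        import com.google.common.cache.CacheLoader;
        import com.google.common.cache.LoadingCache;
    
        import java.util.ArrayList;
        import java.util.List;
    
        public class FileThenSplitSketch {
            // Cached once per location: no split boundaries stored.
            record CachedFile(String path, long length, long blockSize) {}
            // Recomputed on every query from the cached facts.
            record SplitRange(String path, long start, long length) {}
    
            private final LoadingCache<String, List<CachedFile>> fileCache =
                    CacheBuilder.newBuilder().maximumSize(10_000)
                            .build(new CacheLoader<String, List<CachedFile>>() {
                                @Override
                                public List<CachedFile> load(String location) {
                                    // Real code would list the location via FileSystem.listFiles;
                                    // here: one fake 1 GiB file with a 256 MiB block size.
                                    return List.of(new CachedFile(location + "/part-0", 1L << 30, 256L << 20));
                                }
                            });
    
            // splitSize is read at call time, so a session-level change
            // takes effect on the next scan without touching the cache.
            public List<SplitRange> getSplits(String location, long splitSize) throws Exception {
                List<SplitRange> splits = new ArrayList<>();
                for (CachedFile f : fileCache.get(location)) {
                    long size = splitSize > 0 ? splitSize : f.blockSize();
                    long remaining = f.length();
                    // Same 10% slack rule the commit uses for the last chunk.
                    while ((double) remaining / size > 1.1D) {
                        splits.add(new SplitRange(f.path(), f.length() - remaining, size));
                        remaining -= size;
                    }
                    if (remaining != 0) {
                        splits.add(new SplitRange(f.path(), f.length() - remaining, remaining));
                    }
                }
                return splits;
            }
        }
    
    Because split boundaries are no longer baked into the cached value, a
    change to the split-size session variable is honored by the very next
    scan without invalidating the file cache.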
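    
    Point 2 makes the self splitter opt-out rather than opt-in. A condensed
    equivalent of the new check in HiveSplitter (fragment only; properties
    holds the catalog properties shown in the diff):
    
        // Anything other than an explicit "false" keeps the self splitter on.
        boolean useSelfSplitter =
                !"false".equalsIgnoreCase(properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER));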
---
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  85 +++++++++++-----
 .../doris/planner/external/HiveSplitter.java       | 110 +++++++++++----------
 .../apache/doris/datasource/CatalogMgrTest.java    |   4 +-
 3 files changed, 118 insertions(+), 81 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 548e06d65b..39532a6785 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.Split;
 import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.planner.external.HiveSplitter;
 
@@ -44,7 +45,6 @@ import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -53,6 +53,8 @@ import com.google.common.collect.TreeRangeMap;
 import lombok.Data;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -97,7 +99,7 @@ public class HiveMetaStoreCache {
     // cache from <dbname-tblname-partition_values> -> <partition info>
     private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
     // the ref of cache from <location> -> <file list>
-    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> fileCacheRef
+    private volatile AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> fileCacheRef
             = new AtomicReference<>();
 
     public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
@@ -148,10 +150,10 @@ public class HiveMetaStoreCache {
         }
         // if the file.meta.cache.ttl-second is equal 0, use the synchronous loader
         // if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader
-        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader = getGuavaCacheLoader(executor,
+        CacheLoader<FileCacheKey, FileCacheValue> loader = getGuavaCacheLoader(executor,
                 fileMetaCacheTtlSecond);
 
-        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = fileCacheRef.get();
+        LoadingCache<FileCacheKey, FileCacheValue> preFileCache = fileCacheRef.get();
 
         fileCacheRef.set(fileCacheBuilder.build(loader));
         if (Objects.nonNull(preFileCache)) {
@@ -262,7 +264,7 @@ public class HiveMetaStoreCache {
         return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values);
     }
 
-    private ImmutableList<FileSplit> loadFiles(FileCacheKey key) {
+    private FileCacheValue loadFiles(FileCacheKey key) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -279,11 +281,11 @@ public class HiveMetaStoreCache {
             jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
             FileInputFormat.setInputPaths(jobConf, finalLocation);
             try {
-                FileSplit[] result;
+                FileCacheValue result;
                 InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                 // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    result = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
+                    result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat, jobConf);
                 } else {
                     InputSplit[] splits;
                     String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -294,18 +296,18 @@ public class HiveMetaStoreCache {
                     } else {
                         splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
                     }
-                    result = new FileSplit[splits.length];
+                    result = new FileCacheValue();
                     // Convert the hadoop split to Doris Split.
                     for (int i = 0; i < splits.length; i++) {
                         org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
-                        result[i] =  new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null);
+                        result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null));
                     }
                 }
 
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("load #{} splits for {} in catalog {}", result.length, key, catalog.getName());
+                    LOG.debug("load #{} splits for {} in catalog {}", result.getFiles().size(), key, catalog.getName());
                 }
-                return ImmutableList.copyOf(result);
+                return result;
             } catch (Exception e) {
                 throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
             }
@@ -353,7 +355,7 @@ public class HiveMetaStoreCache {
         }
     }
 
-    public List<FileSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
+    public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
         partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
@@ -364,19 +366,18 @@ public class HiveMetaStoreCache {
         } else {
             stream = keys.parallelStream();
         }
-        List<ImmutableList<FileSplit>> fileLists = stream.map(k -> {
+        List<FileCacheValue> fileLists = stream.map(k -> {
             try {
                 return fileCacheRef.get().get(k);
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
         }).collect(Collectors.toList());
-        List<FileSplit> retFiles = Lists.newArrayListWithExpectedSize(
-                fileLists.stream().mapToInt(l -> l.size()).sum());
-        fileLists.stream().forEach(l -> retFiles.addAll(l));
         LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
-                retFiles.size(), partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
-        return retFiles;
+                fileLists.stream().mapToInt(l -> l.getFiles() == null
+                    ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
+                partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
+        return fileLists;
     }
 
     public List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList) {
@@ -582,12 +583,12 @@ public class HiveMetaStoreCache {
      * @param fileMetaCacheTtlSecond
      * @return
      */
-    private CacheLoader<FileCacheKey, ImmutableList<FileSplit>> getGuavaCacheLoader(Executor executor,
+    private CacheLoader<FileCacheKey, FileCacheValue> getGuavaCacheLoader(Executor executor,
             int fileMetaCacheTtlSecond) {
-        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader =
-                new CacheLoader<FileCacheKey, ImmutableList<FileSplit>>() {
+        CacheLoader<FileCacheKey, FileCacheValue> loader =
+                new CacheLoader<FileCacheKey, FileCacheValue>() {
                     @Override
-                    public ImmutableList<FileSplit> load(FileCacheKey key) throws Exception {
+                    public FileCacheValue load(FileCacheKey key) throws Exception {
                         return loadFiles(key);
                     }
                 };
@@ -602,7 +603,7 @@ public class HiveMetaStoreCache {
      * get fileCache ref
      * @return
      */
-    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> getFileCacheRef() {
+    public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheRef() {
         return fileCacheRef;
     }
 
@@ -694,7 +695,7 @@ public class HiveMetaStoreCache {
         public FileCacheKey(String location, String inputFormat) {
             this.location = location;
             this.inputFormat = inputFormat;
-            this.useSelfSplitter = false;
+            this.useSelfSplitter = true;
         }
 
         public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
@@ -725,6 +726,42 @@ public class HiveMetaStoreCache {
         }
     }
 
+    @Data
+    public static class FileCacheValue {
+        // File Cache for self splitter.
+        private List<HiveFileStatus> files;
+        // File split cache for old splitter. This is a temp variable.
+        private List<Split> splits;
+        private boolean isSplittable;
+
+        public void addFile(LocatedFileStatus file) {
+            if (files == null) {
+                files = Lists.newArrayList();
+            }
+            HiveFileStatus status = new HiveFileStatus();
+            status.setBlockLocations(file.getBlockLocations());
+            status.setPath(file.getPath());
+            status.length = file.getLen();
+            status.blockSize = file.getBlockSize();
+            files.add(status);
+        }
+
+        public void addSplit(Split split) {
+            if (splits == null) {
+                splits = Lists.newArrayList();
+            }
+            splits.add(split);
+        }
+    }
+
+    @Data
+    public static class HiveFileStatus {
+        BlockLocation[] blockLocations;
+        Path path;
+        long length;
+        long blockSize;
+    }
+
     @Data
     public static class HivePartitionValues {
         private long nextPartitionId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index 5a3af95c6d..b17704251a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -35,7 +35,6 @@ import org.apache.doris.planner.Split;
 import org.apache.doris.planner.Splitter;
 import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,7 +50,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class HiveSplitter implements Splitter {
 
@@ -81,11 +79,11 @@ public class HiveSplitter implements Splitter {
                     partitionColumnTypes);
             }
             Map<String, String> properties = hmsTable.getCatalog().getCatalogProperty().getProperties();
-            boolean useSelfSplitter = false;
+            boolean useSelfSplitter = true;
             if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
-                    && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("true")) {
+                    && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) {
                 LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName());
-                useSelfSplitter = true;
+                useSelfSplitter = false;
             }
 
             List<Split> allFiles = Lists.newArrayList();
@@ -135,15 +133,53 @@ public class HiveSplitter implements Splitter {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-                                          List<Split> allFiles, boolean useSelfSplitter) {
-        List<FileSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
-                    Joiner.on(",")
-                    .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
-                        .collect(Collectors.toList())));
+                                          List<Split> allFiles, boolean useSelfSplitter) throws IOException {
+        for (HiveMetaStoreCache.FileCacheValue fileCacheValue :
+                cache.getFilesByPartitions(partitions, useSelfSplitter)) {
+            if (fileCacheValue.getSplits() != null) {
+                allFiles.addAll(fileCacheValue.getSplits());
+            }
+            if (fileCacheValue.getFiles() != null) {
+                boolean isSplittable = fileCacheValue.isSplittable();
+                for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
+                    allFiles.addAll(splitFile(status, isSplittable));
+                }
+            }
         }
-        allFiles.addAll(files);
+    }
+
+    private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status, boolean splittable) throws IOException {
+        List<Split> result = Lists.newArrayList();
+        if (!splittable) {
+            LOG.debug("Path {} is not splittable.", status.getPath());
+            BlockLocation block = status.getBlockLocations()[0];
+            result.add(new FileSplit(status.getPath(), 0, status.getLength(),
+                    status.getLength(), block.getHosts()));
+            return result;
+        }
+        long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
+        if (splitSize <= 0) {
+            splitSize = status.getBlockSize();
+        }
+        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+        splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
+        BlockLocation[] blockLocations = status.getBlockLocations();
+        long length = status.getLength();
+        long bytesRemaining;
+        for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
+                bytesRemaining -= splitSize) {
+            int location = getBlockIndex(blockLocations, length - bytesRemaining);
+            result.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                    splitSize, length, blockLocations[location].getHosts()));
+        }
+        if (bytesRemaining != 0L) {
+            int location = getBlockIndex(blockLocations, length - bytesRemaining);
+            result.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                    bytesRemaining, length, blockLocations[location].getHosts()));
+        }
+
+        LOG.debug("Path {} includes {} splits.", status.getPath(), result.size());
+        return result;
     }
 
     public int getTotalPartitionNum() {
@@ -154,52 +190,18 @@ public class HiveSplitter implements Splitter {
         return readPartitionNum;
     }
 
-    // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system.
-    public static FileSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
-                                             JobConf jobConf) throws IOException {
+    // Get File Status by using FileSystem API.
+    public static HiveMetaStoreCache.FileCacheValue getFileCache(Path path, InputFormat<?, ?> inputFormat,
+                                                                  JobConf jobConf) throws IOException {
         FileSystem fs = path.getFileSystem(jobConf);
         boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
-        List<FileSplit> splits = Lists.newArrayList();
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
-        if (!locatedFileStatusRemoteIterator.hasNext()) {
-            LOG.debug("File status for path {} is empty.", path);
-            return new FileSplit[0];
-        }
-        if (!splittable) {
-            LOG.debug("Path {} is not splittable.", path);
-            while (locatedFileStatusRemoteIterator.hasNext()) {
-                LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
-                BlockLocation block = status.getBlockLocations()[0];
-                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), status.getLen(), block.getHosts()));
-            }
-            return splits.toArray(new FileSplit[splits.size()]);
-        }
-        long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
+        HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue();
+        result.setSplittable(splittable);
         while (locatedFileStatusRemoteIterator.hasNext()) {
-            LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
-            if (splitSize <= 0) {
-                splitSize = status.getBlockSize();
-            }
-            // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-            splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
-            BlockLocation[] blockLocations = status.getBlockLocations();
-            long length = status.getLen();
-            long bytesRemaining;
-            for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
-                    bytesRemaining -= splitSize) {
-                int location = getBlockIndex(blockLocations, length - bytesRemaining);
-                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        splitSize, length, blockLocations[location].getHosts()));
-            }
-            if (bytesRemaining != 0L) {
-                int location = getBlockIndex(blockLocations, length - bytesRemaining);
-                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        bytesRemaining, length, blockLocations[location].getHosts()));
-            }
+            result.addFile(locatedFileStatusRemoteIterator.next());
         }
-
-        LOG.debug("Path {} includes {} splits.", path, splits.size());
-        return splits.toArray(new FileSplit[splits.size()]);
+        return result;
     }
 
     private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index 67416619b0..10692a5f68 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -54,7 +54,6 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
-import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
@@ -62,7 +61,6 @@ import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -678,7 +676,7 @@ public class CatalogMgrTest extends TestWithFeService {
 
         HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName);
         HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
-        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
+        LoadingCache<FileCacheKey, HiveMetaStoreCache.FileCacheValue> preFileCache = metaStoreCache.getFileCacheRef().get();
 
 
         // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal

