This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 923fca6e1a0fd1a340841d9e4986a20d77468d13
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Mon Apr 3 13:54:28 2023 +0800

    [Improvement](multi catalog) Change hive metastore cache split value type to Doris defined Split. Fix split file length -1 bug (#18319)
    
    HiveMetastoreCache stored file splits as the Hadoop InputSplit type. This PR changes them to the Doris-defined Split,
    which avoids converting the splits on every scan.
    Also fixes the explain verbose result returning -1 for the split file length.
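    
    For context, a minimal sketch (editorial, not part of the patch; it uses only
    names that appear in the diff below) of the per-load conversion that caching
    the Doris FileSplit directly turns into a one-time cost. Note the hard-coded
    -1 file length, which is the value the explain verbose output used to show:
    
        // Hadoop splits come back from InputFormat.getSplits(...)
        InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
        // Each one is rewrapped into a Doris-defined FileSplit
        FileSplit[] result = new FileSplit[splits.length];
        for (int i = 0; i < splits.length; i++) {
            org.apache.hadoop.mapred.FileSplit fs = (org.apache.hadoop.mapred.FileSplit) splits[i];
            // -1 is the unknown file length that leaked into explain verbose
            result[i] = new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null);
        }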
---
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 40 +++++++++++++---------
 .../planner/external/ExternalFileScanNode.java     |  6 ++--
 .../doris/planner/external/HiveSplitter.java       | 28 ++++++---------
 .../apache/doris/datasource/CatalogMgrTest.java    |  4 +--
 4 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d963b52c7f..548e06d65b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.planner.external.HiveSplitter;
 
 import com.google.common.base.Preconditions;
@@ -96,7 +97,7 @@ public class HiveMetaStoreCache {
     // cache from <dbname-tblname-partition_values> -> <partition info>
     private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
     // the ref of cache from <location> -> <file list>
-    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<InputSplit>>> fileCacheRef
+    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> fileCacheRef
             = new AtomicReference<>();
 
     public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
@@ -147,10 +148,10 @@ public class HiveMetaStoreCache {
         }
         // if the file.meta.cache.ttl-second is equal 0, use the synchronous loader
         // if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader
-        CacheLoader<FileCacheKey, ImmutableList<InputSplit>> loader = getGuavaCacheLoader(executor,
+        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader = getGuavaCacheLoader(executor,
                 fileMetaCacheTtlSecond);
 
-        LoadingCache<FileCacheKey, ImmutableList<InputSplit>> preFileCache = fileCacheRef.get();
+        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = fileCacheRef.get();
 
         fileCacheRef.set(fileCacheBuilder.build(loader));
         if (Objects.nonNull(preFileCache)) {
@@ -261,7 +262,7 @@ public class HiveMetaStoreCache {
         return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values);
     }
 
-    private ImmutableList<InputSplit> loadFiles(FileCacheKey key) {
+    private ImmutableList<FileSplit> loadFiles(FileCacheKey key) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -278,12 +279,13 @@ public class HiveMetaStoreCache {
             jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
             FileInputFormat.setInputPaths(jobConf, finalLocation);
             try {
+                FileSplit[] result;
                 InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
-                InputSplit[] splits;
                 // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    splits = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
+                    result = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
                 } else {
+                    InputSplit[] splits;
                     String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
                     if (!Strings.isNullOrEmpty(remoteUser)) {
                         UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
@@ -292,12 +294,18 @@ public class HiveMetaStoreCache {
                     } else {
                         splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
                     }
+                    result = new FileSplit[splits.length];
+                    // Convert the hadoop split to Doris Split.
+                    for (int i = 0; i < splits.length; i++) {
+                        org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
+                        result[i] =  new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null);
+                    }
                 }
 
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
+                    LOG.debug("load #{} splits for {} in catalog {}", result.length, key, catalog.getName());
                 }
-                return ImmutableList.copyOf(splits);
+                return ImmutableList.copyOf(result);
             } catch (Exception e) {
                 throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
             }
@@ -345,7 +353,7 @@ public class HiveMetaStoreCache {
         }
     }
 
-    public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
+    public List<FileSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
         partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
@@ -356,14 +364,14 @@ public class HiveMetaStoreCache {
         } else {
             stream = keys.parallelStream();
         }
-        List<ImmutableList<InputSplit>> fileLists = stream.map(k -> {
+        List<ImmutableList<FileSplit>> fileLists = stream.map(k -> {
             try {
                 return fileCacheRef.get().get(k);
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
         }).collect(Collectors.toList());
-        List<InputSplit> retFiles = Lists.newArrayListWithExpectedSize(
+        List<FileSplit> retFiles = Lists.newArrayListWithExpectedSize(
                 fileLists.stream().mapToInt(l -> l.size()).sum());
         fileLists.stream().forEach(l -> retFiles.addAll(l));
         LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} 
ms",
@@ -574,12 +582,12 @@ public class HiveMetaStoreCache {
      * @param fileMetaCacheTtlSecond
      * @return
      */
-    private CacheLoader<FileCacheKey, ImmutableList<InputSplit>> getGuavaCacheLoader(Executor executor,
+    private CacheLoader<FileCacheKey, ImmutableList<FileSplit>> getGuavaCacheLoader(Executor executor,
             int fileMetaCacheTtlSecond) {
-        CacheLoader<FileCacheKey, ImmutableList<InputSplit>> loader =
-                new CacheLoader<FileCacheKey, ImmutableList<InputSplit>>() {
+        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader =
+                new CacheLoader<FileCacheKey, ImmutableList<FileSplit>>() {
                     @Override
-                    public ImmutableList<InputSplit> load(FileCacheKey key) throws Exception {
+                    public ImmutableList<FileSplit> load(FileCacheKey key) throws Exception {
                         return loadFiles(key);
                     }
                 };
@@ -594,7 +602,7 @@ public class HiveMetaStoreCache {
      * get fileCache ref
      * @return
      */
-    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<InputSplit>>> getFileCacheRef() {
+    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> getFileCacheRef() {
         return fileCacheRef;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index e22025800d..6c6089b16c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -758,7 +758,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                     for (TFileRangeDesc file : fileRangeDescs) {
                         output.append(prefix).append("    ").append(file.getPath())
                                 .append(" start: ").append(file.getStartOffset())
-                                .append(" length: ").append(file.getFileSize())
+                                .append(" length: ").append(file.getSize())
                                 .append("\n");
                     }
                 } else {
@@ -766,7 +766,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                         TFileRangeDesc file = fileRangeDescs.get(i);
                         output.append(prefix).append("    ").append(file.getPath())
                                 .append(" start: ").append(file.getStartOffset())
-                                .append(" length: ").append(file.getFileSize())
+                                .append(" length: ").append(file.getSize())
                                 .append("\n");
                     }
                     int other = size - 4;
@@ -774,7 +774,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                     TFileRangeDesc file = fileRangeDescs.get(size - 1);
                     output.append(prefix).append("    ").append(file.getPath())
                             .append(" start: ").append(file.getStartOffset())
-                            .append(" length: ").append(file.getFileSize())
+                            .append(" length: ").append(file.getSize())
                             .append("\n");
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index f7f09b6da6..9c8dec303b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -42,9 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -138,20 +136,14 @@ public class HiveSplitter implements Splitter {
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
                                           List<Split> allFiles, boolean useSelfSplitter) {
-        List<InputSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
+        List<FileSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
         if (LOG.isDebugEnabled()) {
             LOG.debug("get #{} files from #{} partitions: {}", files.size(), 
partitions.size(),
                     Joiner.on(",")
                     .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
                         .collect(Collectors.toList())));
         }
-        allFiles.addAll(files.stream().map(file -> {
-            FileSplit fs = (FileSplit) file;
-            org.apache.doris.planner.external.FileSplit split = new org.apache.doris.planner.external.FileSplit(
-                    fs.getPath(), fs.getStart(), fs.getLength(), -1, null
-            );
-            return split;
-        }).collect(Collectors.toList()));
+        allFiles.addAll(files);
     }
 
     public int getTotalPartitionNum() {
@@ -163,24 +155,24 @@ public class HiveSplitter implements Splitter {
     }
 
     // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system.
-    public static InputSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
+    public static FileSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
                                              JobConf jobConf) throws IOException {
         FileSystem fs = path.getFileSystem(jobConf);
         boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
-        List<InputSplit> splits = Lists.newArrayList();
+        List<FileSplit> splits = Lists.newArrayList();
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
         if (!locatedFileStatusRemoteIterator.hasNext()) {
             LOG.debug("File status for path {} is empty.", path);
-            return new InputSplit[0];
+            return new FileSplit[0];
         }
         if (!splittable) {
             LOG.debug("Path {} is not splittable.", path);
             while (locatedFileStatusRemoteIterator.hasNext()) {
                 LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
                 BlockLocation block = status.getBlockLocations()[0];
-                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), block.getHosts()));
+                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), status.getLen(), block.getHosts()));
             }
-            return splits.toArray(new InputSplit[splits.size()]);
+            return splits.toArray(new FileSplit[splits.size()]);
         }
         long splitSize = Config.file_split_size;
         boolean useDefaultBlockSize = (splitSize <= 0);
@@ -196,17 +188,17 @@ public class HiveSplitter implements Splitter {
                     bytesRemaining -= splitSize) {
                 int location = getBlockIndex(blockLocations, length - bytesRemaining);
                 splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        splitSize, blockLocations[location].getHosts()));
+                        splitSize, length, blockLocations[location].getHosts()));
             }
             if (bytesRemaining != 0L) {
                 int location = getBlockIndex(blockLocations, length - bytesRemaining);
                 splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        bytesRemaining, blockLocations[location].getHosts()));
+                        bytesRemaining, length, blockLocations[location].getHosts()));
             }
         }
 
         LOG.debug("Path {} includes {} splits.", path, splits.size());
-        return splits.toArray(new InputSplit[splits.size()]);
+        return splits.toArray(new FileSplit[splits.size()]);
     }
 
     private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index c93be4a3ac..67416619b0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -54,6 +54,7 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
@@ -66,7 +67,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
-import org.apache.hadoop.mapred.InputSplit;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -678,7 +678,7 @@ public class CatalogMgrTest extends TestWithFeService {
 
         HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName);
         HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
-        LoadingCache<FileCacheKey, ImmutableList<InputSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
+        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
 
 
         // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal

