This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new d40abdf896 [dev-1.1.2](cherry-pick) fix bug that hive external table can not query table created by Tez (#11603)
d40abdf896 is described below

commit d40abdf8965f93fe1ba2806abe68680141daf627
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Tue Aug 9 08:28:34 2022 +0800

    [dev-1.1.2](cherry-pick) fix bug that hive external table can not query table created by Tez (#11603)
---
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 207 ++++++++++++---------
 1 file changed, 123 insertions(+), 84 deletions(-)
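Note on the fix: Hive on Tez can write data files into nested subdirectories under a table or partition location (for example, union queries produce HIVE_UNION_SUBDIR_* directories), so a flat listing of the location misses those files. The patch below collects the location listings as RemoteIterators in a queue and re-enqueues a new iterator for every subdirectory it encounters. The following is a minimal, self-contained sketch of that traversal pattern against the Hadoop FileSystem API; the class and method names are illustrative only and do not appear in the Doris source.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class RecursiveListingSketch {
    // Collect every regular file under the given locations, descending into
    // subdirectories instead of reporting them as data files.
    public static List<LocatedFileStatus> listFilesRecursively(Configuration conf, List<String> locations)
            throws IOException {
        List<LocatedFileStatus> files = new ArrayList<>();
        Queue<RemoteIterator<LocatedFileStatus>> queue = new ArrayDeque<>();
        for (String location : locations) {
            Path path = new Path(location);
            FileSystem fs = path.getFileSystem(conf);
            queue.add(fs.listLocatedStatus(path));
        }
        while (!queue.isEmpty()) {
            RemoteIterator<LocatedFileStatus> iterator = queue.poll();
            while (iterator.hasNext()) {
                LocatedFileStatus status = iterator.next();
                if (status.isDirectory()) {
                    // Re-enqueue an iterator for the subdirectory and keep draining.
                    FileSystem fs = status.getPath().getFileSystem(conf);
                    queue.add(fs.listLocatedStatus(status.getPath()));
                    continue;
                }
                files.add(status);
            }
        }
        return files;
    }
}

The queue-based loop is a breadth-first walk over the directory tree, which avoids deep call-stack recursion on heavily nested layouts.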

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index b49a7e5d7d..0d5bcf9671 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -36,10 +36,13 @@ import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExprOpcode;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -61,8 +64,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
-import com.google.common.base.Strings;
-
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -70,6 +71,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Stack;
 
 /**
@@ -162,7 +164,8 @@ public class HiveMetaStoreClientHelper {
     }
 
     /**
-     * Get data files of partitions in hive table, filter by partition predicate
+     * Get data files of partitions in hive table, filter by partition predicate.
+     *
      * @param hiveTable
      * @param hivePartitionPredicate
      * @param fileStatuses
@@ -171,34 +174,134 @@ public class HiveMetaStoreClientHelper {
      * @throws DdlException
      */
     public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
-                                          List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type) throws DdlException {
-        HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
-
+            List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type)
+            throws DdlException {
+        boolean onS3 = type.equals(StorageBackend.StorageType.S3);
+        Map<String, String> properties = hiveTable.getHiveProperties();
+        Configuration configuration = getConfiguration(properties, onS3);
+        boolean isSecurityEnabled = isSecurityEnabled(properties);
+        String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
+        
         List<RemoteIterator<LocatedFileStatus>> remoteIterators;
-        Boolean onS3 = type.equals(StorageBackend.StorageType.S3);
         if (remoteHiveTbl.getPartitionKeys().size() > 0) {
             // hive partitioned table, get file iterator from table partition sd info
-            List<Partition> hivePartitions = new ArrayList<>();
-            try {
-                client.listPartitionsByExpr(hiveTable.getHiveDb(), hiveTable.getHiveTable(),
-                        SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1, hivePartitions);
-            } catch (TException e) {
-                LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-                throw new DdlException("Connect hive metastore failed.");
-            } finally {
-                client.close();
-            }
-            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties(), onS3);
+            List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
+            remoteIterators = getRemoteIterator(hivePartitions, configuration, isSecurityEnabled, properties, onS3);
         } else {
             // hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties(), onS3);
+            remoteIterators = getRemoteIterator(remoteHiveTbl, configuration, isSecurityEnabled, properties, onS3);
+        }
+        return getAllFileStatus(fileStatuses, remoteIterators, configuration, isSecurityEnabled, properties, onS3);
+    }
+
+    /**
+     * list partitions from hiveMetaStore.
+     *
+     * @param metaStoreUris hiveMetaStore uris
+     * @param remoteHiveTbl Hive table
+     * @param hivePartitionPredicate filter when list partitions
+     * @return a list of hive partitions
+     * @throws DdlException when connect hiveMetaStore failed.
+     */
+    public static List<Partition> getHivePartitions(String metaStoreUris, Table remoteHiveTbl,
+            ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException {
+        List<Partition> hivePartitions = new ArrayList<>();
+        HiveMetaStoreClient client = getClient(metaStoreUris);
+        try {
+            client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(),
+                    SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1,
+                    hivePartitions);
+        } catch (TException e) {
+            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
+            throw new DdlException("Connect hive metastore failed.");
+        } finally {
+            client.close();
+        }
+        return hivePartitions;
+    }
+
+    // create Configuration for the given properties
+    private static Configuration getConfiguration(Map<String, String> properties, boolean onS3) {
+        Configuration configuration = new Configuration(false);
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
+        if (onS3) {
+            setS3Configuration(configuration, properties);
+        }
+        return configuration;
+    }
+
+    // return true if it is kerberos
+    private static boolean isSecurityEnabled(Map<String, String> properties) {
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) && entry.getValue()
+                    .equals(AuthType.KERBEROS.getDesc())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Get remote iterators for given partitions
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions,
+            Configuration configuration, boolean isSecurityEnabled, Map<String, String> properties, boolean onS3)
+            throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> allIterators = new ArrayList<>();
+        for (Partition p : partitions) {
+            String location = p.getSd().getLocation();
+            Path path = new Path(location);
+            allIterators.addAll(getRemoteIterator(path, configuration, properties, isSecurityEnabled));
+        }
+        return allIterators;
+    }
+
+    // Get remote iterators for given table
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
+        String location = table.getSd().getLocation();
+        Path path = new Path(location);
+        return getRemoteIterator(path, configuration, properties, isSecurityEnabled);
+    }
+
+    // Get remote iterators for given Path
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(org.apache.hadoop.fs.Path path,
+            Configuration conf, Map<String, String> properties, boolean isSecurityEnabled) throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
+        try {
+            if (isSecurityEnabled) {
+                UserGroupInformation.setConfiguration(conf);
+                // login user from keytab
+                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+                        properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+            }
+            FileSystem fileSystem = path.getFileSystem(conf);
+            iterators.add(fileSystem.listLocatedStatus(path));
+        } catch (IOException e) {
+            LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage());
+            throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
         }
+        return iterators;
+    }
 
+    private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
+            List<RemoteIterator<LocatedFileStatus>> remoteIterators, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
         String hdfsUrl = "";
-        for (RemoteIterator<LocatedFileStatus> iterator : remoteIterators) {
+        Queue<RemoteIterator<LocatedFileStatus>> queue = Queues.newArrayDeque(remoteIterators);
+        while (queue.peek() != null) {
+            RemoteIterator<LocatedFileStatus> iterator = queue.poll();
             try {
                 while (iterator.hasNext()) {
                     LocatedFileStatus fileStatus = iterator.next();
+                    if (fileStatus.isDirectory()) {
+                        // recursive visit the directory to get the file path.
+                        queue.addAll(
+                                getRemoteIterator(fileStatus.getPath(), configuration, properties, isSecurityEnabled));
+                        continue;
+                    }
                     TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
                     brokerFileStatus.setIsDir(fileStatus.isDirectory());
                     brokerFileStatus.setIsSplitable(true);
@@ -226,7 +329,6 @@ public class HiveMetaStoreClientHelper {
                 throw new DdlException("List HDFS file failed.");
             }
         }
-
         return hdfsUrl;
     }
 
@@ -245,69 +347,6 @@ public class HiveMetaStoreClientHelper {
         configuration.set("fs.s3a.attempts.maximum", "2");
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
-            List<Partition> partitions, Map<String, String> properties, boolean onS3)
-            throws DdlException {
-        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
-        Configuration configuration = new Configuration(false);
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
-                configuration.set(entry.getKey(), entry.getValue());
-            }
-        }
-        if (onS3) {
-            setS3Configuration(configuration, properties);
-        }
-        for (Partition p : partitions) {
-            String location = p.getSd().getLocation();
-            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
-            try {
-                FileSystem fileSystem = path.getFileSystem(configuration);
-                iterators.add(fileSystem.listLocatedStatus(path));
-            } catch (IOException e) {
-                LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
-                throw new DdlException("Get HDFS file remote iterator failed.");
-            }
-        }
-        return iterators;
-    }
-
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
-            Table table, Map<String, String> properties, boolean onS3)
-            throws DdlException {
-        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
-        Configuration configuration = new Configuration(false);
-        boolean isSecurityEnabled = false;
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
-                configuration.set(entry.getKey(), entry.getValue());
-            }
-            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
-                && entry.getValue().equals(AuthType.KERBEROS.getDesc())) {
-                isSecurityEnabled = true;
-            }
-        }
-        if (onS3) {
-            setS3Configuration(configuration, properties);
-        }
-        String location = table.getSd().getLocation();
-        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
-        try {
-            if (isSecurityEnabled) {
-                UserGroupInformation.setConfiguration(configuration);
-                // login user from keytab
-                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
-                    properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
-            }
-            FileSystem fileSystem = path.getFileSystem(configuration);
-            iterators.add(fileSystem.listLocatedStatus(path));
-        } catch (IOException e) {
-            LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage());
-            throw new DdlException("Get HDFS file remote iterator failed.");
-        }
-        return iterators;
-    }
-
     public static List<String> getPartitionNames(HiveTable hiveTable) throws DdlException {
         HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
         List<String> partitionNames = new ArrayList<>();

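For completeness, here is a hedged sketch of the supporting setup the refactored helpers perform before touching the file system: copying the Hive table properties (everything except the metastore URI) into a Hadoop Configuration, and logging in from a keytab when Kerberos is configured. The property-key constants and the class name below are assumptions made for illustration; the Doris code reads the real constants from HiveTable and BrokerUtil.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class HiveAccessSetupSketch {
    // Property keys assumed for illustration; Doris keeps the real constants
    // in HiveTable and BrokerUtil.
    private static final String METASTORE_URIS = "hive.metastore.uris";
    private static final String AUTHENTICATION = "hadoop.security.authentication";
    private static final String KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
    private static final String KERBEROS_KEYTAB = "hadoop.kerberos.keytab";

    // Copy every table property except the metastore URI into a fresh,
    // defaults-free Hadoop Configuration.
    public static Configuration buildConfiguration(Map<String, String> properties) {
        Configuration conf = new Configuration(false);
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (!METASTORE_URIS.equals(entry.getKey())) {
                conf.set(entry.getKey(), entry.getValue());
            }
        }
        return conf;
    }

    // If the table is configured for Kerberos, perform the keytab login
    // before any FileSystem instance is created.
    public static void loginIfKerberos(Configuration conf, Map<String, String> properties) throws IOException {
        if ("kerberos".equalsIgnoreCase(properties.get(AUTHENTICATION))) {
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(
                    properties.get(KERBEROS_PRINCIPAL), properties.get(KERBEROS_KEYTAB));
        }
    }
}

Usage in this sketch would be: build the Configuration once per table access, call loginIfKerberos before creating any FileSystem, then list files with the breadth-first traversal shown earlier.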
