This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new d40abdf896 [dev-1.1.2](cherry-pick) fix bug that hive external table can not query table created by Tez (#11603)
d40abdf896 is described below

commit d40abdf8965f93fe1ba2806abe68680141daf627
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Tue Aug 9 08:28:34 2022 +0800

    [dev-1.1.2](cherry-pick) fix bug that hive external table can not query table created by Tez (#11603)
---
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 207 ++++++++++++---------
 1 file changed, 123 insertions(+), 84 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index b49a7e5d7d..0d5bcf9671 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -36,10 +36,13 @@ import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExprOpcode;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -61,8 +64,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
-import com.google.common.base.Strings;
-
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -70,6 +71,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Stack;
 
 /**
@@ -162,7 +164,8 @@ public class HiveMetaStoreClientHelper {
     }
 
     /**
-     * Get data files of partitions in hive table, filter by partition predicate
+     * Get data files of partitions in hive table, filter by partition predicate.
+     *
      * @param hiveTable
      * @param hivePartitionPredicate
      * @param fileStatuses
@@ -171,34 +174,134 @@ public class HiveMetaStoreClientHelper {
      * @return
      * @throws DdlException
      */
     public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
-            List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type) throws DdlException {
-        HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
-
+            List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type)
+            throws DdlException {
+        boolean onS3 = type.equals(StorageBackend.StorageType.S3);
+        Map<String, String> properties = hiveTable.getHiveProperties();
+        Configuration configuration = getConfiguration(properties, onS3);
+        boolean isSecurityEnabled = isSecurityEnabled(properties);
+        String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
+
         List<RemoteIterator<LocatedFileStatus>> remoteIterators;
-        Boolean onS3 = type.equals(StorageBackend.StorageType.S3);
         if (remoteHiveTbl.getPartitionKeys().size() > 0) {
             // hive partitioned table, get file iterator from table partition sd info
-            List<Partition> hivePartitions = new ArrayList<>();
-            try {
-                client.listPartitionsByExpr(hiveTable.getHiveDb(), hiveTable.getHiveTable(),
-                        SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1, hivePartitions);
-            } catch (TException e) {
-                LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-                throw new DdlException("Connect hive metastore failed.");
-            } finally {
-                client.close();
-            }
-            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties(), onS3);
+            List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
+            remoteIterators = getRemoteIterator(hivePartitions, configuration, isSecurityEnabled, properties, onS3);
         } else {
             // hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties(), onS3);
+            remoteIterators = getRemoteIterator(remoteHiveTbl, configuration, isSecurityEnabled, properties, onS3);
+        }
+        return getAllFileStatus(fileStatuses, remoteIterators, configuration, isSecurityEnabled, properties, onS3);
+    }
+
+    /**
+     * list partitions from hiveMetaStore.
+     *
+     * @param metaStoreUris hiveMetaStore uris
+     * @param remoteHiveTbl Hive table
+     * @param hivePartitionPredicate filter when list partitions
+     * @return a list of hive partitions
+     * @throws DdlException when connect hiveMetaStore failed.
+     */
+    public static List<Partition> getHivePartitions(String metaStoreUris, Table remoteHiveTbl,
+            ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException {
+        List<Partition> hivePartitions = new ArrayList<>();
+        HiveMetaStoreClient client = getClient(metaStoreUris);
+        try {
+            client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(),
+                    SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1,
+                    hivePartitions);
+        } catch (TException e) {
+            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
+            throw new DdlException("Connect hive metastore failed.");
+        } finally {
+            client.close();
+        }
+        return hivePartitions;
+    }
+
+    // create Configuration for the given properties
+    private static Configuration getConfiguration(Map<String, String> properties, boolean onS3) {
+        Configuration configuration = new Configuration(false);
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
+        if (onS3) {
+            setS3Configuration(configuration, properties);
+        }
+        return configuration;
+    }
+
+    // return true if it is kerberos
+    private static boolean isSecurityEnabled(Map<String, String> properties) {
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) && entry.getValue()
+                    .equals(AuthType.KERBEROS.getDesc())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Get remote iterators for given partitions
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions,
+            Configuration configuration, boolean isSecurityEnabled, Map<String, String> properties, boolean onS3)
+            throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> allIterators = new ArrayList<>();
+        for (Partition p : partitions) {
+            String location = p.getSd().getLocation();
+            Path path = new Path(location);
+            allIterators.addAll(getRemoteIterator(path, configuration, properties, isSecurityEnabled));
+        }
+        return allIterators;
+    }
+
+    // Get remote iterators for given table
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
+        String location = table.getSd().getLocation();
+        Path path = new Path(location);
+        return getRemoteIterator(path, configuration, properties, isSecurityEnabled);
+    }
+
+    // Get remote iterators for given Path
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(org.apache.hadoop.fs.Path path,
+            Configuration conf, Map<String, String> properties, boolean isSecurityEnabled) throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
+        try {
+            if (isSecurityEnabled) {
+                UserGroupInformation.setConfiguration(conf);
+                // login user from keytab
+                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+                        properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+            }
+            FileSystem fileSystem = path.getFileSystem(conf);
+            iterators.add(fileSystem.listLocatedStatus(path));
+        } catch (IOException e) {
+            LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage());
+            throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
         }
+        return iterators;
+    }
+    private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
+            List<RemoteIterator<LocatedFileStatus>> remoteIterators, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
         String hdfsUrl = "";
-        for (RemoteIterator<LocatedFileStatus> iterator : remoteIterators) {
+        Queue<RemoteIterator<LocatedFileStatus>> queue = Queues.newArrayDeque(remoteIterators);
+        while (queue.peek() != null) {
+            RemoteIterator<LocatedFileStatus> iterator = queue.poll();
             try {
                 while (iterator.hasNext()) {
                     LocatedFileStatus fileStatus = iterator.next();
+                    if (fileStatus.isDirectory()) {
+                        // recursive visit the directory to get the file path.
+                        queue.addAll(
+                                getRemoteIterator(fileStatus.getPath(), configuration, properties, isSecurityEnabled));
+                        continue;
+                    }
                     TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
                     brokerFileStatus.setIsDir(fileStatus.isDirectory());
                     brokerFileStatus.setIsSplitable(true);
@@ -226,7 +329,6 @@ public class HiveMetaStoreClientHelper {
                 throw new DdlException("List HDFS file failed.");
             }
         }
-
         return hdfsUrl;
     }
 
@@ -245,69 +347,6 @@ public class HiveMetaStoreClientHelper {
         configuration.set("fs.s3a.attempts.maximum", "2");
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
-            List<Partition> partitions, Map<String, String> properties, boolean onS3)
-            throws DdlException {
-        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
-        Configuration configuration = new Configuration(false);
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
-                configuration.set(entry.getKey(), entry.getValue());
-            }
-        }
-        if (onS3) {
-            setS3Configuration(configuration, properties);
-        }
-        for (Partition p : partitions) {
-            String location = p.getSd().getLocation();
-            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
-            try {
-                FileSystem fileSystem = path.getFileSystem(configuration);
-                iterators.add(fileSystem.listLocatedStatus(path));
-            } catch (IOException e) {
-                LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
-                throw new DdlException("Get HDFS file remote iterator failed.");
-            }
-        }
-        return iterators;
-    }
-
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
-            Table table, Map<String, String> properties, boolean onS3)
-            throws DdlException {
-        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
-        Configuration configuration = new Configuration(false);
-        boolean isSecurityEnabled = false;
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
-                configuration.set(entry.getKey(), entry.getValue());
-            }
-            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
-                    && entry.getValue().equals(AuthType.KERBEROS.getDesc())) {
-                isSecurityEnabled = true;
-            }
-        }
-        if (onS3) {
-            setS3Configuration(configuration, properties);
-        }
-        String location = table.getSd().getLocation();
-        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
-        try {
-            if (isSecurityEnabled) {
-                UserGroupInformation.setConfiguration(configuration);
-                // login user from keytab
-                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
-                        properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
-            }
-            FileSystem fileSystem = path.getFileSystem(configuration);
-            iterators.add(fileSystem.listLocatedStatus(path));
-        } catch (IOException e) {
-            LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage());
-            throw new DdlException("Get HDFS file remote iterator failed.");
-        }
-        return iterators;
-    }
-
     public static List<String> getPartitionNames(HiveTable hiveTable) throws DdlException {
         HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
         List<String> partitionNames = new ArrayList<>();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
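Background on the fix: data files written by Hive on Tez are not always direct children of the table or partition location; Tez can place them in subdirectories (for example, HIVE_UNION_SUBDIR_* directories from union queries), so a single flat listLocatedStatus call misses them. The patched getAllFileStatus therefore keeps a queue of directory listings and, whenever it encounters a directory entry, queues a listing of that directory as well. Below is a minimal, self-contained sketch of that same breadth-first traversal against the Hadoop FileSystem API; the RecursiveLister class and listDataFiles method are illustrative names only and do not appear in the patch.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// Illustrative sketch only (not part of the commit): list every data file under a root
// path, descending into subdirectories the way the patched getAllFileStatus does.
public class RecursiveLister {
    public static List<Path> listDataFiles(Path root, Configuration conf) throws IOException {
        List<Path> files = new ArrayList<>();
        FileSystem fs = root.getFileSystem(conf);
        Queue<RemoteIterator<LocatedFileStatus>> queue = new ArrayDeque<>();
        queue.add(fs.listLocatedStatus(root));
        while (!queue.isEmpty()) {
            RemoteIterator<LocatedFileStatus> iterator = queue.poll();
            while (iterator.hasNext()) {
                LocatedFileStatus status = iterator.next();
                if (status.isDirectory()) {
                    // Re-queue the directory so its children are listed too.
                    queue.add(fs.listLocatedStatus(status.getPath()));
                    continue;
                }
                files.add(status.getPath());
            }
        }
        return files;
    }
}

The production code follows the same pattern but records each file as a TBrokerFileStatus and, when BrokerUtil.HADOOP_SECURITY_AUTHENTICATION is set to kerberos, logs in through UserGroupInformation before opening the FileSystem.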