This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b4af6713478 branch-2.1: [Fix](catalog)Fixes query failures for Paimon tables stored in Kerberized HDFS #47192 (#47694)
b4af6713478 is described below

commit b4af671347847f0fce09a2715967320ccf0301ee
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 12 09:16:30 2025 +0800

    branch-2.1: [Fix](catalog)Fixes query failures for Paimon tables stored in Kerberized HDFS #47192 (#47694)

    Cherry-picked from #47192

    Co-authored-by: Calvin Kirs <guoqi...@selectdb.com>
---
 .../apache/doris/hudi/HadoopHudiJniScanner.java   | 55 ++++++++------
 fe/be-java-extensions/paimon-scanner/pom.xml      |  6 --
 .../org/apache/doris/paimon/PaimonJniScanner.java | 30 ++++++--
 .../authentication/AuthenticationConfig.java      | 17 ++++-
 .../PreExecutionAuthenticatorCache.java           | 87 ++++++++++++++++++++++
 5 files changed, 159 insertions(+), 36 deletions(-)

diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
index f2b38815a36..f163be11aa2 100644
--- a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
@@ -20,6 +20,8 @@ package org.apache.doris.hudi;
 import org.apache.doris.common.classloader.ThreadClassLoaderContext;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner {
     private final int fetchSize;
     private final ClassLoader classLoader;
 
+    private final PreExecutionAuthenticator preExecutionAuthenticator;
+
     public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
         this.basePath = params.get("base_path");
         this.dataFilePath = params.get("data_file_path");
@@ -120,6 +124,7 @@
                 LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
             }
         }
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps);
 
         ZoneId zoneId;
         if (Strings.isNullOrEmpty(params.get("time_zone"))) {
@@ -135,10 +140,14 @@
     @Override
     public void open() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            initRequiredColumnsAndTypes();
-            initTableInfo(requiredTypes, requiredFields, fetchSize);
-            Properties properties = getReaderProperties();
-            initReader(properties);
+            preExecutionAuthenticator.execute(() -> {
+                initRequiredColumnsAndTypes();
+                initTableInfo(requiredTypes, requiredFields, fetchSize);
+                Properties properties = getReaderProperties();
+                initReader(properties);
+                return null;
+            });
+
         } catch (Exception e) {
             close();
             LOG.warn("failed to open hadoop hudi jni scanner", e);
@@ -149,25 +158,27 @@
     @Override
     public int getNext() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            NullWritable key = reader.createKey();
-            ArrayWritable value = reader.createValue();
-            int numRows = 0;
-            for (; numRows < fetchSize; numRows++) {
-                if (!reader.next(key, value)) {
-                    break;
+            return preExecutionAuthenticator.execute(() -> {
+                NullWritable key = reader.createKey();
+                ArrayWritable value = reader.createValue();
+                int numRows = 0;
+                for (; numRows < fetchSize; numRows++) {
+                    if (!reader.next(key, value)) {
+                        break;
+                    }
+                    Object rowData = deserializer.deserialize(value);
+                    for (int i = 0; i < fields.length; i++) {
+                        Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+                        columnValue.setRow(fieldData);
+                        // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
+                        //         numRows, i, types[i].getName(), types[i].getType().name(),
+                        //         fieldInspectors[i].getTypeName());
+                        columnValue.setField(types[i], fieldInspectors[i]);
+                        appendData(i, columnValue);
+                    }
                 }
-                Object rowData = deserializer.deserialize(value);
-                for (int i = 0; i < fields.length; i++) {
-                    Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
-                    columnValue.setRow(fieldData);
-                    // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
-                    //         numRows, i, types[i].getName(), types[i].getType().name(),
-                    //         fieldInspectors[i].getTypeName());
-                    columnValue.setField(types[i], fieldInspectors[i]);
-                    appendData(i, columnValue);
-                }
-            }
-            return numRows;
+                return numRows;
+            });
         } catch (Exception e) {
             close();
             LOG.warn("failed to get next in hadoop hudi jni scanner", e);

diff --git a/fe/be-java-extensions/paimon-scanner/pom.xml b/fe/be-java-extensions/paimon-scanner/pom.xml
index 0b513691303..3b4eb7acfe3 100644
--- a/fe/be-java-extensions/paimon-scanner/pom.xml
+++ b/fe/be-java-extensions/paimon-scanner/pom.xml
@@ -39,12 +39,6 @@ under the License.
             <groupId>org.apache.doris</groupId>
             <artifactId>java-common</artifactId>
             <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>fe-common</artifactId>
-                    <groupId>org.apache.doris</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
         <dependency>

diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 6ffd5f1ad90..e6c04a0a2f7 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -20,6 +20,8 @@ package org.apache.doris.paimon;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
 import org.apache.doris.paimon.PaimonTableCache.TableExt;
 
@@ -74,6 +76,7 @@
     private long lastUpdateTime;
     private RecordReader.RecordIterator<InternalRow> recordIterator = null;
     private final ClassLoader classLoader;
+    private PreExecutionAuthenticator preExecutionAuthenticator;
 
     public PaimonJniScanner(int batchSize, Map<String, String> params) {
         this.classLoader = this.getClass().getClassLoader();
@@ -104,6 +107,7 @@
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
     }
 
     @Override
@@ -114,12 +118,16 @@
             // `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
             // so we need to provide a classloader, otherwise it will cause NPE.
             Thread.currentThread().setContextClassLoader(classLoader);
-            initTable();
-            initReader();
+            preExecutionAuthenticator.execute(() -> {
+                initTable();
+                initReader();
+                return null;
+            });
             resetDatetimeV2Precision();
+
         } catch (Throwable e) {
             LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e);
-            throw e;
+            throw new RuntimeException(e);
         }
     }
 
@@ -137,7 +145,7 @@
         readBuilder.withFilter(getPredicates());
         reader = readBuilder.newRead().executeFilter().createReader(getSplit());
         paimonDataTypeList =
-            Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
+                Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
     }
 
     private int[] getProjected() {
@@ -183,8 +191,7 @@
         }
     }
 
-    @Override
-    protected int getNext() throws IOException {
+    private int readAndProcessNextBatch() throws IOException {
         int rows = 0;
         try {
             if (recordIterator == null) {
@@ -210,13 +217,22 @@
         } catch (Exception e) {
             close();
             LOG.warn("Failed to get the next batch of paimon. "
" - + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}", + + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}", getSplit(), params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e); throw new IOException(e); } return rows; } + @Override + protected int getNext() { + try { + return preExecutionAuthenticator.execute(this::readAndProcessNextBatch); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override protected TableSchema parseTableSchema() throws UnsupportedOperationException { // do nothing diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java index b580f9ecbe0..2fa8d09b0d7 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java @@ -23,6 +23,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Map; + public abstract class AuthenticationConfig { private static final Logger LOG = LogManager.getLogger(AuthenticationConfig.class); public static String HADOOP_USER_NAME = "hadoop.username"; @@ -31,12 +33,24 @@ public abstract class AuthenticationConfig { public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal"; public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file"; public static String DORIS_KRB5_DEBUG = "doris.krb5.debug"; + private static final String DEFAULT_HADOOP_USERNAME = "hadoop"; /** * @return true if the config is valid, otherwise false. */ public abstract boolean isValid(); + protected static String generalAuthenticationConfigKey(Map<String, String> conf) { + String authentication = conf.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + null); + if (AuthType.KERBEROS.getDesc().equals(authentication)) { + return conf.get(HADOOP_KERBEROS_PRINCIPAL) + "-" + conf.get(HADOOP_KERBEROS_KEYTAB) + "-" + + conf.getOrDefault(DORIS_KRB5_DEBUG, "false"); + } else { + return conf.getOrDefault(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME); + } + } + /** * get kerberos config from hadoop conf * @param conf config @@ -90,7 +104,8 @@ public abstract class AuthenticationConfig { private static AuthenticationConfig createSimpleAuthenticationConfig(Configuration conf) { // AuthType.SIMPLE SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig(); - simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME)); + String hadoopUserName = conf.get(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME); + simpleAuthenticationConfig.setUsername(hadoopUserName); return simpleAuthenticationConfig; } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java new file mode 100644 index 00000000000..5b0d1cb70ff --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations.
+ * This class caches PreExecutionAuthenticator objects to avoid recreating them for the same Hadoop configuration.
+ * It uses a Least Recently Used (LRU) cache, where the least recently used entries are removed when the cache exceeds
+ * the maximum size (MAX_CACHE_SIZE).
+ * <p>
+ * The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs),
+ * only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing
+ * redundant instantiations.
+ */
+public class PreExecutionAuthenticatorCache {
+    private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class);
+    private static final int MAX_CACHE_SIZE = 100;
+
+    private static final Cache<String, PreExecutionAuthenticator> preExecutionAuthenticatorCache =
+            CacheBuilder.newBuilder()
+                    .maximumSize(MAX_CACHE_SIZE)
+                    .expireAfterAccess(60 * 24, TimeUnit.MINUTES)
+                    .build();
+
+    /**
+     * Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist.
+     * This method first checks if the configuration is already cached. If not, it computes a new instance and
+     * caches it for future use.
+     *
+     * @param hadoopConfig The Hadoop configuration (key-value pairs)
+     * @return A PreExecutionAuthenticator instance for the given configuration
+     */
+    public static PreExecutionAuthenticator getAuthenticator(Map<String, String> hadoopConfig) {
+        String authenticatorCacheKey = AuthenticationConfig.generalAuthenticationConfigKey(hadoopConfig);
+        PreExecutionAuthenticator authenticator;
+        try {
+            authenticator = preExecutionAuthenticatorCache.get(authenticatorCacheKey,
+                    () -> createAuthenticator(hadoopConfig, authenticatorCacheKey));
+        } catch (ExecutionException exception) {
+            throw new RuntimeException("Failed to create PreExecutionAuthenticator for key: " + authenticatorCacheKey,
+                    exception);
+        }
+        return authenticator;
+    }
+
+    private static PreExecutionAuthenticator createAuthenticator(Map<String, String> hadoopConfig,
+            String authenticatorCacheKey) {
+        Configuration conf = new Configuration();
+        hadoopConfig.forEach(conf::set);
+        PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator();
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(
+                conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
+                AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
+        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
+                .getHadoopAuthenticator(authenticationConfig);
+        preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
+        LOG.info("Creating new PreExecutionAuthenticator for configuration, Cache key: {}",
+                authenticatorCacheKey);
+        return preExecutionAuthenticator;
+    }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
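The pattern this patch establishes in both scanners is: derive a cache key from the scanner's Hadoop authentication properties, fetch a shared PreExecutionAuthenticator from PreExecutionAuthenticatorCache, and run every step that touches the Kerberized file system inside its execute() callback so the work happens under the logged-in identity. Below is a minimal usage sketch, not part of the commit: the class name, the literal principal/keytab values, and the Kerberos property key names are illustrative, and it assumes execute() accepts a Callable-style lambda and may throw a checked Exception, as the scanners' try/catch blocks suggest.

    import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
    import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

    import java.util.HashMap;
    import java.util.Map;

    public class KerberizedScanSketch {
        public static void main(String[] args) throws Exception {
            // Hadoop options as a JNI scanner would receive them in its params map.
            // "hadoop.security.authentication" is the standard Hadoop key; the
            // principal/keytab key names and values here are assumptions.
            Map<String, String> hadoopProps = new HashMap<>();
            hadoopProps.put("hadoop.security.authentication", "kerberos");
            hadoopProps.put("hadoop.kerberos.principal", "doris/_HOST@EXAMPLE.COM");
            hadoopProps.put("hadoop.kerberos.keytab", "/etc/doris/doris.keytab");

            // Identical authentication settings produce the same cache key, so
            // repeated scans reuse one authenticator instead of redoing the login.
            PreExecutionAuthenticator authenticator =
                    PreExecutionAuthenticatorCache.getAuthenticator(hadoopProps);

            // Everything that reads from HDFS runs inside execute(), mirroring
            // open()/getNext() in the patched Hudi and Paimon scanners.
            Integer rows = authenticator.execute(() -> {
                // ... create the Paimon/Hudi reader and pull one batch here ...
                return 0;
            });
            System.out.println("rows in first batch: " + rows);
        }
    }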