This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b4af6713478 branch-2.1: [Fix](catalog)Fixes query failures for Paimon tables stored in Kerberized HDFS #47192 (#47694)
b4af6713478 is described below

commit b4af671347847f0fce09a2715967320ccf0301ee
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 12 09:16:30 2025 +0800

    branch-2.1: [Fix](catalog)Fixes query failures for Paimon tables stored in Kerberized HDFS #47192 (#47694)
    
    Cherry-picked from #47192
    
    Co-authored-by: Calvin Kirs <guoqi...@selectdb.com>
---
 .../apache/doris/hudi/HadoopHudiJniScanner.java    | 55 ++++++++------
 fe/be-java-extensions/paimon-scanner/pom.xml       |  6 --
 .../org/apache/doris/paimon/PaimonJniScanner.java  | 30 ++++++--
 .../authentication/AuthenticationConfig.java       | 17 ++++-
 .../PreExecutionAuthenticatorCache.java            | 87 ++++++++++++++++++++++
 5 files changed, 159 insertions(+), 36 deletions(-)

diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
index f2b38815a36..f163be11aa2 100644
--- a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
@@ -20,6 +20,8 @@ package org.apache.doris.hudi;
 import org.apache.doris.common.classloader.ThreadClassLoaderContext;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner {
     private final int fetchSize;
     private final ClassLoader classLoader;
 
+    private final PreExecutionAuthenticator preExecutionAuthenticator;
+
     public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
         this.basePath = params.get("base_path");
         this.dataFilePath = params.get("data_file_path");
@@ -120,6 +124,7 @@ public class HadoopHudiJniScanner extends JniScanner {
                 LOG.debug("get hudi params {}: {}", entry.getKey(), 
entry.getValue());
             }
         }
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps);
 
         ZoneId zoneId;
         if (Strings.isNullOrEmpty(params.get("time_zone"))) {
@@ -135,10 +140,14 @@ public class HadoopHudiJniScanner extends JniScanner {
     @Override
     public void open() throws IOException {
        try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            initRequiredColumnsAndTypes();
-            initTableInfo(requiredTypes, requiredFields, fetchSize);
-            Properties properties = getReaderProperties();
-            initReader(properties);
+            preExecutionAuthenticator.execute(() -> {
+                initRequiredColumnsAndTypes();
+                initTableInfo(requiredTypes, requiredFields, fetchSize);
+                Properties properties = getReaderProperties();
+                initReader(properties);
+                return null;
+            });
+
         } catch (Exception e) {
             close();
             LOG.warn("failed to open hadoop hudi jni scanner", e);
@@ -149,25 +158,27 @@ public class HadoopHudiJniScanner extends JniScanner {
     @Override
     public int getNext() throws IOException {
        try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            NullWritable key = reader.createKey();
-            ArrayWritable value = reader.createValue();
-            int numRows = 0;
-            for (; numRows < fetchSize; numRows++) {
-                if (!reader.next(key, value)) {
-                    break;
+            return preExecutionAuthenticator.execute(() -> {
+                NullWritable key = reader.createKey();
+                ArrayWritable value = reader.createValue();
+                int numRows = 0;
+                for (; numRows < fetchSize; numRows++) {
+                    if (!reader.next(key, value)) {
+                        break;
+                    }
+                    Object rowData = deserializer.deserialize(value);
+                    for (int i = 0; i < fields.length; i++) {
+                        Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+                        columnValue.setRow(fieldData);
+                        // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
+                        //        numRows, i, types[i].getName(), types[i].getType().name(),
+                        //        fieldInspectors[i].getTypeName());
+                        columnValue.setField(types[i], fieldInspectors[i]);
+                        appendData(i, columnValue);
+                    }
                 }
-                Object rowData = deserializer.deserialize(value);
-                for (int i = 0; i < fields.length; i++) {
-                    Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
-                    columnValue.setRow(fieldData);
-                    // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
-                    //        numRows, i, types[i].getName(), types[i].getType().name(),
-                    //        fieldInspectors[i].getTypeName());
-                    columnValue.setField(types[i], fieldInspectors[i]);
-                    appendData(i, columnValue);
-                }
-            }
-            return numRows;
+                return numRows;
+            });
         } catch (Exception e) {
             close();
             LOG.warn("failed to get next in hadoop hudi jni scanner", e);
diff --git a/fe/be-java-extensions/paimon-scanner/pom.xml b/fe/be-java-extensions/paimon-scanner/pom.xml
index 0b513691303..3b4eb7acfe3 100644
--- a/fe/be-java-extensions/paimon-scanner/pom.xml
+++ b/fe/be-java-extensions/paimon-scanner/pom.xml
@@ -39,12 +39,6 @@ under the License.
             <groupId>org.apache.doris</groupId>
             <artifactId>java-common</artifactId>
             <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>fe-common</artifactId>
-                    <groupId>org.apache.doris</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
         <dependency>
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 6ffd5f1ad90..e6c04a0a2f7 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -20,6 +20,8 @@ package org.apache.doris.paimon;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
 import org.apache.doris.paimon.PaimonTableCache.TableExt;
 
@@ -74,6 +76,7 @@ public class PaimonJniScanner extends JniScanner {
     private long lastUpdateTime;
     private RecordReader.RecordIterator<InternalRow> recordIterator = null;
     private final ClassLoader classLoader;
+    private PreExecutionAuthenticator preExecutionAuthenticator;
 
     public PaimonJniScanner(int batchSize, Map<String, String> params) {
         this.classLoader = this.getClass().getClassLoader();
@@ -104,6 +107,7 @@ public class PaimonJniScanner extends JniScanner {
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
                 .collect(Collectors
                        .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
     }
 
     @Override
@@ -114,12 +118,16 @@ public class PaimonJniScanner extends JniScanner {
            //        `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
            // so we need to provide a classloader, otherwise it will cause NPE.
             Thread.currentThread().setContextClassLoader(classLoader);
-            initTable();
-            initReader();
+            preExecutionAuthenticator.execute(() -> {
+                initTable();
+                initReader();
+                return null;
+            });
             resetDatetimeV2Precision();
+
         } catch (Throwable e) {
             LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e);
-            throw e;
+            throw new RuntimeException(e);
         }
     }
 
@@ -137,7 +145,7 @@ public class PaimonJniScanner extends JniScanner {
         readBuilder.withFilter(getPredicates());
        reader = readBuilder.newRead().executeFilter().createReader(getSplit());
         paimonDataTypeList =
-            Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
+                Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
     }
 
     private int[] getProjected() {
@@ -183,8 +191,7 @@ public class PaimonJniScanner extends JniScanner {
         }
     }
 
-    @Override
-    protected int getNext() throws IOException {
+    private int readAndProcessNextBatch() throws IOException {
         int rows = 0;
         try {
             if (recordIterator == null) {
@@ -210,13 +217,22 @@ public class PaimonJniScanner extends JniScanner {
         } catch (Exception e) {
             close();
             LOG.warn("Failed to get the next batch of paimon. "
-                    + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: 
{}, dataType: {}",
+                            + "split: {}, requiredFieldNames: {}, 
paimonAllFieldNames: {}, dataType: {}",
                     getSplit(), params.get("required_fields"), 
paimonAllFieldNames, paimonDataTypeList, e);
             throw new IOException(e);
         }
         return rows;
     }
 
+    @Override
+    protected int getNext() {
+        try {
+            return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
         // do nothing
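
The Paimon scanner gets the same treatment, with one structural twist: the old getNext() body moves into a private readAndProcessNextBatch(), and the override becomes a thin wrapper that runs it through the authenticator. Because execute(...) throws checked exceptions, the wrapper rethrows them as RuntimeException, which is also why open() above switched from rethrowing the original Throwable. A hedged sketch of that delegation shape follows; AuthExecutor is a stand-in interface inferred from the call sites, not the Doris type.

import java.util.concurrent.Callable;

// Stand-in for PreExecutionAuthenticator's execute(Callable) contract.
interface AuthExecutor {
    <T> T execute(Callable<T> task) throws Exception;
}

final class ScannerDelegationSketch {
    private final AuthExecutor auth;

    ScannerDelegationSketch(AuthExecutor auth) {
        this.auth = auth;
    }

    // Placeholder for the real batch read against the Paimon RecordReader.
    private int readAndProcessNextBatch() {
        return 0;
    }

    int getNext() {
        try {
            // All I/O happens inside the authenticated context.
            return auth.execute(this::readAndProcessNextBatch);
        } catch (Exception e) {
            // Mirrors the patch: checked exceptions become unchecked so the
            // JNI-facing signature stays exception-free.
            throw new RuntimeException(e);
        }
    }
}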
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
index b580f9ecbe0..2fa8d09b0d7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Map;
+
 public abstract class AuthenticationConfig {
     private static final Logger LOG = LogManager.getLogger(AuthenticationConfig.class);
     public static String HADOOP_USER_NAME = "hadoop.username";
@@ -31,12 +33,24 @@ public abstract class AuthenticationConfig {
     public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
     public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file";
     public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
+    private static final String DEFAULT_HADOOP_USERNAME = "hadoop";
 
     /**
      * @return true if the config is valid, otherwise false.
      */
     public abstract boolean isValid();
 
+    protected static String generalAuthenticationConfigKey(Map<String, String> conf) {
+        String authentication = conf.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+                null);
+        if (AuthType.KERBEROS.getDesc().equals(authentication)) {
+            return conf.get(HADOOP_KERBEROS_PRINCIPAL) + "-" + conf.get(HADOOP_KERBEROS_KEYTAB) + "-"
+                    + conf.getOrDefault(DORIS_KRB5_DEBUG, "false");
+        } else {
+            return conf.getOrDefault(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
+        }
+    }
+
     /**
      * get kerberos config from hadoop conf
      * @param conf config
@@ -90,7 +104,8 @@ public abstract class AuthenticationConfig {
     private static AuthenticationConfig createSimpleAuthenticationConfig(Configuration conf) {
         // AuthType.SIMPLE
         SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig();
-        simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME));
+        String hadoopUserName = conf.get(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
+        simpleAuthenticationConfig.setUsername(hadoopUserName);
         return simpleAuthenticationConfig;
     }
 }
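
The new generalAuthenticationConfigKey(...) above is what makes authenticator caching safe: each distinct login identity maps to its own key. Kerberos configs are keyed by principal, keytab, and the krb5 debug flag, while simple auth is keyed by username (now defaulting to "hadoop" instead of null). A self-contained sketch of that derivation follows; the literal Kerberos property names are assumptions standing in for the AuthenticationConfig constants not shown in this hunk.

import java.util.Map;

public class AuthCacheKeySketch {
    // Mirrors the branch structure of generalAuthenticationConfigKey.
    static String cacheKey(Map<String, String> conf) {
        String auth = conf.get("hadoop.security.authentication");
        if ("kerberos".equals(auth)) {
            // Different principal/keytab/debug combinations must not share
            // a cached login, so all three feed the key.
            return conf.get("hadoop.kerberos.principal") + "-"
                    + conf.get("hadoop.kerberos.keytab") + "-"
                    + conf.getOrDefault("doris.krb5.debug", "false");
        }
        // Simple auth: the username alone identifies the login context.
        return conf.getOrDefault("hadoop.username", "hadoop");
    }
}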
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java
new file mode 100644
index 00000000000..5b0d1cb70ff
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations.
+ * This class caches PreExecutionAuthenticator objects to avoid recreating them for the same Hadoop configuration.
+ * It uses a Least Recently Used (LRU) cache, where the least recently used entries are removed when the cache exceeds
+ * the maximum size (MAX_CACHE_SIZE).
+ * <p>
+ * The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs),
+ * only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing
+ * redundant instantiations.
+public class PreExecutionAuthenticatorCache {
+    private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class);
+    private static final int MAX_CACHE_SIZE = 100;
+
+    private static final Cache<String, PreExecutionAuthenticator> preExecutionAuthenticatorCache =
+            CacheBuilder.newBuilder()
+                    .maximumSize(MAX_CACHE_SIZE)
+                    .expireAfterAccess(60 * 24, TimeUnit.MINUTES)
+                    .build();
+
+    /**
+     * Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist.
+     * This method first checks if the configuration is already cached. If not, it computes a new instance and
+     * caches it for future use.
+     *
+     * @param hadoopConfig The Hadoop configuration (key-value pairs)
+     * @return A PreExecutionAuthenticator instance for the given configuration
+     */
+    public static PreExecutionAuthenticator getAuthenticator(Map<String, String> hadoopConfig) {
+        String authenticatorCacheKey = AuthenticationConfig.generalAuthenticationConfigKey(hadoopConfig);
+        PreExecutionAuthenticator authenticator;
+        try {
+            authenticator = preExecutionAuthenticatorCache.get(authenticatorCacheKey,
+                    () -> createAuthenticator(hadoopConfig, authenticatorCacheKey));
+        } catch (ExecutionException exception) {
+            throw new RuntimeException("Failed to create PreExecutionAuthenticator for key: " + authenticatorCacheKey,
+                    exception);
+        }
+        return authenticator;
+    }
+
+    private static PreExecutionAuthenticator createAuthenticator(Map<String, String> hadoopConfig,
+                                                                 String authenticatorCacheKey) {
+        Configuration conf = new Configuration();
+        hadoopConfig.forEach(conf::set);
+        PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator();
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(
+                conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
+                AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
+        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
+                .getHadoopAuthenticator(authenticationConfig);
+        preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
+        LOG.info("Creating new PreExecutionAuthenticator for configuration, 
Cache key: {}",
+                authenticatorCacheKey);
+        return preExecutionAuthenticator;
+    }
+
+}
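
A hedged usage sketch of the new cache, tying the pieces together: scanners built with identical Hadoop options share one authenticator (Guava cache, at most 100 entries, expiring 24 hours after last access), and all reads are funneled through it. The Kerberos property names and values below are illustrative assumptions, and the generic execute(Callable) signature is inferred from the scanner call sites above.

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import java.util.HashMap;
import java.util.Map;

public class AuthenticatorCacheUsageSketch {
    public static void main(String[] args) throws Exception {
        Map<String, String> opts = new HashMap<>();
        opts.put("hadoop.security.authentication", "kerberos");
        opts.put("hadoop.kerberos.principal", "doris/_HOST@EXAMPLE.COM"); // example value
        opts.put("hadoop.kerberos.keytab", "/etc/doris/doris.keytab");    // example value

        PreExecutionAuthenticator first = PreExecutionAuthenticatorCache.getAuthenticator(opts);
        PreExecutionAuthenticator second = PreExecutionAuthenticatorCache.getAuthenticator(new HashMap<>(opts));
        // Same key-value pairs -> same cached instance.
        assert first == second;

        // Scanner work then runs inside the authenticated context:
        Integer rows = first.execute(() -> 0 /* placeholder for a real batch read */);
        System.out.println("rows: " + rows);
    }
}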

