This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit af6c4ace48518f148854a81b8ef9b42385ef0ffe
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sat Aug 24 14:20:34 2024 +0800

    [fix](paimon) fix not able to read paimon data from hdfs with HA (#39806)
    
    When reading Paimon data via JNI, the Hadoop properties must also be
    passed through to the JNI side. Otherwise, required properties may be
    missing and reads fail with an error like:
    `CAUSED BY: IllegalArgumentException: java.net.UnknownHostException: hdfs-cluster`
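    
    For context: "hdfs-cluster" here is the logical HA nameservice name, not
    a resolvable host. Without the dfs.nameservices / dfs.ha.* entries,
    Hadoop falls back to DNS and fails. A minimal sketch of how the same
    exception can be reproduced with a plain Hadoop client (the warehouse
    path and class name are hypothetical):
    
        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
    
        public class HaRepro {
            public static void main(String[] args) throws Exception {
                // Defaults only: no dfs.nameservices / dfs.ha.* entries, so the
                // logical nameservice "hdfs-cluster" is treated as a hostname.
                Configuration conf = new Configuration();
                FileSystem.get(URI.create("hdfs://hdfs-cluster/warehouse"), conf);
                // -> IllegalArgumentException: java.net.UnknownHostException: hdfs-cluster
            }
        }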
---
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |  8 +++++++-
 be/src/vec/exec/format/table/paimon_jni_reader.h   |  1 +
 .../org/apache/doris/paimon/PaimonJniScanner.java  | 13 +++++++++++--
 .../org/apache/doris/paimon/PaimonTableCache.java  | 22 ++++++++++++++++++----
 .../datasource/paimon/source/PaimonScanNode.java   |  2 ++
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 6 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index b53848f8a1a..a9ec243cf46 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -35,7 +35,8 @@ class Block;
 
 namespace doris::vectorized {
 
-const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon.";
+const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
 
 PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                  RuntimeState* state, RuntimeProfile* profile,
@@ -65,6 +66,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
     for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
         params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
     }
+    if (range.table_format_params.paimon_params.__isset.hadoop_conf) {
+        for (auto& kv : range.table_format_params.paimon_params.hadoop_conf) {
+            params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+        }
+    }
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
                                                     params, column_names);
 }
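
Note: the BE flattens both maps into the single string-to-string params map
that crosses the JNI boundary, using the prefixes to keep the two namespaces
apart. A sketch of the merged map's shape (class name and values are
hypothetical, not from this patch):

    import java.util.HashMap;
    import java.util.Map;

    public class ParamsShape {
        public static void main(String[] args) {
            // Illustrative shape of the flattened params handed to
            // PaimonJniScanner: Paimon options and Hadoop conf share one
            // map, distinguished only by key prefix.
            Map<String, String> params = new HashMap<>();
            params.put("paimon.warehouse", "hdfs://hdfs-cluster/warehouse"); // Paimon option
            params.put("hadoop.dfs.nameservices", "hdfs-cluster");           // Hadoop conf
            params.put("hadoop.dfs.ha.namenodes.hdfs-cluster", "nn1,nn2");   // Hadoop conf
            System.out.println(params);
        }
    }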
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 6b6a6907270..6ecf6cd1f15 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -51,6 +51,7 @@ class PaimonJniReader : public JniReader {
 
 public:
     static const std::string PAIMON_OPTION_PREFIX;
+    static const std::string HADOOP_OPTION_PREFIX;
     PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
                     RuntimeProfile* profile, const TFileRangeDesc& range);
 
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 719a7ea0b9d..4ef40d9fa1a 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -42,9 +42,12 @@ import java.util.stream.Collectors;
 
 public class PaimonJniScanner extends JniScanner {
     private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
-    private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+    private static final String PAIMON_OPTION_PREFIX = "paimon.";
+    private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+
     private final Map<String, String> params;
     private final Map<String, String> paimonOptionParams;
+    private final Map<String, String> hadoopOptionParams;
     private final String dbName;
     private final String tblName;
     private final String paimonSplit;
@@ -87,6 +90,10 @@ public class PaimonJniScanner extends JniScanner {
                 .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        hadoopOptionParams = params.entrySet().stream()
+                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+                .collect(Collectors
+                        .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
     }
 
     @Override
@@ -207,7 +214,8 @@ public class PaimonJniScanner extends JniScanner {
     }
 
     private void initTable() {
-        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, dbName, tblName);
+        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
+                paimonOptionParams, hadoopOptionParams, dbName, tblName);
         TableExt tableExt = PaimonTableCache.getTable(key);
         if (tableExt.getCreateTime() < lastUpdateTime) {
             LOG.warn("invalidate cache table:{}, localTime:{}, remoteTime:{}", 
key, tableExt.getCreateTime(),
@@ -223,3 +231,4 @@ public class PaimonJniScanner extends JniScanner {
     }
 
 }
+
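
On the Java side, each prefix is filtered back out of the params and stripped
from the keys before use. The same pattern as a standalone helper (a sketch;
the class and method names are illustrative, not from this patch):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PrefixSplit {
        // Select the entries carrying the given prefix and strip it from the keys.
        static Map<String, String> byPrefix(Map<String, String> params, String prefix) {
            return params.entrySet().stream()
                    .filter(kv -> kv.getKey().startsWith(prefix))
                    .collect(Collectors.toMap(
                            kv -> kv.getKey().substring(prefix.length()),
                            Map.Entry::getValue));
        }

        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("hadoop.dfs.nameservices", "hdfs-cluster");
            System.out.println(byPrefix(params, "hadoop.")); // {dfs.nameservices=hdfs-cluster}
        }
    }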
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
index f57ffeb5592..12aac153392 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
@@ -21,6 +21,7 @@ import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -54,7 +55,7 @@ public class PaimonTableCache {
     private static TableExt loadTable(PaimonTableCacheKey key) {
         try {
             LOG.warn("load table:{}", key);
-            Catalog catalog = createCatalog(key.getPaimonOptionParams());
+            Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
             Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
             return new TableExt(table, System.currentTimeMillis());
         } catch (Catalog.TableNotExistException e) {
@@ -63,10 +64,14 @@ public class PaimonTableCache {
         }
     }
 
-    private static Catalog createCatalog(Map<String, String> paimonOptionParams) {
+    private static Catalog createCatalog(
+            Map<String, String> paimonOptionParams,
+            Map<String, String> hadoopOptionParams) {
         Options options = new Options();
         paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
-        CatalogContext context = CatalogContext.create(options);
+        Configuration hadoopConf = new Configuration();
+        hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue()));
+        CatalogContext context = CatalogContext.create(options, hadoopConf);
         return CatalogFactory.createCatalog(context);
     }
 
@@ -108,15 +113,19 @@ public class PaimonTableCache {
 
         // not in key
         private Map<String, String> paimonOptionParams;
+        private Map<String, String> hadoopOptionParams;
         private String dbName;
         private String tblName;
 
         public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams, String dbName, String tblName) {
+                Map<String, String> paimonOptionParams,
+                Map<String, String> hadoopOptionParams,
+                String dbName, String tblName) {
             this.ctlId = ctlId;
             this.dbId = dbId;
             this.tblId = tblId;
             this.paimonOptionParams = paimonOptionParams;
+            this.hadoopOptionParams = hadoopOptionParams;
             this.dbName = dbName;
             this.tblName = tblName;
         }
@@ -137,6 +146,10 @@ public class PaimonTableCache {
             return paimonOptionParams;
         }
 
+        public Map<String, String> getHadoopOptionParams() {
+            return hadoopOptionParams;
+        }
+
         public String getDbName() {
             return dbName;
         }
@@ -171,6 +184,7 @@ public class PaimonTableCache {
                     + ", dbId=" + dbId
                     + ", tblId=" + tblId
                     + ", paimonOptionParams=" + paimonOptionParams
+                    + ", hadoopOptionParams=" + hadoopOptionParams
                     + ", dbName='" + dbName + '\''
                     + ", tblName='" + tblName + '\''
                     + '}';
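
With the extra argument, the Hadoop configuration reaches the Paimon
catalog's file IO, which is what lets the HA nameservice resolve. Standalone
usage of the same CatalogContext.create(options, hadoopConf) API, with
hypothetical HA values (cluster, host, and class names are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.catalog.CatalogFactory;
    import org.apache.paimon.options.Options;

    public class HaCatalog {
        public static void main(String[] args) {
            Options options = new Options();
            options.set("warehouse", "hdfs://hdfs-cluster/warehouse");
            // The HA mapping that a bare CatalogContext.create(options) would lack:
            Configuration hadoopConf = new Configuration();
            hadoopConf.set("dfs.nameservices", "hdfs-cluster");
            hadoopConf.set("dfs.ha.namenodes.hdfs-cluster", "nn1,nn2");
            hadoopConf.set("dfs.namenode.rpc-address.hdfs-cluster.nn1", "nn1-host:8020");
            hadoopConf.set("dfs.namenode.rpc-address.hdfs-cluster.nn2", "nn2-host:8020");
            hadoopConf.set("dfs.client.failover.proxy.provider.hdfs-cluster",
                    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
            Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options, hadoopConf));
            System.out.println(catalog);
        }
    }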
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 45516fd2841..cf8825d2684 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -157,6 +157,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
+        // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration()
+        fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
             DeletionFile deletionFile = optDeletionFile.get();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 26d7983a468..758ead76532 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -329,6 +329,7 @@ struct TPaimonFileDesc {
     10: optional i64 last_update_time
     11: optional string file_format
     12: optional TPaimonDeletionFileDesc deletion_file;
+    13: optional map<string, string> hadoop_conf
 }
 
 struct TTrinoConnectorFileDesc {

