This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit af6c4ace48518f148854a81b8ef9b42385ef0ffe
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sat Aug 24 14:20:34 2024 +0800

    [fix](paimon) fix not able to read paimon data from hdfs with HA (#39806)

    When reading Paimon data via JNI, the Hadoop properties must be passed
    through to the JNI side as well. Otherwise, some properties may be
    missing and the read fails with an error like:
    `CAUSED BY: IllegalArgumentException: java.net.UnknownHostException: hdfs-cluster`
---
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |  8 +++++++-
 be/src/vec/exec/format/table/paimon_jni_reader.h   |  1 +
 .../org/apache/doris/paimon/PaimonJniScanner.java  | 13 +++++++++++--
 .../org/apache/doris/paimon/PaimonTableCache.java  | 22 ++++++++++++++++++----
 .../datasource/paimon/source/PaimonScanNode.java   |  2 ++
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 6 files changed, 40 insertions(+), 7 deletions(-)
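For context on the error above: an HDFS HA nameservice such as `hdfs-cluster` is a logical name, not a resolvable host; the HDFS client can only map it to the active NameNode when the HA client properties are present in its Configuration. The sketch below shows the kind of properties that therefore have to reach the JNI scanner. It is a minimal illustration assuming hadoop-common on the classpath; the nameservice id, NameNode hosts, and class name are hypothetical placeholders, while the property keys are the standard HDFS HA client settings.

    import org.apache.hadoop.conf.Configuration;

    public class HdfsHaConfSketch {
        public static void main(String[] args) {
            // Standard HDFS HA client settings; "hdfs-cluster" and the
            // NameNode hosts are hypothetical placeholders.
            Configuration conf = new Configuration(false);
            conf.set("fs.defaultFS", "hdfs://hdfs-cluster");
            conf.set("dfs.nameservices", "hdfs-cluster");
            conf.set("dfs.ha.namenodes.hdfs-cluster", "nn1,nn2");
            conf.set("dfs.namenode.rpc-address.hdfs-cluster.nn1", "nn1.example.com:8020");
            conf.set("dfs.namenode.rpc-address.hdfs-cluster.nn2", "nn2.example.com:8020");
            conf.set("dfs.client.failover.proxy.provider.hdfs-cluster",
                    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
            // If these entries never reach the JNI side, resolving
            // "hdfs-cluster" as a plain host name fails with UnknownHostException.
        }
    }

The diffs below forward exactly this kind of map, taken from the catalog's Hadoop properties on the FE, through the BE and into the JNI scanner.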
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index b53848f8a1a..a9ec243cf46 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -35,7 +35,8 @@ class Block;
 
 namespace doris::vectorized {
 
-const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon.";
+const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
 
 PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                  RuntimeState* state, RuntimeProfile* profile,
@@ -65,6 +66,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
     for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
         params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
     }
+    if (range.table_format_params.paimon_params.__isset.hadoop_conf) {
+        for (auto& kv : range.table_format_params.paimon_params.hadoop_conf) {
+            params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+        }
+    }
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
                                                     params, column_names);
 }
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 6b6a6907270..6ecf6cd1f15 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -51,6 +51,7 @@ class PaimonJniReader : public JniReader {
 public:
     static const std::string PAIMON_OPTION_PREFIX;
+    static const std::string HADOOP_OPTION_PREFIX;
 
     PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
                     RuntimeProfile* profile, const TFileRangeDesc& range);
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 719a7ea0b9d..4ef40d9fa1a 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -42,9 +42,12 @@ import java.util.stream.Collectors;
 
 public class PaimonJniScanner extends JniScanner {
     private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
-    private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+    private static final String PAIMON_OPTION_PREFIX = "paimon.";
+    private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+
     private final Map<String, String> params;
     private final Map<String, String> paimonOptionParams;
+    private final Map<String, String> hadoopOptionParams;
     private final String dbName;
     private final String tblName;
     private final String paimonSplit;
@@ -87,6 +90,10 @@ public class PaimonJniScanner extends JniScanner {
                 .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        hadoopOptionParams = params.entrySet().stream()
+                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+                .collect(Collectors
+                        .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
     }
 
     @Override
@@ -207,7 +214,8 @@ public class PaimonJniScanner extends JniScanner {
     }
 
     private void initTable() {
-        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, dbName, tblName);
+        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
+                paimonOptionParams, hadoopOptionParams, dbName, tblName);
         TableExt tableExt = PaimonTableCache.getTable(key);
         if (tableExt.getCreateTime() < lastUpdateTime) {
             LOG.warn("invalidate cache table:{}, localTime:{}, remoteTime:{}", key, tableExt.getCreateTime(),
@@ -223,3 +231,4 @@ public class PaimonJniScanner extends JniScanner {
     }
 
 }
+
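The scanner-side hunks above recover the Hadoop entries from the single flat params map that crosses the JNI boundary: the BE namespaces the two option sets with the "paimon." and "hadoop." prefixes, and the scanner filters and strips them back apart. A minimal self-contained sketch of that round trip, with a hypothetical class name and sample entries:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PrefixRoundTripSketch {
        private static final String HADOOP_OPTION_PREFIX = "hadoop.";

        public static void main(String[] args) {
            // One flat map, as the BE would encode it: Hadoop conf entries
            // are namespaced with the "hadoop." prefix (sample values).
            Map<String, String> params = new HashMap<>();
            params.put(HADOOP_OPTION_PREFIX + "dfs.nameservices", "hdfs-cluster");
            params.put("paimon.warehouse", "hdfs://hdfs-cluster/paimon");

            // Mirrors the scanner side: keep hadoop.* entries and strip the
            // prefix to recover the original Hadoop property names.
            Map<String, String> hadoopOptionParams = params.entrySet().stream()
                    .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
                    .collect(Collectors.toMap(
                            kv -> kv.getKey().substring(HADOOP_OPTION_PREFIX.length()),
                            Map.Entry::getValue));

            System.out.println(hadoopOptionParams); // {dfs.nameservices=hdfs-cluster}
        }
    }

Prefixing keeps the two namespaces from colliding in the one map<string, string> that the JNI interface accepts.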
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
index f57ffeb5592..12aac153392 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
@@ -21,6 +21,7 @@ import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -54,7 +55,7 @@ public class PaimonTableCache {
     private static TableExt loadTable(PaimonTableCacheKey key) {
         try {
             LOG.warn("load table:{}", key);
-            Catalog catalog = createCatalog(key.getPaimonOptionParams());
+            Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
             Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
             return new TableExt(table, System.currentTimeMillis());
         } catch (Catalog.TableNotExistException e) {
@@ -63,10 +64,14 @@ public class PaimonTableCache {
         }
     }
 
-    private static Catalog createCatalog(Map<String, String> paimonOptionParams) {
+    private static Catalog createCatalog(
+            Map<String, String> paimonOptionParams,
+            Map<String, String> hadoopOptionParams) {
         Options options = new Options();
         paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
-        CatalogContext context = CatalogContext.create(options);
+        Configuration hadoopConf = new Configuration();
+        hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue()));
+        CatalogContext context = CatalogContext.create(options, hadoopConf);
         return CatalogFactory.createCatalog(context);
     }
 
@@ -108,15 +113,19 @@ public class PaimonTableCache {
 
         // not in key
         private Map<String, String> paimonOptionParams;
+        private Map<String, String> hadoopOptionParams;
        private String dbName;
         private String tblName;
 
         public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams, String dbName, String tblName) {
+                Map<String, String> paimonOptionParams,
+                Map<String, String> hadoopOptionParams,
+                String dbName, String tblName) {
             this.ctlId = ctlId;
             this.dbId = dbId;
             this.tblId = tblId;
             this.paimonOptionParams = paimonOptionParams;
+            this.hadoopOptionParams = hadoopOptionParams;
             this.dbName = dbName;
             this.tblName = tblName;
         }
@@ -137,6 +146,10 @@ public class PaimonTableCache {
             return paimonOptionParams;
         }
 
+        public Map<String, String> getHadoopOptionParams() {
+            return hadoopOptionParams;
+        }
+
         public String getDbName() {
             return dbName;
         }
@@ -171,6 +184,7 @@ public class PaimonTableCache {
                     + ", dbId=" + dbId
                     + ", tblId=" + tblId
                     + ", paimonOptionParams=" + paimonOptionParams
+                    + ", hadoopOptionParams=" + hadoopOptionParams
                     + ", dbName='" + dbName + '\''
                     + ", tblName='" + tblName + '\''
                     + '}';
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 45516fd2841..cf8825d2684 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -157,6 +157,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
+        // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration()
+        fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
             DeletionFile deletionFile = optDeletionFile.get();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 26d7983a468..758ead76532 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -329,6 +329,7 @@ struct TPaimonFileDesc {
     10: optional i64 last_update_time
     11: optional string file_format
     12: optional TPaimonDeletionFileDesc deletion_file;
+    13: optional map<string, string> hadoop_conf
 }
 
 struct TTrinoConnectorFileDesc {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org