This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 14a2a661061 [fix](paimon) fix not able to read paimon data from hdfs with HA (#39806) (#39876)

14a2a661061 is described below

commit 14a2a661061a471972375e4d1256bd8bfdaa48c6
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sat Aug 24 17:51:15 2024 +0800

    [fix](paimon) fix not able to read paimon data from hdfs with HA (#39806) (#39876)

    bp #39806
---
 be/src/vec/exec/format/table/paimon_jni_reader.cpp  |  8 +++++++-
 be/src/vec/exec/format/table/paimon_jni_reader.h    |  1 +
 .../org/apache/doris/paimon/PaimonJniScanner.java   | 13 +++++++++++--
 .../org/apache/doris/paimon/PaimonTableCache.java   | 22 ++++++++++++++++++----
 .../datasource/paimon/source/PaimonScanNode.java    |  2 ++
 gensrc/thrift/PlanNodes.thrift                      |  1 +
 6 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index ef690c15b68..fa73454f4b4 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -35,7 +35,8 @@ class Block;
 
 namespace doris::vectorized {
 
-const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon.";
+const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
 
 PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                  RuntimeState* state, RuntimeProfile* profile,
@@ -65,6 +66,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
     for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
         params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
     }
+    if (range.table_format_params.paimon_params.__isset.hadoop_conf) {
+        for (auto& kv : range.table_format_params.paimon_params.hadoop_conf) {
+            params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+        }
+    }
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
                                                     params, column_names);
 }
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 6b6a6907270..6ecf6cd1f15 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -51,6 +51,7 @@ class PaimonJniReader : public JniReader {
 
 public:
     static const std::string PAIMON_OPTION_PREFIX;
+    static const std::string HADOOP_OPTION_PREFIX;
 
     PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
                     RuntimeProfile* profile, const TFileRangeDesc& range);
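
For context on the BE-side hunks above: the reader flattens both option groups into the single string map that crosses the JNI boundary, namespacing each entry with a prefix so the Java scanner can tell them apart. A minimal Java sketch of that merge step; the keys and values are hypothetical stand-ins for what TPaimonFileDesc.paimon_options and hadoop_conf would actually carry:

    import java.util.HashMap;
    import java.util.Map;

    public class JniParamsSketch {
        // Mirrors PAIMON_OPTION_PREFIX / HADOOP_OPTION_PREFIX in the patch.
        static final String PAIMON_OPTION_PREFIX = "paimon.";
        static final String HADOOP_OPTION_PREFIX = "hadoop.";

        public static void main(String[] args) {
            // Hypothetical catalog options and HDFS HA settings.
            Map<String, String> paimonOptions = Map.of("warehouse", "hdfs://my-ns/warehouse");
            Map<String, String> hadoopConf = Map.of(
                    "dfs.nameservices", "my-ns",
                    "dfs.ha.namenodes.my-ns", "nn1,nn2");

            // One flat map, disambiguated by prefix, as the C++ reader builds it.
            Map<String, String> params = new HashMap<>();
            paimonOptions.forEach((k, v) -> params.put(PAIMON_OPTION_PREFIX + k, v));
            hadoopConf.forEach((k, v) -> params.put(HADOOP_OPTION_PREFIX + k, v));
            params.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
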
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 719a7ea0b9d..4ef40d9fa1a 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -42,9 +42,12 @@ import java.util.stream.Collectors;
 
 public class PaimonJniScanner extends JniScanner {
     private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
-    private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+    private static final String PAIMON_OPTION_PREFIX = "paimon.";
+    private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+
     private final Map<String, String> params;
     private final Map<String, String> paimonOptionParams;
+    private final Map<String, String> hadoopOptionParams;
     private final String dbName;
     private final String tblName;
     private final String paimonSplit;
@@ -87,6 +90,10 @@ public class PaimonJniScanner extends JniScanner {
                 .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        hadoopOptionParams = params.entrySet().stream()
+                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+                .collect(Collectors
+                        .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
     }
 
     @Override
@@ -207,7 +214,8 @@ public class PaimonJniScanner extends JniScanner {
     }
 
     private void initTable() {
-        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, dbName, tblName);
+        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
+                paimonOptionParams, hadoopOptionParams, dbName, tblName);
         TableExt tableExt = PaimonTableCache.getTable(key);
         if (tableExt.getCreateTime() < lastUpdateTime) {
             LOG.warn("invalidate cache table:{}, localTime:{}, remoteTime:{}", key, tableExt.getCreateTime(),
@@ -223,3 +231,4 @@
 
     }
 }
+
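
The Java scanner reverses that namespacing: its constructor splits the flat JNI params back into Paimon options and Hadoop options by prefix, as the hunks above show. A self-contained sketch of the same filter-and-strip stream pipeline; the class name and map contents are hypothetical:

    import java.util.Map;
    import java.util.stream.Collectors;

    public class PrefixSplitSketch {
        // Recover one of the original maps from the flattened JNI params.
        static Map<String, String> byPrefix(Map<String, String> params, String prefix) {
            return params.entrySet().stream()
                    .filter(kv -> kv.getKey().startsWith(prefix))
                    .collect(Collectors.toMap(
                            kv -> kv.getKey().substring(prefix.length()),
                            Map.Entry::getValue));
        }

        public static void main(String[] args) {
            Map<String, String> params = Map.of(
                    "paimon.warehouse", "hdfs://my-ns/warehouse",
                    "hadoop.dfs.nameservices", "my-ns");
            System.out.println(byPrefix(params, "paimon."));  // {warehouse=hdfs://my-ns/warehouse}
            System.out.println(byPrefix(params, "hadoop."));  // {dfs.nameservices=my-ns}
        }
    }
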
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
index f57ffeb5592..12aac153392 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
@@ -21,6 +21,7 @@ import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -54,7 +55,7 @@ public class PaimonTableCache {
     private static TableExt loadTable(PaimonTableCacheKey key) {
         try {
             LOG.warn("load table:{}", key);
-            Catalog catalog = createCatalog(key.getPaimonOptionParams());
+            Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
             Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
             return new TableExt(table, System.currentTimeMillis());
         } catch (Catalog.TableNotExistException e) {
@@ -63,10 +64,14 @@
         }
     }
 
-    private static Catalog createCatalog(Map<String, String> paimonOptionParams) {
+    private static Catalog createCatalog(
+            Map<String, String> paimonOptionParams,
+            Map<String, String> hadoopOptionParams) {
         Options options = new Options();
         paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
-        CatalogContext context = CatalogContext.create(options);
+        Configuration hadoopConf = new Configuration();
+        hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue()));
+        CatalogContext context = CatalogContext.create(options, hadoopConf);
         return CatalogFactory.createCatalog(context);
     }
 
@@ -108,15 +113,19 @@
 
         // not in key
         private Map<String, String> paimonOptionParams;
+        private Map<String, String> hadoopOptionParams;
         private String dbName;
         private String tblName;
 
         public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams, String dbName, String tblName) {
+                Map<String, String> paimonOptionParams,
+                Map<String, String> hadoopOptionParams,
+                String dbName, String tblName) {
             this.ctlId = ctlId;
             this.dbId = dbId;
             this.tblId = tblId;
             this.paimonOptionParams = paimonOptionParams;
+            this.hadoopOptionParams = hadoopOptionParams;
             this.dbName = dbName;
             this.tblName = tblName;
         }
@@ -137,6 +146,10 @@
             return paimonOptionParams;
         }
 
+        public Map<String, String> getHadoopOptionParams() {
+            return hadoopOptionParams;
+        }
+
         public String getDbName() {
             return dbName;
         }
@@ -171,6 +184,7 @@
                     + ", dbId=" + dbId
                     + ", tblId=" + tblId
                     + ", paimonOptionParams=" + paimonOptionParams
+                    + ", hadoopOptionParams=" + hadoopOptionParams
                     + ", dbName='" + dbName + '\''
                     + ", tblName='" + tblName + '\''
                     + '}';
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 27b40b5bcc9..ad8017a4f4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -164,6 +164,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
+        // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration()
+        fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
             DeletionFile deletionFile = optDeletionFile.get();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index fc1a6e6baf5..a060f5efab4 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -327,6 +327,7 @@ struct TPaimonFileDesc {
     10: optional i64 last_update_time
     11: optional string file_format
     12: optional TPaimonDeletionFileDesc deletion_file;
+    13: optional map<string, string> hadoop_conf
 }
 
 struct TMaxComputeFileDesc {
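
Taken together: the FE serializes the catalog's Hadoop properties into the new hadoop_conf thrift field, the BE forwards them over JNI under the "hadoop." prefix, and PaimonTableCache now builds the Paimon catalog from a populated Hadoop Configuration rather than an empty default. That Configuration is what lets the HDFS client resolve a logical HA nameservice URI such as hdfs://my-ns/..., which is exactly the read that failed before this fix. A sketch of the patched createCatalog() path; the HA keys are standard HDFS client settings, and "my-ns", "nn1"/"nn2" and the host:port pairs are placeholders for a real cluster's values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.catalog.CatalogFactory;
    import org.apache.paimon.options.Options;

    public class HaCatalogSketch {
        public static void main(String[] args) {
            // Paimon catalog options; the warehouse sits on the logical nameservice.
            Options options = new Options();
            options.set("warehouse", "hdfs://my-ns/user/paimon/warehouse");

            // Client-side HDFS HA settings that must reach the scanner's JVM.
            Configuration hadoopConf = new Configuration();
            hadoopConf.set("dfs.nameservices", "my-ns");
            hadoopConf.set("dfs.ha.namenodes.my-ns", "nn1,nn2");
            hadoopConf.set("dfs.namenode.rpc-address.my-ns.nn1", "namenode1:8020");
            hadoopConf.set("dfs.namenode.rpc-address.my-ns.nn2", "namenode2:8020");
            hadoopConf.set("dfs.client.failover.proxy.provider.my-ns",
                    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

            // Same call shape as the patched PaimonTableCache.createCatalog().
            CatalogContext context = CatalogContext.create(options, hadoopConf);
            Catalog catalog = CatalogFactory.createCatalog(context);
            System.out.println("created catalog: " + catalog);
        }
    }

Without dfs.nameservices and the failover proxy provider entries, the HDFS client treats "my-ns" as a plain hostname and the read fails; with them, it fails over between nn1 and nn2 transparently.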