This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit abeb262194690647d78472678beafd7b9647611f Author: zhangdong <[email protected]> AuthorDate: Sun Oct 8 10:36:18 2023 +0800 [Enhance](catalog)add table cache in paimon jni (#25014) - fix get old schema after refresh paimon table - add table cache in paimon jni --- be/src/vec/exec/format/table/paimon_reader.cpp | 5 + .../org/apache/doris/paimon/PaimonJniScanner.java | 40 ++--- .../org/apache/doris/paimon/PaimonTableCache.java | 182 +++++++++++++++++++++ .../catalog/external/PaimonExternalTable.java | 8 +- .../planner/external/paimon/PaimonScanNode.java | 4 + gensrc/thrift/PlanNodes.thrift | 4 + 6 files changed, 221 insertions(+), 22 deletions(-) diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index 220cfc6100d..f1ebb96fa10 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -52,6 +52,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d params["paimon_split"] = range.table_format_params.paimon_params.paimon_split; params["paimon_column_names"] = range.table_format_params.paimon_params.paimon_column_names; params["paimon_predicate"] = range.table_format_params.paimon_params.paimon_predicate; + params["ctl_id"] = std::to_string(range.table_format_params.paimon_params.ctl_id); + params["db_id"] = std::to_string(range.table_format_params.paimon_params.db_id); + params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id); + params["last_update_time"] = + std::to_string(range.table_format_params.paimon_params.last_update_time); // Used to create paimon option for (auto& kv : range.table_format_params.paimon_params.paimon_options) { diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 3fe4efac9eb..49675627891 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -21,13 +21,10 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; +import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey; +import org.apache.doris.paimon.PaimonTableCache.TableExt; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.Table; @@ -57,12 +54,21 @@ public class PaimonJniScanner extends JniScanner { private final PaimonColumnValue columnValue = new PaimonColumnValue(); private List<String> paimonAllFieldNames; + private long ctlId; + private long dbId; + private long tblId; + private long lastUpdateTime; + public PaimonJniScanner(int batchSize, Map<String, String> params) { LOG.debug("params:{}", params); paimonSplit = params.get("paimon_split"); paimonPredicate = params.get("paimon_predicate"); dbName = params.get("db_name"); tblName = params.get("table_name"); + ctlId = Long.parseLong(params.get("ctl_id")); + dbId = Long.parseLong(params.get("db_id")); + tblId = Long.parseLong(params.get("tbl_id")); + lastUpdateTime = Long.parseLong(params.get("last_update_time")); super.batchSize = batchSize; super.fields = params.get("paimon_column_names").split(","); super.predicates = new ScanPredicate[0]; @@ -153,21 +159,17 @@ public class PaimonJniScanner extends JniScanner { } private void initTable() { - try { - Catalog catalog = createCatalog(); - table = catalog.getTable(Identifier.create(dbName, tblName)); - paimonAllFieldNames = PaimonScannerUtils.fieldNames(table.rowType()); - LOG.info("paimonAllFieldNames:{}", paimonAllFieldNames); - } catch (Catalog.TableNotExistException e) { - LOG.warn("failed to create paimon external catalog ", e); - throw new RuntimeException(e); + PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, dbName, tblName); + TableExt tableExt = PaimonTableCache.getTable(key); + if (tableExt.getCreateTime() < lastUpdateTime) { + LOG.warn("invalidate cacha table:{}, localTime:{}, remoteTime:{}", key, tableExt.getCreateTime(), + lastUpdateTime); + PaimonTableCache.invalidateTableCache(key); + tableExt = PaimonTableCache.getTable(key); } + this.table = tableExt.getTable(); + paimonAllFieldNames = PaimonScannerUtils.fieldNames(this.table.rowType()); + LOG.info("paimonAllFieldNames:{}", paimonAllFieldNames); } - private Catalog createCatalog() { - Options options = new Options(); - paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue())); - CatalogContext context = CatalogContext.create(options); - return CatalogFactory.createCatalog(context); - } } diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java new file mode 100644 index 00000000000..caf1f156de5 --- /dev/null +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.paimon; + +import com.google.common.base.Objects; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + + +public class PaimonTableCache { + private static final Logger LOG = LoggerFactory.getLogger(PaimonTableCache.class); + // Max cache num of paimon table + public static final long max_external_schema_cache_num = 50; + // The expiration time of a cache object after last access of it. + public static final long external_cache_expire_time_minutes_after_access = 100; + + private static LoadingCache<PaimonTableCacheKey, TableExt> tableCache = CacheBuilder.newBuilder() + .maximumSize(max_external_schema_cache_num) + .expireAfterAccess(external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(new CacheLoader<PaimonTableCacheKey, TableExt>() { + @Override + public TableExt load(PaimonTableCacheKey key) { + return loadTable(key); + } + }); + + private static TableExt loadTable(PaimonTableCacheKey key) { + try { + LOG.warn("load table:{}", key); + Catalog catalog = createCatalog(key.getPaimonOptionParams()); + Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName())); + return new TableExt(table, System.currentTimeMillis()); + } catch (Catalog.TableNotExistException e) { + LOG.warn("failed to create paimon table ", e); + throw new RuntimeException(e); + } + } + + private static Catalog createCatalog(Map<String, String> paimonOptionParams) { + Options options = new Options(); + paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue())); + CatalogContext context = CatalogContext.create(options); + return CatalogFactory.createCatalog(context); + } + + public static void invalidateTableCache(PaimonTableCacheKey key) { + tableCache.invalidate(key); + } + + public static TableExt getTable(PaimonTableCacheKey key) { + try { + return tableCache.get(key); + } catch (ExecutionException e) { + throw new RuntimeException("failed to get table for:" + key); + } + } + + + public static class TableExt { + private Table table; + private long createTime; + + public TableExt(Table table, long createTime) { + this.table = table; + this.createTime = createTime; + } + + public Table getTable() { + return table; + } + + public long getCreateTime() { + return createTime; + } + } + + public static class PaimonTableCacheKey { + // in key + private long ctlId; + private long dbId; + private long tblId; + + // not in key + private Map<String, String> paimonOptionParams; + private String dbName; + private String tblName; + + public PaimonTableCacheKey(long ctlId, long dbId, long tblId, + Map<String, String> paimonOptionParams, String dbName, String tblName) { + this.ctlId = ctlId; + this.dbId = dbId; + this.tblId = tblId; + this.paimonOptionParams = paimonOptionParams; + this.dbName = dbName; + this.tblName = tblName; + } + + public long getCtlId() { + return ctlId; + } + + public long getDbId() { + return dbId; + } + + public long getTblId() { + return tblId; + } + + public Map<String, String> getPaimonOptionParams() { + return paimonOptionParams; + } + + public String getDbName() { + return dbName; + } + + public String getTblName() { + return tblName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonTableCacheKey that = (PaimonTableCacheKey) o; + return ctlId == that.ctlId + && dbId == that.dbId + && tblId == that.tblId; + } + + @Override + public int hashCode() { + return Objects.hashCode(ctlId, dbId, tblId); + } + + @Override + public String toString() { + return "PaimonTableCacheKey{" + + "ctlId=" + ctlId + + ", dbId=" + dbId + + ", tblId=" + tblId + + ", paimonOptionParams=" + paimonOptionParams + + ", dbName='" + dbName + '\'' + + ", tblName='" + tblName + '\'' + + '}'; + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java index 6d7c124a0d0..16cfa76e632 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java @@ -55,19 +55,21 @@ public class PaimonExternalTable extends ExternalTable { protected synchronized void makeSureInitialized() { super.makeSureInitialized(); if (!objectCreated) { + originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); + lastUpdateTime = System.currentTimeMillis(); objectCreated = true; } } public Table getOriginTable() { - if (originTable == null) { - originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); - } + makeSureInitialized(); return originTable; } @Override public List<Column> initSchema() { + //init schema need update lastUpdateTime and get latest schema + objectCreated = false; Table table = getOriginTable(); TableSchema schema = ((AbstractFileStoreTable) table).schema(); List<DataField> columns = schema.fields(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java index aedc62869c8..f8306391814 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java @@ -102,6 +102,10 @@ public class PaimonScanNode extends FileQueryScanNode { fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName()); fileDesc.setPaimonOptions(((PaimonExternalCatalog) source.getCatalog()).getPaimonOptionsMap()); fileDesc.setTableName(source.getTargetTable().getName()); + fileDesc.setCtlId(source.getCatalog().getId()); + fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId()); + fileDesc.setTblId(source.getTargetTable().getId()); + fileDesc.setLastUpdateTime(source.getTargetTable().getLastUpdateTime()); tableFormatFileDesc.setPaimonParams(fileDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index cf599b5d206..9d75c7fc168 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -304,6 +304,10 @@ struct TPaimonFileDesc { 4: optional string table_name 5: optional string paimon_predicate 6: optional map<string, string> paimon_options + 7: optional i64 ctl_id + 8: optional i64 db_id + 9: optional i64 tbl_id + 10: optional i64 last_update_time } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
