This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit abeb262194690647d78472678beafd7b9647611f
Author: zhangdong <[email protected]>
AuthorDate: Sun Oct 8 10:36:18 2023 +0800

    [Enhance](catalog)add table cache in paimon jni (#25014)
    
    - fix get old schema after refresh paimon table
    - add table cache in paimon jni
---
 be/src/vec/exec/format/table/paimon_reader.cpp     |   5 +
 .../org/apache/doris/paimon/PaimonJniScanner.java  |  40 ++---
 .../org/apache/doris/paimon/PaimonTableCache.java  | 182 +++++++++++++++++++++
 .../catalog/external/PaimonExternalTable.java      |   8 +-
 .../planner/external/paimon/PaimonScanNode.java    |   4 +
 gensrc/thrift/PlanNodes.thrift                     |   4 +
 6 files changed, 221 insertions(+), 22 deletions(-)

diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp 
b/be/src/vec/exec/format/table/paimon_reader.cpp
index 220cfc6100d..f1ebb96fa10 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -52,6 +52,11 @@ PaimonJniReader::PaimonJniReader(const 
std::vector<SlotDescriptor*>& file_slot_d
     params["paimon_split"] = 
range.table_format_params.paimon_params.paimon_split;
     params["paimon_column_names"] = 
range.table_format_params.paimon_params.paimon_column_names;
     params["paimon_predicate"] = 
range.table_format_params.paimon_params.paimon_predicate;
+    params["ctl_id"] = 
std::to_string(range.table_format_params.paimon_params.ctl_id);
+    params["db_id"] = 
std::to_string(range.table_format_params.paimon_params.db_id);
+    params["tbl_id"] = 
std::to_string(range.table_format_params.paimon_params.tbl_id);
+    params["last_update_time"] =
+            
std::to_string(range.table_format_params.paimon_params.last_update_time);
 
     // Used to create paimon option
     for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 3fe4efac9eb..49675627891 100644
--- 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -21,13 +21,10 @@ import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
+import org.apache.doris.paimon.PaimonTableCache.TableExt;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
@@ -57,12 +54,21 @@ public class PaimonJniScanner extends JniScanner {
     private final PaimonColumnValue columnValue = new PaimonColumnValue();
     private List<String> paimonAllFieldNames;
 
+    private long ctlId;
+    private long dbId;
+    private long tblId;
+    private long lastUpdateTime;
+
     public PaimonJniScanner(int batchSize, Map<String, String> params) {
         LOG.debug("params:{}", params);
         paimonSplit = params.get("paimon_split");
         paimonPredicate = params.get("paimon_predicate");
         dbName = params.get("db_name");
         tblName = params.get("table_name");
+        ctlId = Long.parseLong(params.get("ctl_id"));
+        dbId = Long.parseLong(params.get("db_id"));
+        tblId = Long.parseLong(params.get("tbl_id"));
+        lastUpdateTime = Long.parseLong(params.get("last_update_time"));
         super.batchSize = batchSize;
         super.fields = params.get("paimon_column_names").split(",");
         super.predicates = new ScanPredicate[0];
@@ -153,21 +159,17 @@ public class PaimonJniScanner extends JniScanner {
     }
 
     private void initTable() {
-        try {
-            Catalog catalog = createCatalog();
-            table = catalog.getTable(Identifier.create(dbName, tblName));
-            paimonAllFieldNames = 
PaimonScannerUtils.fieldNames(table.rowType());
-            LOG.info("paimonAllFieldNames:{}", paimonAllFieldNames);
-        } catch (Catalog.TableNotExistException e) {
-            LOG.warn("failed to create paimon external catalog ", e);
-            throw new RuntimeException(e);
+        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, 
paimonOptionParams, dbName, tblName);
+        TableExt tableExt = PaimonTableCache.getTable(key);
+        if (tableExt.getCreateTime() < lastUpdateTime) {
+            LOG.warn("invalidate cacha table:{}, localTime:{}, remoteTime:{}", 
key, tableExt.getCreateTime(),
+                    lastUpdateTime);
+            PaimonTableCache.invalidateTableCache(key);
+            tableExt = PaimonTableCache.getTable(key);
         }
+        this.table = tableExt.getTable();
+        paimonAllFieldNames = 
PaimonScannerUtils.fieldNames(this.table.rowType());
+        LOG.info("paimonAllFieldNames:{}", paimonAllFieldNames);
     }
 
-    private Catalog createCatalog() {
-        Options options = new Options();
-        paimonOptionParams.entrySet().stream().forEach(kv -> 
options.set(kv.getKey(), kv.getValue()));
-        CatalogContext context = CatalogContext.create(options);
-        return CatalogFactory.createCatalog(context);
-    }
 }
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
new file mode 100644
index 00000000000..caf1f156de5
--- /dev/null
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.paimon;
+
+import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+
+public class PaimonTableCache {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PaimonTableCache.class);
+    // Max cache num of paimon table
+    public static final long max_external_schema_cache_num = 50;
+    // The expiration time of a cache object after last access of it.
+    public static final long external_cache_expire_time_minutes_after_access = 
100;
+
+    private static LoadingCache<PaimonTableCacheKey, TableExt> tableCache = 
CacheBuilder.newBuilder()
+            .maximumSize(max_external_schema_cache_num)
+            
.expireAfterAccess(external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
+            .build(new CacheLoader<PaimonTableCacheKey, TableExt>() {
+                @Override
+                public TableExt load(PaimonTableCacheKey key) {
+                    return loadTable(key);
+                }
+            });
+
+    private static TableExt loadTable(PaimonTableCacheKey key) {
+        try {
+            LOG.warn("load table:{}", key);
+            Catalog catalog = createCatalog(key.getPaimonOptionParams());
+            Table table = catalog.getTable(Identifier.create(key.getDbName(), 
key.getTblName()));
+            return new TableExt(table, System.currentTimeMillis());
+        } catch (Catalog.TableNotExistException e) {
+            LOG.warn("failed to create paimon table ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Catalog createCatalog(Map<String, String> 
paimonOptionParams) {
+        Options options = new Options();
+        paimonOptionParams.entrySet().stream().forEach(kv -> 
options.set(kv.getKey(), kv.getValue()));
+        CatalogContext context = CatalogContext.create(options);
+        return CatalogFactory.createCatalog(context);
+    }
+
+    public static void invalidateTableCache(PaimonTableCacheKey key) {
+        tableCache.invalidate(key);
+    }
+
+    public static TableExt getTable(PaimonTableCacheKey key) {
+        try {
+            return tableCache.get(key);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("failed to get table for:" + key);
+        }
+    }
+
+
+    public static class TableExt {
+        private Table table;
+        private long createTime;
+
+        public TableExt(Table table, long createTime) {
+            this.table = table;
+            this.createTime = createTime;
+        }
+
+        public Table getTable() {
+            return table;
+        }
+
+        public long getCreateTime() {
+            return createTime;
+        }
+    }
+
+    public static class PaimonTableCacheKey {
+        // in key
+        private long ctlId;
+        private long dbId;
+        private long tblId;
+
+        // not in key
+        private Map<String, String> paimonOptionParams;
+        private String dbName;
+        private String tblName;
+
+        public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
+                Map<String, String> paimonOptionParams, String dbName, String 
tblName) {
+            this.ctlId = ctlId;
+            this.dbId = dbId;
+            this.tblId = tblId;
+            this.paimonOptionParams = paimonOptionParams;
+            this.dbName = dbName;
+            this.tblName = tblName;
+        }
+
+        public long getCtlId() {
+            return ctlId;
+        }
+
+        public long getDbId() {
+            return dbId;
+        }
+
+        public long getTblId() {
+            return tblId;
+        }
+
+        public Map<String, String> getPaimonOptionParams() {
+            return paimonOptionParams;
+        }
+
+        public String getDbName() {
+            return dbName;
+        }
+
+        public String getTblName() {
+            return tblName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PaimonTableCacheKey that = (PaimonTableCacheKey) o;
+            return ctlId == that.ctlId
+                    && dbId == that.dbId
+                    && tblId == that.tblId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(ctlId, dbId, tblId);
+        }
+
+        @Override
+        public String toString() {
+            return "PaimonTableCacheKey{"
+                    + "ctlId=" + ctlId
+                    + ", dbId=" + dbId
+                    + ", tblId=" + tblId
+                    + ", paimonOptionParams=" + paimonOptionParams
+                    + ", dbName='" + dbName + '\''
+                    + ", tblName='" + tblName + '\''
+                    + '}';
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
index 6d7c124a0d0..16cfa76e632 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
@@ -55,19 +55,21 @@ public class PaimonExternalTable extends ExternalTable {
     protected synchronized void makeSureInitialized() {
         super.makeSureInitialized();
         if (!objectCreated) {
+            originTable = ((PaimonExternalCatalog) 
catalog).getPaimonTable(dbName, name); // (re)load the Paimon table handle from the catalog
+            lastUpdateTime = System.currentTimeMillis(); // wall-clock time of this load
             objectCreated = true;
         }
     }
 
     public Table getOriginTable() {
-        if (originTable == null) {
-            originTable = ((PaimonExternalCatalog) 
catalog).getPaimonTable(dbName, name);
-        }
+        makeSureInitialized(); // loads originTable and stamps lastUpdateTime while objectCreated is false
         return originTable;
     }
 
     @Override
     public List<Column> initSchema() {
+        //init schema need update lastUpdateTime and get latest schema
+        objectCreated = false;
         Table table = getOriginTable();
         TableSchema schema = ((AbstractFileStoreTable) table).schema();
         List<DataField> columns = schema.fields();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index aedc62869c8..f8306391814 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -102,6 +102,10 @@ public class PaimonScanNode extends FileQueryScanNode {
         fileDesc.setDbName(((PaimonExternalTable) 
source.getTargetTable()).getDbName());
         fileDesc.setPaimonOptions(((PaimonExternalCatalog) 
source.getCatalog()).getPaimonOptionsMap());
         fileDesc.setTableName(source.getTargetTable().getName());
+        fileDesc.setCtlId(source.getCatalog().getId());
+        fileDesc.setDbId(((PaimonExternalTable) 
source.getTargetTable()).getDbId());
+        fileDesc.setTblId(source.getTargetTable().getId());
+        
fileDesc.setLastUpdateTime(source.getTargetTable().getLastUpdateTime());
         tableFormatFileDesc.setPaimonParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index cf599b5d206..9d75c7fc168 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -304,6 +304,10 @@ struct TPaimonFileDesc {
     4: optional string table_name
     5: optional string paimon_predicate
     6: optional map<string, string> paimon_options
+    7: optional i64 ctl_id
+    8: optional i64 db_id
+    9: optional i64 tbl_id
+    10: optional i64 last_update_time
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to