This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2baa0e8891dfb9e1c90b6c546fe38191bbd71986 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Thu Dec 8 20:38:07 2022 +0800 [improvement](multi-catalog)Support invalid/not invalid option for refresh catalog and db. (#14922) Currently, the refresh catalog/db operation always invalidates all of the related caches. In some cases this is not necessary, for example after creating a new db in the external data source. This PR adds support for refreshing without invalidating the cache. Examples: refresh catalog hive properties("invalid_cache" = "false"); refresh database hive.db1 properties("invalid_cache" = "false"); --- fe/fe-core/src/main/cup/sql_parser.cup | 12 ++++++------ .../org/apache/doris/analysis/RefreshCatalogStmt.java | 15 ++++++++++++++- .../java/org/apache/doris/analysis/RefreshDbStmt.java | 18 ++++++++++++++++-- .../java/org/apache/doris/catalog/RefreshManager.java | 7 ++++--- .../doris/catalog/external/ExternalDatabase.java | 8 ++++++-- .../org/apache/doris/datasource/CatalogFactory.java | 1 + .../java/org/apache/doris/datasource/CatalogLog.java | 3 +++ .../java/org/apache/doris/datasource/CatalogMgr.java | 8 ++++---- .../org/apache/doris/datasource/EsExternalCatalog.java | 2 +- .../org/apache/doris/datasource/ExternalCatalog.java | 12 ++++++++++-- .../org/apache/doris/datasource/ExternalObjectLog.java | 3 +++ .../apache/doris/datasource/HMSExternalCatalog.java | 2 +- .../apache/doris/datasource/JdbcExternalCatalog.java | 2 +- 13 files changed, 70 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 6b3fff4bd5..5c209fe478 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -1137,13 +1137,13 @@ refresh_stmt ::= {: RESULT = new RefreshTableStmt(tbl); :} - | KW_REFRESH KW_DATABASE ident:db + | KW_REFRESH KW_DATABASE ident:db opt_properties:properties {: - RESULT = new RefreshDbStmt(db); + RESULT = new RefreshDbStmt(db, properties); :} - | KW_REFRESH 
KW_DATABASE ident:ctl DOT ident:db + | KW_REFRESH KW_DATABASE ident:ctl DOT ident:db opt_properties:properties {: - RESULT = new RefreshDbStmt(ctl, db); + RESULT = new RefreshDbStmt(ctl, db, properties); :} | KW_REFRESH KW_MATERIALIZED KW_VIEW table_name:mv {: @@ -1153,9 +1153,9 @@ refresh_stmt ::= {: RESULT = new RefreshMaterializedViewStmt(mv, MVRefreshInfo.RefreshMethod.COMPLETE); :} - | KW_REFRESH KW_CATALOG ident:catalogName + | KW_REFRESH KW_CATALOG ident:catalogName opt_properties:properties {: - RESULT = new RefreshCatalogStmt(catalogName); + RESULT = new RefreshCatalogStmt(catalogName, properties); :} ; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java index 481ebf3b9f..e054dca050 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java @@ -27,22 +27,32 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import java.util.Map; + /** * RefreshCatalogStmt * Manually refresh the catalog metadata. 
*/ public class RefreshCatalogStmt extends DdlStmt { + private static final String INVALID_CACHE = "invalid_cache"; private final String catalogName; + private Map<String, String> properties; + private boolean invalidCache = false; - public RefreshCatalogStmt(String catalogName) { + public RefreshCatalogStmt(String catalogName, Map<String, String> properties) { this.catalogName = catalogName; + this.properties = properties; } public String getCatalogName() { return catalogName; } + public boolean isInvalidCache() { + return invalidCache; + } + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); @@ -56,6 +66,9 @@ public class RefreshCatalogStmt extends DdlStmt { ErrorReport.reportAnalysisException(ErrorCode.ERR_CATALOG_ACCESS_DENIED, analyzer.getQualifiedUser(), catalogName); } + String invalidConfig = properties == null ? null : properties.get(INVALID_CACHE); + // Default is to invalid cache. + invalidCache = invalidConfig == null ? true : invalidConfig.equalsIgnoreCase("true"); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java index 5118088916..ccd05a0524 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java @@ -31,19 +31,26 @@ import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Map; + public class RefreshDbStmt extends DdlStmt { private static final Logger LOG = LogManager.getLogger(RefreshDbStmt.class); + private static final String INVALID_CACHE = "invalid_cache"; private String catalogName; private String dbName; + private Map<String, String> properties; + private boolean invalidCache = false; - public RefreshDbStmt(String dbName) { + public RefreshDbStmt(String dbName, Map<String, String> properties) { 
this.dbName = dbName; + this.properties = properties; } - public RefreshDbStmt(String catalogName, String dbName) { + public RefreshDbStmt(String catalogName, String dbName, Map<String, String> properties) { this.catalogName = catalogName; this.dbName = dbName; + this.properties = properties; } public String getDbName() { @@ -54,6 +61,10 @@ public class RefreshDbStmt extends DdlStmt { return catalogName; } + public boolean isInvalidCache() { + return invalidCache; + } + @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); @@ -82,6 +93,9 @@ public class RefreshDbStmt extends DdlStmt { ErrorReport.reportAnalysisException( ErrorCode.ERR_DBACCESS_DENIED_ERROR, analyzer.getQualifiedUser(), dbName); } + String invalidConfig = properties == null ? null : properties.get(INVALID_CACHE); + // Default is to invalid cache. + invalidCache = invalidConfig == null ? true : invalidConfig.equalsIgnoreCase("true"); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index 558b133f8c..b8d6929994 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -76,7 +76,7 @@ public class RefreshManager { refreshInternalCtlIcebergDb(dbName, env); } else { // Process external catalog db refresh - refreshExternalCtlDb(dbName, catalog); + refreshExternalCtlDb(dbName, catalog, stmt.isInvalidCache()); } LOG.info("Successfully refresh db: {}", dbName); } @@ -107,7 +107,7 @@ public class RefreshManager { env.getIcebergTableCreationRecordMgr().registerDb(db); } - private void refreshExternalCtlDb(String dbName, CatalogIf catalog) throws DdlException { + private void refreshExternalCtlDb(String dbName, CatalogIf catalog, boolean invalidCache) throws DdlException { if (!(catalog instanceof ExternalCatalog)) { throw new 
DdlException("Only support refresh ExternalCatalog Database"); } @@ -116,10 +116,11 @@ public class RefreshManager { if (db == null) { throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); } - ((ExternalDatabase) db).setUnInitialized(); + ((ExternalDatabase) db).setUnInitialized(invalidCache); ExternalObjectLog log = new ExternalObjectLog(); log.setCatalogId(catalog.getId()); log.setDbId(db.getId()); + log.setInvalidCache(invalidCache); Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index 820360c21d..6ae8594c07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -65,6 +65,7 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, @SerializedName(value = "initialized") protected boolean initialized = false; protected ExternalCatalog extCatalog; + protected boolean invalidCacheInInit = true; /** * No args constructor for persist. 
@@ -93,9 +94,12 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, public void setTableExtCatalog(ExternalCatalog extCatalog) { } - public void setUnInitialized() { + public void setUnInitialized(boolean invalidCache) { this.initialized = false; - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(extCatalog.getId(), name); + this.invalidCacheInInit = invalidCache; + if (invalidCache) { + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(extCatalog.getId(), name); + } } public boolean isInitialized() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java index f5704e909d..c5599ca411 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java @@ -54,6 +54,7 @@ public class CatalogFactory { log.setNewCatalogName(((AlterCatalogNameStmt) stmt).getNewCatalogName()); } else if (stmt instanceof RefreshCatalogStmt) { log.setCatalogId(catalogId); + log.setInvalidCache(((RefreshCatalogStmt) stmt).isInvalidCache()); } else { throw new RuntimeException("Unknown stmt for catalog manager " + stmt.getClass().getSimpleName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java index 0fc23af7b9..7e54e18386 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java @@ -53,6 +53,9 @@ public class CatalogLog implements Writable { @SerializedName(value = "newProps") private Map<String, String> newProps; + @SerializedName(value = "invalidCache") + private boolean invalidCache; + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 854ee223d1..f3b9299bc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -114,12 +114,12 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return catalog; } - private void unprotectedRefreshCatalog(long catalogId) { + private void unprotectedRefreshCatalog(long catalogId, boolean invalidCache) { CatalogIf catalog = idToCatalog.get(catalogId); if (catalog != null) { String catalogName = catalog.getName(); if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) { - ((ExternalCatalog) catalog).setUninitialized(); + ((ExternalCatalog) catalog).setUninitialized(invalidCache); } } } @@ -448,7 +448,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { public void replayRefreshCatalog(CatalogLog log) throws DdlException { writeLock(); try { - unprotectedRefreshCatalog(log.getCatalogId()); + unprotectedRefreshCatalog(log.getCatalogId(), log.isInvalidCache()); } finally { writeUnlock(); } @@ -504,7 +504,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { try { ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - db.setUnInitialized(); + db.setUnInitialized(log.isInvalidCache()); } finally { writeUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java index 496eabfc8c..f078998fc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java @@ -134,7 +134,7 @@ public class EsExternalCatalog extends ExternalCatalog { 
initCatalogLog.setCatalogId(id); initCatalogLog.setType(InitCatalogLog.Type.ES); if (dbNameToId != null && dbNameToId.containsKey(DEFAULT_DB)) { - idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized(); + idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized(invalidCacheInInit); initCatalogLog.addRefreshDb(dbNameToId.get(DEFAULT_DB)); } else { dbNameToId = Maps.newConcurrentMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 2369e6afd4..cc0c2f0b2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -69,6 +69,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr // db name does not contains "default_cluster" protected Map<String, Long> dbNameToId = Maps.newConcurrentMap(); private boolean objectCreated = false; + protected boolean invalidCacheInInit = true; private ExternalSchemaCache schemaCache; @@ -129,8 +130,15 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr // init schema related objects protected abstract void init(); - public void setUninitialized() { + public void setUninitialized(boolean invalidCache) { this.initialized = false; + this.invalidCacheInInit = invalidCache; + if (invalidCache) { + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id); + } + } + + public void updateDbList() { Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id); } @@ -209,7 +217,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap(); for (int i = 0; i < log.getRefreshCount(); i++) { ExternalDatabase db = getDbForReplay(log.getRefreshDbIds().get(i)); - db.setUnInitialized(); + db.setUnInitialized(invalidCacheInInit); 
tmpDbNameToId.put(db.getFullName(), db.getId()); tmpIdToDb.put(db.getId(), db); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java index cff446657e..030f981382 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java @@ -43,6 +43,9 @@ public class ExternalObjectLog implements Writable { @SerializedName(value = "tableId") private long tableId; + @SerializedName(value = "invalidCache") + private boolean invalidCache; + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index 0dfffe19fb..8b79bf97f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -78,7 +78,7 @@ public class HMSExternalCatalog extends ExternalCatalog { dbId = dbNameToId.get(dbName); tmpDbNameToId.put(dbName, dbId); ExternalDatabase db = idToDb.get(dbId); - db.setUnInitialized(); + db.setUnInitialized(invalidCacheInInit); tmpIdToDb.put(dbId, db); initCatalogLog.addRefreshDb(dbId); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java index bcae5988d9..713c8176d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java @@ -122,7 +122,7 @@ public class JdbcExternalCatalog extends ExternalCatalog { dbId = dbNameToId.get(dbName); tmpDbNameToId.put(dbName, dbId); ExternalDatabase db = 
idToDb.get(dbId); - db.setUnInitialized(); + db.setUnInitialized(invalidCacheInInit); tmpIdToDb.put(dbId, db); initCatalogLog.addRefreshDb(dbId); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org