This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f2d84d81e6 [feature-wip][refactor](multi-catalog) Persist external catalog related metadata. (#13746) f2d84d81e6 is described below commit f2d84d81e651d3de8e4c67a0adbf927a7ca04268 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Fri Nov 4 09:04:00 2022 +0800 [feature-wip][refactor](multi-catalog) Persist external catalog related metadata. (#13746) Persist external catalog/db/table, including the columns of external tables. After this change, external objects can have their own unique ID throughout their lifetime; this is required for statistics collection. --- .../main/java/org/apache/doris/catalog/Column.java | 5 +- .../main/java/org/apache/doris/catalog/Env.java | 2 +- .../org/apache/doris/catalog/RefreshManager.java | 10 ++ .../doris/catalog/external/EsExternalDatabase.java | 79 +++++++++++- .../doris/catalog/external/EsExternalTable.java | 79 ++++++++---- .../doris/catalog/external/ExternalDatabase.java | 63 +++++++++- .../doris/catalog/external/ExternalTable.java | 81 ++++++++++--- .../catalog/external/HMSExternalDatabase.java | 81 ++++++++++++- .../doris/catalog/external/HMSExternalTable.java | 133 +++++++++++++-------- .../org/apache/doris/datasource/CatalogMgr.java | 50 ++++++++ .../apache/doris/datasource/EsExternalCatalog.java | 49 ++++++-- .../apache/doris/datasource/ExternalCatalog.java | 78 +++++++++++- .../apache/doris/datasource/ExternalObjectLog.java | 56 +++++++++ .../doris/datasource/HMSExternalCatalog.java | 72 +++++++---- .../apache/doris/datasource/InitCatalogLog.java | 92 ++++++++++++++ .../apache/doris/datasource/InitDatabaseLog.java | 96 +++++++++++++++ .../org/apache/doris/datasource/InitTableLog.java | 67 +++++++++++ .../doris/external/elasticsearch/EsUtil.java | 2 + .../org/apache/doris/journal/JournalEntity.java | 25 ++++ .../java/org/apache/doris/persist/EditLog.java | 49 ++++++++ 
.../org/apache/doris/persist/OperationType.java | 5 + .../org/apache/doris/persist/gson/GsonUtils.java | 26 +++- .../org/apache/doris/qe/MasterCatalogExecutor.java | 84 +++++++++++++ .../apache/doris/service/FrontendServiceImpl.java | 76 ++++++++++++ .../apache/doris/datasource/CatalogMgrTest.java | 42 ++++++- gensrc/thrift/FrontendService.thrift | 12 ++ 26 files changed, 1273 insertions(+), 141 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index a3d529d718..532a49b0eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -597,10 +597,9 @@ public class Column implements Writable { && getStrLen() == other.getStrLen() && getPrecision() == other.getPrecision() && getScale() == other.getScale() - && comment.equals(other.comment) + && Objects.equals(comment, other.comment) && visible == other.visible - && children.size() == other.children.size() - && children.equals(other.children); + && Objects.equals(children, other.children); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e665064a35..05bfc80efa 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1937,7 +1937,7 @@ public class Env { **/ public long loadCatalog(DataInputStream in, long checksum) throws IOException { CatalogMgr mgr = CatalogMgr.read(in); - // When enable the multi catalog in the first time, the mgr will be a null value. + // When enable the multi catalog in the first time, the "mgr" will be a null value. // So ignore it to use default catalog manager. 
if (mgr != null) { this.catalogMgr = mgr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index 31cbc266f5..bb37a9b1b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -28,6 +28,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InternalCatalog; import org.apache.logging.log4j.LogManager; @@ -117,6 +118,10 @@ public class RefreshManager { throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); } ((ExternalDatabase) db).setUnInitialized(); + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log); } private void refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) throws UserException { @@ -156,5 +161,10 @@ public class RefreshManager { throw new DdlException("Table " + tableName + " does not exist in db " + dbName); } ((ExternalTable) table).setUnInitialized(); + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setTableId(table.getId()); + Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java index 8898ede84d..f9ba015826 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java @@ -20,27 +20,34 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Env; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.qe.MasterCatalogExecutor; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Elasticsearch metastore external database. */ -public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> { +public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> implements GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(EsExternalDatabase.class); // Cache of table name to table id. private Map<String, Long> tableNameToId = Maps.newConcurrentMap(); - private Map<Long, EsExternalTable> idToTbl = Maps.newHashMap(); + @SerializedName(value = "idToTbl") + private Map<Long, EsExternalTable> idToTbl = Maps.newConcurrentMap(); /** * Create Elasticsearch external database. 
@@ -53,14 +60,53 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> { super(extCatalog, id, name); } - private synchronized void makeSureInitialized() { + public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { + Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap(); + Map<Long, EsExternalTable> tmpIdToTbl = Maps.newConcurrentMap(); + for (int i = 0; i < log.getRefreshCount(); i++) { + EsExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); + table.setUnInitialized(); + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } + for (int i = 0; i < log.getCreateCount(); i++) { + EsExternalTable table = new EsExternalTable(log.getCreateTableIds().get(i), + log.getCreateTableNames().get(i), name, (EsExternalCatalog) catalog); + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } + tableNameToId = tmpTableNameToId; + idToTbl = tmpIdToTbl; + initialized = true; + } + + public void setTableExtCatalog(ExternalCatalog extCatalog) { + for (EsExternalTable table : idToTbl.values()) { + table.setCatalog(extCatalog); + } + } + + public synchronized void makeSureInitialized() { if (!initialized) { + if (!Env.getCurrentEnv().isMaster()) { + // Forward to master and wait the journal to replay. + MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); + try { + remoteExecutor.forward(extCatalog.getId(), id, -1); + } catch (Exception e) { + LOG.warn("Failed to forward init db {} operation to master. 
{}", name, e.getMessage()); + } + return; + } init(); - initialized = true; } } private void init() { + InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); + initDatabaseLog.setType(InitDatabaseLog.Type.ES); + initDatabaseLog.setCatalogId(extCatalog.getId()); + initDatabaseLog.setDbId(id); List<String> tableNames = extCatalog.listTableNames(null, name); if (tableNames != null) { Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap(); @@ -73,15 +119,20 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> { EsExternalTable table = idToTbl.get(tblId); table.setUnInitialized(); tmpIdToTbl.put(tblId, table); + initDatabaseLog.addRefreshTable(tblId); } else { tblId = Env.getCurrentEnv().getNextId(); tmpTableNameToId.put(tableName, tblId); - tmpIdToTbl.put(tblId, new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog)); + EsExternalTable table = new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog); + tmpIdToTbl.put(tblId, table); + initDatabaseLog.addCreateTable(tblId, tableName); } } tableNameToId = tmpTableNameToId; idToTbl = tmpIdToTbl; } + initialized = true; + Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); } @Override @@ -110,4 +161,22 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> { makeSureInitialized(); return idToTbl.get(tableId); } + + public EsExternalTable getTableForReplay(long tableId) { + return idToTbl.get(tableId); + } + + @Override + public void gsonPostProcess() throws IOException { + tableNameToId = Maps.newConcurrentMap(); + for (EsExternalTable tbl : idToTbl.values()) { + tableNameToId.put(tbl.getName(), tbl.getId()); + } + rwLock = new ReentrantReadWriteLock(true); + } + + public void addTableForTest(EsExternalTable tbl) { + idToTbl.put(tbl.getId(), tbl); + tableNameToId.put(tbl.getName(), tbl.getId()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java index ea44552896..15d1384744 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java @@ -18,9 +18,12 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.EsTable; import org.apache.doris.datasource.EsExternalCatalog; +import org.apache.doris.datasource.InitTableLog; import org.apache.doris.external.elasticsearch.EsUtil; +import org.apache.doris.qe.MasterCatalogExecutor; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -36,9 +39,6 @@ import java.util.List; public class EsExternalTable extends ExternalTable { private static final Logger LOG = LogManager.getLogger(EsExternalTable.class); - - private final EsExternalCatalog catalog; - private final String dbName; private EsTable esTable; /** @@ -50,23 +50,57 @@ public class EsExternalTable extends ExternalTable { * @param catalog HMSExternalDataSource. */ public EsExternalTable(long id, String name, String dbName, EsExternalCatalog catalog) { - super(id, name); - this.dbName = dbName; - this.catalog = catalog; - this.type = TableType.ES_EXTERNAL_TABLE; + super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE); } - private synchronized void makeSureInitialized() { + public synchronized void makeSureInitialized() { if (!initialized) { - init(); - initialized = true; + if (!Env.getCurrentEnv().isMaster()) { + fullSchema = null; + // Forward to master and wait the journal to replay. 
+ MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); + try { + remoteExecutor.forward(catalog.getId(), catalog.getDbNullable(dbName).getId(), id); + } catch (Exception e) { + LOG.warn("Failed to forward init table {} operation to master. {}", name, e.getMessage()); + } + } else { + init(); + } + } + if (!objectCreated) { + esTable = toEsTable(); + objectCreated = true; } } private void init() { - fullSchema = EsUtil.genColumnsFromEs(catalog.getEsRestClient(), name, null); - esTable = toEsTable(); + boolean schemaChanged = false; + List<Column> tmpSchema = EsUtil.genColumnsFromEs( + ((EsExternalCatalog) catalog).getEsRestClient(), name, null); + if (fullSchema == null || fullSchema.size() != tmpSchema.size()) { + schemaChanged = true; + } else { + for (int i = 0; i < fullSchema.size(); i++) { + if (!fullSchema.get(i).equals(tmpSchema.get(i))) { + schemaChanged = true; + break; + } + } + } + if (schemaChanged) { + timestamp = System.currentTimeMillis(); + fullSchema = tmpSchema; + esTable = toEsTable(); + } + initialized = true; + InitTableLog initTableLog = new InitTableLog(); + initTableLog.setCatalogId(catalog.getId()); + initTableLog.setDbId(catalog.getDbNameToId().get(dbName)); + initTableLog.setTableId(id); + initTableLog.setSchema(fullSchema); + Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog); } @Override @@ -107,7 +141,7 @@ public class EsExternalTable extends ExternalTable { } /** - * get database name of hms table. + * get database name of es table. 
*/ public String getDbName() { return dbName; @@ -123,17 +157,18 @@ public class EsExternalTable extends ExternalTable { } private EsTable toEsTable() { + EsExternalCatalog esCatalog = (EsExternalCatalog) catalog; EsTable esTable = new EsTable(this.id, this.name, this.fullSchema, TableType.ES_EXTERNAL_TABLE); esTable.setIndexName(name); - esTable.setClient(catalog.getEsRestClient()); - esTable.setUserName(catalog.getUsername()); - esTable.setPasswd(catalog.getPassword()); - esTable.setEnableDocValueScan(catalog.isEnableDocValueScan()); - esTable.setEnableKeywordSniff(catalog.isEnableKeywordSniff()); - esTable.setNodesDiscovery(catalog.isEnableNodesDiscovery()); - esTable.setHttpSslEnabled(catalog.isEnableSsl()); - esTable.setSeeds(catalog.getNodes()); - esTable.setHosts(String.join(",", catalog.getNodes())); + esTable.setClient(esCatalog.getEsRestClient()); + esTable.setUserName(esCatalog.getUsername()); + esTable.setPasswd(esCatalog.getPassword()); + esTable.setEnableDocValueScan(esCatalog.isEnableDocValueScan()); + esTable.setEnableKeywordSniff(esCatalog.isEnableKeywordSniff()); + esTable.setNodesDiscovery(esCatalog.isEnableNodesDiscovery()); + esTable.setHttpSslEnabled(esCatalog.isEnableSsl()); + esTable.setSeeds(esCatalog.getNodes()); + esTable.setHosts(String.join(",", esCatalog.getNodes())); esTable.syncTableMetaData(); return esTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index d76ed380a7..c5fa7b9875 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -21,13 +21,22 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.DatabaseProperty; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import 
org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; +import com.google.gson.annotations.SerializedName; import org.apache.commons.lang.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -38,17 +47,28 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * * @param <T> External table type is ExternalTable or its subclass. */ -public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T> { +public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(ExternalDatabase.class); - private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); + protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); + @SerializedName(value = "id") protected long id; + @SerializedName(value = "name") protected String name; - protected ExternalCatalog extCatalog; - protected DatabaseProperty dbProperties; + @SerializedName(value = "dbProperties") + protected DatabaseProperty dbProperties = new DatabaseProperty(); + @SerializedName(value = "initialized") protected boolean initialized = false; + protected ExternalCatalog extCatalog; + + /** + * No args constructor for persist. + */ + public ExternalDatabase() { + initialized = false; + } /** * Create external database. 
@@ -63,10 +83,30 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T> this.name = name; } - public synchronized void setUnInitialized() { + public void setExtCatalog(ExternalCatalog extCatalog) { + this.extCatalog = extCatalog; + } + + public void setTableExtCatalog(ExternalCatalog extCatalog) {} + + public void setUnInitialized() { this.initialized = false; } + public boolean isInitialized() { + return initialized; + } + + public void makeSureInitialized() {} + + public T getTableForReplay(long tableId) { + throw new NotImplementedException(); + } + + public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { + throw new NotImplementedException(); + } + @Override public void readLock() { this.rwLock.readLock().lock(); @@ -177,4 +217,17 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T> public T getTableNullable(long tableId) { throw new NotImplementedException(); } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static ExternalDatabase read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ExternalDatabase.class); + } + + @Override + public void gsonPostProcess() throws IOException {} } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 33acbc0ec7..f713de7419 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -22,12 +22,22 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; 
+import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TTableDescriptor; +import com.google.gson.annotations.SerializedName; +import lombok.Getter; import org.apache.commons.lang.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -36,26 +46,36 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * External table represent tables that are not self-managed by Doris. * Such as tables from hive, iceberg, es, etc. */ -public class ExternalTable implements TableIf { - +@Getter +public class ExternalTable implements TableIf, Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(ExternalTable.class); + @SerializedName(value = "id") protected long id; + @SerializedName(value = "name") protected String name; - protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); + @SerializedName(value = "type") protected TableType type = null; + @SerializedName(value = "fullSchema") protected volatile List<Column> fullSchema = null; + @SerializedName(value = "initialized") protected boolean initialized = false; + @SerializedName(value = "timestamp") + protected long timestamp; + + protected ExternalCatalog catalog; + @SerializedName(value = "dbName") + protected String dbName; + protected boolean objectCreated = false; + protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); /** - * Create external table. - * - * @param id Table id. - * @param name Table name. + * No args constructor for persist. 
*/ - public ExternalTable(long id, String name) { - this.id = id; - this.name = name; + public ExternalTable() { + this.initialized = false; + this.objectCreated = false; + this.fullSchema = null; } /** @@ -63,23 +83,40 @@ public class ExternalTable implements TableIf { * * @param id Table id. * @param name Table name. + * @param catalog ExternalCatalog this table belongs to. + * @param dbName Name of the db the this table belongs to. * @param type Table type. */ - public ExternalTable(long id, String name, TableType type) { + public ExternalTable(long id, String name, ExternalCatalog catalog, String dbName, TableType type) { this.id = id; this.name = name; + this.catalog = catalog; + this.dbName = dbName; this.type = type; + this.initialized = false; + this.objectCreated = false; + this.fullSchema = null; + } + + public void setCatalog(ExternalCatalog catalog) { + this.catalog = catalog; } public boolean isView() { return false; } - public synchronized void setUnInitialized() { + public void setUnInitialized() { this.initialized = false; - this.fullSchema = null; } + public void replayInitTable(List<Column> schema) { + fullSchema = schema; + initialized = true; + } + + public void makeSureInitialized() {} + @Override public void readLock() { this.rwLock.readLock().lock(); @@ -260,10 +297,26 @@ public class ExternalTable implements TableIf { @Override public String getComment(boolean escapeQuota) { return ""; - } public TTableDescriptor toThrift() { return null; } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static ExternalTable read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ExternalTable.class); + } + + @Override + public void gsonPostProcess() throws IOException { + rwLock = new ReentrantReadWriteLock(true); + objectCreated = false; + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java index b8d9287982..dc79a679f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java @@ -21,28 +21,35 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.qe.MasterCatalogExecutor; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** * Hive metastore external database. */ -public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> { +public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> implements GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(HMSExternalDatabase.class); // Cache of table name to table id. private Map<String, Long> tableNameToId = Maps.newConcurrentMap(); - private Map<Long, HMSExternalTable> idToTbl = Maps.newHashMap(); + @SerializedName(value = "idToTbl") + private Map<Long, HMSExternalTable> idToTbl = Maps.newConcurrentMap(); /** * Create HMS external database. 
@@ -55,14 +62,53 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> { super(extCatalog, id, name); } - private synchronized void makeSureInitialized() { + public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { + Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap(); + Map<Long, HMSExternalTable> tmpIdToTbl = Maps.newConcurrentMap(); + for (int i = 0; i < log.getRefreshCount(); i++) { + HMSExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); + table.setUnInitialized(); + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } + for (int i = 0; i < log.getCreateCount(); i++) { + HMSExternalTable table = new HMSExternalTable(log.getCreateTableIds().get(i), + log.getCreateTableNames().get(i), name, (HMSExternalCatalog) catalog); + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } + tableNameToId = tmpTableNameToId; + idToTbl = tmpIdToTbl; + initialized = true; + } + + public void setTableExtCatalog(ExternalCatalog extCatalog) { + for (HMSExternalTable table : idToTbl.values()) { + table.setCatalog(extCatalog); + } + } + + public synchronized void makeSureInitialized() { if (!initialized) { + if (!Env.getCurrentEnv().isMaster()) { + // Forward to master and wait the journal to replay. + MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); + try { + remoteExecutor.forward(extCatalog.getId(), id, -1); + } catch (Exception e) { + LOG.warn("Failed to forward init db {} operation to master. 
{}", name, e.getMessage()); + } + return; + } init(); - initialized = true; } } private void init() { + InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); + initDatabaseLog.setType(InitDatabaseLog.Type.HMS); + initDatabaseLog.setCatalogId(extCatalog.getId()); + initDatabaseLog.setDbId(id); List<String> tableNames = extCatalog.listTableNames(null, name); if (tableNames != null) { Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap(); @@ -75,16 +121,21 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> { HMSExternalTable table = idToTbl.get(tblId); table.setUnInitialized(); tmpIdToTbl.put(tblId, table); + initDatabaseLog.addRefreshTable(tblId); } else { tblId = Env.getCurrentEnv().getNextId(); tmpTableNameToId.put(tableName, tblId); - tmpIdToTbl.put(tblId, - new HMSExternalTable(tblId, tableName, name, (HMSExternalCatalog) extCatalog)); + HMSExternalTable table = new HMSExternalTable(tblId, tableName, name, + (HMSExternalCatalog) extCatalog); + tmpIdToTbl.put(tblId, table); + initDatabaseLog.addCreateTable(tblId, tableName); } } tableNameToId = tmpTableNameToId; idToTbl = tmpIdToTbl; } + initialized = true; + Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); } @Override @@ -119,4 +170,22 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> { makeSureInitialized(); return idToTbl.get(tableId); } + + public HMSExternalTable getTableForReplay(long tableId) { + return idToTbl.get(tableId); + } + + @Override + public void gsonPostProcess() throws IOException { + tableNameToId = Maps.newConcurrentMap(); + for (HMSExternalTable tbl : idToTbl.values()) { + tableNameToId.put(tbl.getName(), tbl.getId()); + } + rwLock = new ReentrantReadWriteLock(true); + } + + public void addTableForTest(HMSExternalTable tbl) { + idToTbl.put(tbl.getId(), tbl); + tableNameToId.put(tbl.getName(), tbl.getId()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index cbe84744ea..9d27cf0fb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -18,10 +18,13 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.InitTableLog; +import org.apache.doris.qe.MasterCatalogExecutor; import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -39,12 +42,9 @@ import java.util.Map; * Hive metastore external table. */ public class HMSExternalTable extends ExternalTable { - private static final Logger LOG = LogManager.getLogger(HMSExternalTable.class); - private final HMSExternalCatalog catalog; - private final String dbName; - private final List<String> supportedHiveFileFormats = Lists.newArrayList( + private List<String> supportedHiveFileFormats = Lists.newArrayList( "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", "org.apache.hadoop.mapred.TextInputFormat"); @@ -65,10 +65,7 @@ public class HMSExternalTable extends ExternalTable { * @param catalog HMSExternalCatalog. 
*/ public HMSExternalTable(long id, String name, String dbName, HMSExternalCatalog catalog) { - super(id, name); - this.dbName = dbName; - this.catalog = catalog; - this.type = TableType.HMS_EXTERNAL_TABLE; + super(id, name, catalog, dbName, TableType.HMS_EXTERNAL_TABLE); } public boolean isSupportedHmsTable() { @@ -76,34 +73,45 @@ public class HMSExternalTable extends ExternalTable { return dlaType != DLAType.UNKNOWN; } - private synchronized void makeSureInitialized() { - if (!initialized) { - init(); - initialized = true; - } - } - - private void init() { - try { - getRemoteTable(); - } catch (MetaNotFoundException e) { - // CHECKSTYLE IGNORE THIS LINE - } - if (remoteTable == null) { - dlaType = DLAType.UNKNOWN; - fullSchema = Lists.newArrayList(); - } else { - if (supportedIcebergTable()) { - dlaType = DLAType.ICEBERG; - } else if (supportedHoodieTable()) { - dlaType = DLAType.HUDI; - } else if (supportedHiveTable()) { - dlaType = DLAType.HIVE; - } else { + public synchronized void makeSureInitialized() { + if (!objectCreated) { + try { + getRemoteTable(); + } catch (MetaNotFoundException e) { + // CHECKSTYLE IGNORE THIS LINE + } + supportedHiveFileFormats = Lists.newArrayList( + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", + "org.apache.hadoop.mapred.TextInputFormat"); + if (remoteTable == null) { dlaType = DLAType.UNKNOWN; - fullSchema = Lists.newArrayList(); + } else { + if (supportedIcebergTable()) { + dlaType = DLAType.ICEBERG; + } else if (supportedHoodieTable()) { + dlaType = DLAType.HUDI; + } else if (supportedHiveTable()) { + dlaType = DLAType.HIVE; + } else { + dlaType = DLAType.UNKNOWN; + } } - initSchema(); + objectCreated = true; + } + if (!initialized) { + if (!Env.getCurrentEnv().isMaster()) { + fullSchema = null; + // Forward to master and wait the journal to replay. 
+ MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); + try { + remoteExecutor.forward(catalog.getId(), catalog.getDbNullable(dbName).getId(), id); + } catch (Exception e) { + LOG.warn("Failed to forward init table {} operation to master. {}", name, e.getMessage()); + } + return; + } + init(); } } @@ -150,24 +158,45 @@ public class HMSExternalTable extends ExternalTable { return isManagedTable && supportedFileFormat; } - private void initSchema() { - if (fullSchema == null) { - synchronized (this) { - if (fullSchema == null) { - fullSchema = Lists.newArrayList(); - try { - for (FieldSchema field : HiveMetaStoreClientHelper.getSchema(dbName, name, - catalog.getHiveMetastoreUris())) { - fullSchema.add(new Column(field.getName(), - HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, true, - null, field.getComment())); - } - } catch (DdlException e) { - LOG.warn("Fail to get schema of hms table {}", name, e); + private void init() { + boolean schemaChanged = false; + List<Column> tmpSchema = Lists.newArrayList(); + if (dlaType.equals(DLAType.UNKNOWN)) { + schemaChanged = true; + } else { + try { + for (FieldSchema field : HiveMetaStoreClientHelper.getSchema(dbName, name, + ((HMSExternalCatalog) catalog).getHiveMetastoreUris())) { + int columnId = (int) Env.getCurrentEnv().getNextId(); + tmpSchema.add(new Column(field.getName(), + HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, + true, null, field.getComment(), true, null, columnId)); + } + } catch (DdlException e) { + LOG.warn("Fail to get schema of hms table {}", name, e); + } + if (fullSchema == null || fullSchema.size() != tmpSchema.size()) { + schemaChanged = true; + } else { + for (int i = 0; i < fullSchema.size(); i++) { + if (!fullSchema.get(i).equals(tmpSchema.get(i))) { + schemaChanged = true; + break; } } } } + if (schemaChanged) { + timestamp = System.currentTimeMillis(); + fullSchema = tmpSchema; + } + initialized = true; + InitTableLog 
initTableLog = new InitTableLog(); + initTableLog.setCatalogId(catalog.getId()); + initTableLog.setDbId(catalog.getDbNameToId().get(dbName)); + initTableLog.setTableId(id); + initTableLog.setSchema(fullSchema); + Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog); } /** @@ -177,11 +206,11 @@ public class HMSExternalTable extends ExternalTable { if (remoteTable == null) { synchronized (this) { if (remoteTable == null) { + String uri = ((HMSExternalCatalog) catalog).getHiveMetastoreUris(); try { - remoteTable = HiveMetaStoreClientHelper.getTable(dbName, name, catalog.getHiveMetastoreUris()); + remoteTable = HiveMetaStoreClientHelper.getTable(dbName, name, uri); } catch (DdlException e) { - LOG.warn("Fail to get remote hive table. db {}, table {}, uri {}", dbName, name, - catalog.getHiveMetastoreUris()); + LOG.warn("Fail to get remote hive table. db {}, table {}, uri {}", dbName, name, uri); throw new MetaNotFoundException(e); } } @@ -300,7 +329,7 @@ public class HMSExternalTable extends ExternalTable { } public String getMetastoreUri() { - return catalog.getHiveMetastoreUris(); + return ((HMSExternalCatalog) catalog).getHiveMetastoreUris(); } public Map<String, String> getDfsProperties() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 5fd6079d5c..b92a515751 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -25,6 +25,8 @@ import org.apache.doris.analysis.RefreshCatalogStmt; import org.apache.doris.analysis.ShowCatalogStmt; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.external.ExternalDatabase; +import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import 
org.apache.doris.common.ErrorCode; @@ -39,6 +41,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSet; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -406,6 +409,53 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } } + public void replayInitCatalog(InitCatalogLog log) { + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + Preconditions.checkArgument(catalog != null); + catalog.replayInitCatalog(log); + } + + public void replayInitExternalDb(InitDatabaseLog log) { + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + Preconditions.checkArgument(catalog != null); + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + Preconditions.checkArgument(db != null); + db.replayInitDb(log, catalog); + } + + public void replayInitExternalTable(InitTableLog log) { + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + Preconditions.checkArgument(catalog != null); + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + Preconditions.checkArgument(db != null); + ExternalTable table = db.getTableForReplay(log.getTableId()); + Preconditions.checkArgument(table != null); + table.replayInitTable(log.getSchema()); + } + + public void replayRefreshExternalDb(ExternalObjectLog log) { + writeLock(); + try { + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + db.setUnInitialized(); + } finally { + writeUnlock(); + } + } + + public void replayRefreshExternalTable(ExternalObjectLog log) { + writeLock(); + try { + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); 
+ ExternalTable table = db.getTableForReplay(log.getTableId()); + table.setUnInitialized(); + } finally { + writeUnlock(); + } + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java index baa2c6e169..180d5730bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java @@ -25,6 +25,7 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.DdlException; import org.apache.doris.external.elasticsearch.EsRestClient; import org.apache.doris.external.elasticsearch.EsUtil; +import org.apache.doris.qe.MasterCatalogExecutor; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -54,10 +55,6 @@ public class EsExternalCatalog extends ExternalCatalog { private static final String PROP_NODES_DISCOVERY = "elasticsearch.nodes_discovery"; private static final String PROP_SSL = "elasticsearch.ssl"; - // Cache of db name to db id. - private Map<String, Long> dbNameToId; - private Map<Long, EsExternalDatabase> idToDb; - private EsRestClient esRestClient; private String[] nodes; @@ -138,29 +135,56 @@ public class EsExternalCatalog extends ExternalCatalog { * Datasource can't be init when creating because the external datasource may depend on third system. * So you have to make sure the client of third system is initialized before any method was called. 
*/ - private synchronized void makeSureInitialized() { + @Override + public synchronized void makeSureInitialized() { + if (!objectCreated) { + try { + validate(catalogProperty.getProperties()); + } catch (DdlException e) { + LOG.warn("validate error", e); + } + esRestClient = new EsRestClient(this.nodes, this.username, this.password, this.enableSsl); + objectCreated = true; + } if (!initialized) { + if (!Env.getCurrentEnv().isMaster()) { + // Forward to master and wait the journal to replay. + MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); + try { + remoteExecutor.forward(id, -1, -1); + } catch (Exception e) { + LOG.warn("Failed to forward init catalog {} operation to master. {}", name, e.getMessage()); + } + return; + } init(); - initialized = true; } } private void init() { + InitCatalogLog initCatalogLog = new InitCatalogLog(); try { validate(this.catalogProperty.getProperties()); } catch (DdlException e) { LOG.warn("validate error", e); } this.esRestClient = new EsRestClient(this.nodes, this.username, this.password, this.enableSsl); + initCatalogLog.setCatalogId(id); + initCatalogLog.setType(InitCatalogLog.Type.ES); if (dbNameToId != null && dbNameToId.containsKey(DEFAULT_DB)) { idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized(); + initCatalogLog.addRefreshDb(dbNameToId.get(DEFAULT_DB)); } else { dbNameToId = Maps.newConcurrentMap(); idToDb = Maps.newConcurrentMap(); long defaultDbId = Env.getCurrentEnv().getNextId(); dbNameToId.put(DEFAULT_DB, defaultDbId); - idToDb.put(defaultDbId, new EsExternalDatabase(this, defaultDbId, DEFAULT_DB)); + EsExternalDatabase db = new EsExternalDatabase(this, defaultDbId, DEFAULT_DB); + idToDb.put(defaultDbId, db); + initCatalogLog.addCreateDb(defaultDbId, DEFAULT_DB); } + initialized = true; + Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); } @Override @@ -185,6 +209,13 @@ public class EsExternalCatalog extends ExternalCatalog { return idToDb.get(dbNameToId.get(realDbName)); } + 
@Nullable + @Override + public ExternalDatabase getDbNullable(long dbId) { + makeSureInitialized(); + return idToDb.get(dbId); + } + @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { return esRestClient.existIndex(this.esRestClient.getClient(), tblName); @@ -194,4 +225,8 @@ public class EsExternalCatalog extends ExternalCatalog { public List<Long> getDbIds() { return Lists.newArrayList(dbNameToId.values()); } + + public ExternalDatabase getDbForReplay(long dbId) { + return idToDb.get(dbId); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index a89e53825e..e6e9609ab3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -17,15 +17,21 @@ package org.apache.doris.datasource; +import org.apache.doris.catalog.external.EsExternalDatabase; import org.apache.doris.catalog.external.ExternalDatabase; +import org.apache.doris.catalog.external.HMSExternalDatabase; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import lombok.Data; import org.apache.commons.lang.NotImplementedException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; import java.io.DataInput; @@ -38,7 +44,9 @@ import java.util.Map; * The abstract class for all types of external catalogs. 
*/ @Data -public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Writable { +public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Writable, GsonPostProcessable { + private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class); + // Unique id of this catalog, will be assigned after catalog is loaded. @SerializedName(value = "id") protected long id; @@ -49,8 +57,16 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr // save properties of this catalog, such as hive meta store url. @SerializedName(value = "catalogProperty") protected CatalogProperty catalogProperty = new CatalogProperty(); + @SerializedName(value = "initialized") protected boolean initialized = false; + // Cache of db name to db id + @SerializedName(value = "idToDb") + protected Map<Long, ExternalDatabase> idToDb = Maps.newConcurrentMap(); + // db name does not contains "default_cluster" + protected Map<String, Long> dbNameToId = Maps.newConcurrentMap(); + protected boolean objectCreated = false; + /** * @return names of database in this catalog. 
*/ @@ -71,6 +87,16 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr */ public abstract boolean tableExist(SessionContext ctx, String dbName, String tblName); + public abstract void makeSureInitialized(); + + public void setInitialized(boolean initialized) { + this.initialized = initialized; + } + + public ExternalDatabase getDbForReplay(long dbId) { + throw new NotImplementedException(); + } + @Override public long getId() { return id; @@ -123,6 +149,40 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr Text.writeString(out, GsonUtils.GSON.toJson(this)); } + public void replayInitCatalog(InitCatalogLog log) { + Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap(); + Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap(); + for (int i = 0; i < log.getRefreshCount(); i++) { + ExternalDatabase db = getDbForReplay(log.getRefreshDbIds().get(i)); + db.setUnInitialized(); + tmpDbNameToId.put(db.getFullName(), db.getId()); + tmpIdToDb.put(db.getId(), db); + } + switch (log.getType()) { + case HMS: + for (int i = 0; i < log.getCreateCount(); i++) { + HMSExternalDatabase db = new HMSExternalDatabase( + this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i)); + tmpDbNameToId.put(db.getFullName(), db.getId()); + tmpIdToDb.put(db.getId(), db); + } + break; + case ES: + for (int i = 0; i < log.getCreateCount(); i++) { + EsExternalDatabase db = new EsExternalDatabase( + this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i)); + tmpDbNameToId.put(db.getFullName(), db.getId()); + tmpIdToDb.put(db.getId(), db); + } + break; + default: + break; + } + dbNameToId = tmpDbNameToId; + idToDb = tmpIdToDb; + initialized = true; + } + /** * External catalog has no cluster semantics. 
*/ @@ -134,4 +194,20 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ExternalCatalog.class); } + + @Override + public void gsonPostProcess() throws IOException { + dbNameToId = Maps.newConcurrentMap(); + for (ExternalDatabase db : idToDb.values()) { + dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); + db.setExtCatalog(this); + db.setTableExtCatalog(this); + } + objectCreated = false; + } + + public void addDatabaseForTest(ExternalDatabase db) { + idToDb.put(db.getId(), db); + dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java new file mode 100644 index 0000000000..cff446657e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +@NoArgsConstructor +@Getter +@Data +public class ExternalObjectLog implements Writable { + @SerializedName(value = "catalogId") + private long catalogId; + + @SerializedName(value = "dbId") + private long dbId; + + @SerializedName(value = "tableId") + private long tableId; + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static ExternalObjectLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ExternalObjectLog.class); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index 32847844dc..6154ddd4af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.HMSExternalDatabase; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.qe.MasterCatalogExecutor; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -32,6 +33,7 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -41,9 +43,6 @@ import java.util.Map; public class HMSExternalCatalog 
extends ExternalCatalog { private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class); - // Cache of db name to db id. - private Map<String, Long> dbNameToId = Maps.newConcurrentMap(); - private Map<Long, HMSExternalDatabase> idToDb = Maps.newConcurrentMap(); protected HiveMetaStoreClient client; /** @@ -63,15 +62,10 @@ public class HMSExternalCatalog extends ExternalCatalog { private void init() { Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap(); - Map<Long, HMSExternalDatabase> tmpIdToDb = Maps.newConcurrentMap(); - HiveConf hiveConf = new HiveConf(); - hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, getHiveMetastoreUris()); - try { - client = new HiveMetaStoreClient(hiveConf); - } catch (MetaException e) { - LOG.warn("Failed to create HiveMetaStoreClient: {}", e.getMessage()); - return; - } + Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap(); + InitCatalogLog initCatalogLog = new InitCatalogLog(); + initCatalogLog.setCatalogId(id); + initCatalogLog.setType(InitCatalogLog.Type.HMS); List<String> allDatabases; try { allDatabases = client.getAllDatabases(); @@ -88,27 +82,53 @@ public class HMSExternalCatalog extends ExternalCatalog { if (dbNameToId != null && dbNameToId.containsKey(dbName)) { dbId = dbNameToId.get(dbName); tmpDbNameToId.put(dbName, dbId); - HMSExternalDatabase db = idToDb.get(dbId); + ExternalDatabase db = idToDb.get(dbId); db.setUnInitialized(); tmpIdToDb.put(dbId, db); + initCatalogLog.addRefreshDb(dbId); } else { dbId = Env.getCurrentEnv().getNextId(); tmpDbNameToId.put(dbName, dbId); - tmpIdToDb.put(dbId, new HMSExternalDatabase(this, dbId, dbName)); + HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName); + tmpIdToDb.put(dbId, db); + initCatalogLog.addCreateDb(dbId, dbName); } } dbNameToId = tmpDbNameToId; idToDb = tmpIdToDb; + initialized = true; + Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); } /** * Catalog can't be init when creating because the external catalog 
may depend on third system. * So you have to make sure the client of third system is initialized before any method was called. */ - private synchronized void makeSureInitialized() { + @Override + public synchronized void makeSureInitialized() { + if (!objectCreated) { + try { + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, getHiveMetastoreUris()); + client = new HiveMetaStoreClient(hiveConf); + objectCreated = true; + } catch (MetaException e) { + LOG.warn("Failed to create HiveMetaStoreClient: {}", e.getMessage()); + return; + } + } if (!initialized) { + if (!Env.getCurrentEnv().isMaster()) { + // Forward to master and wait the journal to replay. + MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); + try { + remoteExecutor.forward(id, -1, -1); + } catch (Exception e) { + LOG.warn("Failed to forward init catalog {} operation to master. {}", name, e.getMessage()); + } + return; + } init(); - initialized = true; } } @@ -120,10 +140,18 @@ public class HMSExternalCatalog extends ExternalCatalog { @Override public List<String> listTableNames(SessionContext ctx, String dbName) { - try { - return client.getAllTables(getRealTableName(dbName)); - } catch (MetaException e) { - LOG.warn("List Table Names failed. {}", e.getMessage()); + makeSureInitialized(); + HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) idToDb.get(dbNameToId.get(dbName)); + if (hmsExternalDatabase != null && hmsExternalDatabase.isInitialized()) { + ArrayList<String> names = Lists.newArrayList(); + hmsExternalDatabase.getTables().stream().forEach(table -> names.add(table.getName())); + return names; + } else { + try { + return client.getAllTables(getRealTableName(dbName)); + } catch (MetaException e) { + LOG.warn("List Table Names failed. 
{}", e.getMessage()); + } } return Lists.newArrayList(); } @@ -161,4 +189,8 @@ public class HMSExternalCatalog extends ExternalCatalog { makeSureInitialized(); return Lists.newArrayList(dbNameToId.values()); } + + public ExternalDatabase getDbForReplay(long dbId) { + return idToDb.get(dbId); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java new file mode 100644 index 0000000000..48deec84e6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +@Data +public class InitCatalogLog implements Writable { + enum Type { + HMS, + ES, + UNKNOWN; + } + + @SerializedName(value = "catalogId") + private long catalogId; + + @SerializedName(value = "refreshCount") + private int refreshCount; + + @SerializedName(value = "createCount") + private int createCount; + + @SerializedName(value = "refreshDbIds") + private List<Long> refreshDbIds; + + @SerializedName(value = "createDbIds") + private List<Long> createDbIds; + + @SerializedName(value = "createDbNames") + private List<String> createDbNames; + + @SerializedName(value = "type") + private Type type; + + public InitCatalogLog() { + refreshCount = 0; + createCount = 0; + catalogId = 0; + refreshDbIds = Lists.newArrayList(); + createDbIds = Lists.newArrayList(); + createDbNames = Lists.newArrayList(); + type = Type.UNKNOWN; + } + + public void addRefreshDb(long id) { + refreshCount += 1; + refreshDbIds.add(id); + } + + public void addCreateDb(long id, String name) { + createCount += 1; + createDbIds.add(id); + createDbNames.add(name); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static InitCatalogLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, InitCatalogLog.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java new file mode 100644 index 0000000000..eade3d12b8 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +@Data +public class InitDatabaseLog implements Writable { + public enum Type { + HMS, + ES, + UNKNOWN; + } + + @SerializedName(value = "catalogId") + private long catalogId; + + @SerializedName(value = "dbId") + private long dbId; + + @SerializedName(value = "refreshCount") + private int refreshCount; + + @SerializedName(value = "createCount") + private int createCount; + + @SerializedName(value = "refreshTableIds") + private List<Long> refreshTableIds; + + @SerializedName(value = "createTableIds") + private List<Long> createTableIds; + + @SerializedName(value = "createTableNames") + private List<String> createTableNames; + + @SerializedName(value = "type") + private Type type; + 
+ public InitDatabaseLog() { + refreshCount = 0; + createCount = 0; + catalogId = 0; + dbId = 0; + refreshTableIds = Lists.newArrayList(); + createTableIds = Lists.newArrayList(); + createTableNames = Lists.newArrayList(); + type = Type.UNKNOWN; + } + + public void addRefreshTable(long id) { + refreshCount += 1; + refreshTableIds.add(id); + } + + public void addCreateTable(long id, String name) { + createCount += 1; + createTableIds.add(id); + createTableNames.add(name); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static InitDatabaseLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, InitDatabaseLog.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java new file mode 100644 index 0000000000..2f462b551c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; +import lombok.Data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +@Data +public class InitTableLog implements Writable { + enum Type { + HMS, + ES, + UNKNOWN; + } + + @SerializedName(value = "catalogId") + private long catalogId; + + @SerializedName(value = "dbId") + private long dbId; + + @SerializedName(value = "tableId") + private long tableId; + + @SerializedName(value = "type") + private Type type; + + @SerializedName(value = "schema") + protected volatile List<Column> schema; + + public InitTableLog() {} + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static InitTableLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, InitTableLog.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index 5e9b25ffc5..cbf827b429 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -37,6 +37,7 @@ import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.analysis.SlotRef; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -370,6 +371,7 @@ public class EsUtil { column.setName(key); column.setIsKey(true); 
column.setIsAllowNull(true); + column.setUniqueId((int) Env.getCurrentEnv().getNextId()); if (arrayFields.contains(key)) { column.setType(ArrayType.create(type, true)); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 11e8ca03dd..dc48ed62a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -37,6 +37,10 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.SmallFileMgr.SmallFile; import org.apache.doris.datasource.CatalogLog; +import org.apache.doris.datasource.ExternalObjectLog; +import org.apache.doris.datasource.InitCatalogLog; +import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.datasource.InitTableLog; import org.apache.doris.ha.MasterInfo; import org.apache.doris.journal.bdbje.Timestamp; import org.apache.doris.load.DeleteInfo; @@ -685,6 +689,27 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_INIT_CATALOG: { + data = InitCatalogLog.read(in); + isRead = true; + break; + } + case OperationType.OP_INIT_EXTERNAL_DB: { + data = InitDatabaseLog.read(in); + isRead = true; + break; + } + case OperationType.OP_INIT_EXTERNAL_TABLE: { + data = InitTableLog.read(in); + isRead = true; + break; + } + case OperationType.OP_REFRESH_EXTERNAL_DB: + case OperationType.OP_REFRESH_EXTERNAL_TABLE: { + data = ExternalObjectLog.read(in); + isRead = true; + break; + } case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS: { data = TableAddOrDropColumnsInfo.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 09ec9c24ff..39bd5e7dfd 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -42,6 +42,10 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.SmallFileMgr.SmallFile; import org.apache.doris.datasource.CatalogLog; +import org.apache.doris.datasource.ExternalObjectLog; +import org.apache.doris.datasource.InitCatalogLog; +import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.datasource.InitTableLog; import org.apache.doris.ha.MasterInfo; import org.apache.doris.journal.Journal; import org.apache.doris.journal.JournalCursor; @@ -926,6 +930,31 @@ public class EditLog { env.getAuth().replayAlterUser(log); break; } + case OperationType.OP_INIT_CATALOG: { + final InitCatalogLog log = (InitCatalogLog) journal.getData(); + env.getCatalogMgr().replayInitCatalog(log); + break; + } + case OperationType.OP_REFRESH_EXTERNAL_DB: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayRefreshExternalDb(log); + break; + } + case OperationType.OP_INIT_EXTERNAL_DB: { + final InitDatabaseLog log = (InitDatabaseLog) journal.getData(); + env.getCatalogMgr().replayInitExternalDb(log); + break; + } + case OperationType.OP_REFRESH_EXTERNAL_TABLE: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayRefreshExternalTable(log); + break; + } + case OperationType.OP_INIT_EXTERNAL_TABLE: { + final InitTableLog log = (InitTableLog) journal.getData(); + env.getCatalogMgr().replayInitExternalTable(log); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1581,6 +1610,26 @@ public class EditLog { logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds)); } + public void logInitCatalog(InitCatalogLog log) { + logEdit(OperationType.OP_INIT_CATALOG, log); + } + + public void 
logRefreshExternalDb(ExternalObjectLog log) { + logEdit(OperationType.OP_REFRESH_EXTERNAL_DB, log); + } + + public void logInitExternalDb(InitDatabaseLog log) { + logEdit(OperationType.OP_INIT_EXTERNAL_DB, log); + } + + public void logRefreshExternalTable(ExternalObjectLog log) { + logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log); + } + + public void logInitExternalTable(InitTableLog log) { + logEdit(OperationType.OP_INIT_EXTERNAL_TABLE, log); + } + public Journal getJournal() { return this.journal; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 3ff1473071..82304acb23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -239,6 +239,11 @@ public class OperationType { public static final short OP_ALTER_CATALOG_NAME = 322; public static final short OP_ALTER_CATALOG_PROPS = 323; public static final short OP_REFRESH_CATALOG = 324; + public static final short OP_INIT_CATALOG = 325; + public static final short OP_REFRESH_EXTERNAL_DB = 326; + public static final short OP_INIT_EXTERNAL_DB = 327; + public static final short OP_REFRESH_EXTERNAL_TABLE = 328; + public static final short OP_INIT_EXTERNAL_TABLE = 329; // scheduler job and task 330-350 public static final short OP_CREATE_MTMV_JOB = 330; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 3bee6d59ff..30fe5fdcc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -21,6 +21,7 @@ import org.apache.doris.alter.AlterJobV2; import org.apache.doris.alter.RollupJobV2; import org.apache.doris.alter.SchemaChangeJobV2; import org.apache.doris.catalog.ArrayType; +import 
org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.JdbcResource; @@ -32,6 +33,13 @@ import org.apache.doris.catalog.S3Resource; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.SparkResource; import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.EsExternalDatabase; +import org.apache.doris.catalog.external.EsExternalTable; +import org.apache.doris.catalog.external.ExternalDatabase; +import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.catalog.external.HMSExternalDatabase; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.HMSExternalCatalog; @@ -140,7 +148,6 @@ public class GsonUtils { = RuntimeTypeAdapterFactory.of(LoadJobStateUpdateInfo.class, "clazz") .registerSubtype(SparkLoadJobStateUpdateInfo.class, SparkLoadJobStateUpdateInfo.class.getSimpleName()); - // runtime adapter for class "Policy" private static RuntimeTypeAdapterFactory<Policy> policyTypeAdapterFactory = RuntimeTypeAdapterFactory.of( Policy.class, "clazz").registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName()) @@ -152,6 +159,18 @@ public class GsonUtils { .registerSubtype(HMSExternalCatalog.class, HMSExternalCatalog.class.getSimpleName()) .registerSubtype(EsExternalCatalog.class, EsExternalCatalog.class.getSimpleName()); + private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of( + DatabaseIf.class, "clazz") + .registerSubtype(ExternalDatabase.class, ExternalDatabase.class.getSimpleName()) + .registerSubtype(EsExternalDatabase.class, EsExternalDatabase.class.getSimpleName()) + .registerSubtype(HMSExternalDatabase.class, 
HMSExternalDatabase.class.getSimpleName()); + + private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of( + TableIf.class, "clazz") + .registerSubtype(ExternalTable.class, ExternalTable.class.getSimpleName()) + .registerSubtype(EsExternalTable.class, EsExternalTable.class.getSimpleName()) + .registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName()); + // the builder of GSON instance. // Add any other adapters if necessary. private static final GsonBuilder GSON_BUILDER = new GsonBuilder().addSerializationExclusionStrategy( @@ -165,7 +184,10 @@ public class GsonUtils { .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory) .registerTypeAdapterFactory(syncJobTypeAdapterFactory) .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory) - .registerTypeAdapterFactory(policyTypeAdapterFactory).registerTypeAdapterFactory(dsTypeAdapterFactory) + .registerTypeAdapterFactory(policyTypeAdapterFactory) + .registerTypeAdapterFactory(dsTypeAdapterFactory) + .registerTypeAdapterFactory(dbTypeAdapterFactory) + .registerTypeAdapterFactory(tblTypeAdapterFactory) .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java new file mode 100644 index 0000000000..c3a08d6d50 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
+import org.apache.doris.thrift.TInitExternalCtlMetaResult;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Client used by an Observer FE to forward external datasource object init requests to the master.
+ * Covers initialization of ExternalCatalog, ExternalDatabase and ExternalTable.
+ * This client waits until the journal ID returned by master is replayed on this Observer FE before returning.
+ */
+public class MasterCatalogExecutor {
+
+    private static final Logger LOG = LogManager.getLogger(MasterCatalogExecutor.class);
+
+    private final ConnectContext ctx;
+    private final int waitTimeoutMs;
+
+    public MasterCatalogExecutor() {
+        ctx = ConnectContext.get();
+        waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
+    }
+
+    public void forward(long catalogId, long dbId, long tableId) throws Exception {
+        if (!ctx.getEnv().isReady()) {
+            throw new Exception("Current catalog is not ready, please wait for a while.");
+        }
+        String masterHost = ctx.getEnv().getMasterIp();
+        int masterRpcPort = ctx.getEnv().getMasterRpcPort();
+        TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);
+
+        FrontendService.Client client = null;
+        try {
+            client = ClientPool.frontendPool.borrowObject(thriftAddress, waitTimeoutMs);
+        } catch (Exception e) {
+            throw new Exception("Failed to get master client.", e);
+        }
+        TInitExternalCtlMetaRequest request = new TInitExternalCtlMetaRequest();
+        request.setCatalogId(catalogId);
+        if (dbId != -1) { // -1 means the request does not target a database
+            request.setDbId(dbId);
+        }
+        if (tableId != -1) { // -1 means the request does not target a table
+            request.setTableId(tableId);
+        }
+        boolean isReturnToPool = false;
+        try {
+            TInitExternalCtlMetaResult result = client.initExternalCtlMeta(request);
+            isReturnToPool = true; // RPC done: keep the pooled connection even if the wait below fails
+            ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs);
+        } catch (Exception e) {
+            LOG.warn("Failed to finish forward init operation, please try again. ", e);
+            throw e;
+        } finally {
+            if (isReturnToPool) {
+                ClientPool.frontendPool.returnObject(thriftAddress, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+            }
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8f6641ccb2..53ee7dd637 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.S3Resource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
@@ -43,6 +44,7 @@ import org.apache.doris.common.ThriftServerEventProcessor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.master.MasterImpl;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -77,6 +79,8 @@ import org.apache.doris.thrift.TGetStoragePolicy;
 import org.apache.doris.thrift.TGetStoragePolicyResult;
 import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
+import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
+import org.apache.doris.thrift.TInitExternalCtlMetaResult;
 import org.apache.doris.thrift.TListPrivilegesResult;
 import org.apache.doris.thrift.TListTableStatusResult;
 import org.apache.doris.thrift.TLoadTxn2PCRequest;
@@ -1061,4 +1065,76 @@ public class FrontendServiceImpl implements FrontendService.Iface {
LOG.debug("refresh storage policy request: {}", result);
         return result;
     }
+
+    @Override
+    public TInitExternalCtlMetaResult initExternalCtlMeta(TInitExternalCtlMetaRequest request) throws TException {
+        if (request.isSetCatalogId() && request.isSetDbId() && request.isSetTableId()) {
+            return initTable(request.catalogId, request.dbId, request.tableId); // most specific target first
+        } else if (request.isSetCatalogId() && request.isSetDbId()) {
+            return initDb(request.catalogId, request.dbId);
+        } else if (request.isSetCatalogId()) {
+            return initCatalog(request.catalogId);
+        } else {
+            throw new TException("Catalog id is not set. Init failed.");
+        }
+    }
+
+    private TInitExternalCtlMetaResult initCatalog(long catalogId) throws TException {
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+        if (!(catalog instanceof ExternalCatalog)) { // also rejects an unknown (null) catalog
+            throw new TException("Only support forward ExternalCatalog init operation.");
+        }
+        ((ExternalCatalog) catalog).makeSureInitialized();
+        TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
+        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        result.setStatus("OK");
+        return result;
+    }
+
+    private TInitExternalCtlMetaResult initDb(long catalogId, long dbId) throws TException {
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+        if (!(catalog instanceof ExternalCatalog)) { // also rejects an unknown (null) catalog
+            throw new TException("Only support forward ExternalCatalog init operation.");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbId);
+        if (db == null) {
+            throw new TException("database " + dbId + " is null");
+        }
+        if (!(db instanceof ExternalDatabase)) {
+            throw new TException("Only support forward ExternalDatabase init operation.");
+        }
+        ((ExternalDatabase) db).makeSureInitialized();
+        TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
+        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        result.setStatus("OK");
+        return result;
+    }
+
+    private TInitExternalCtlMetaResult initTable(long catalogId, long dbId, long tableId)
+            throws TException {
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+        if (!(catalog instanceof ExternalCatalog)) { // also rejects an unknown (null) catalog
+            throw new TException("Only support forward ExternalCatalog init operation.");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbId);
+        if (db == null) {
+            throw new TException("database " + dbId + " is null");
+        }
+        if (!(db instanceof ExternalDatabase)) {
+            throw new TException("Only support forward ExternalDatabase init operation.");
+        }
+        TableIf table = db.getTableNullable(tableId);
+        if (table == null) {
+            throw new TException("table " + tableId + " is null");
+        }
+        if (!(table instanceof ExternalTable)) {
+            throw new TException("Only support forward ExternalTable init operation.");
+        }
+
+        ((ExternalTable) table).makeSureInitialized();
+        TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
+        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        result.setStatus("OK");
+        return result;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index 871870178a..d1564261e6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -27,7 +27,13 @@ import org.apache.doris.analysis.GrantStmt;
 import org.apache.doris.analysis.ShowCatalogStmt;
 import org.apache.doris.analysis.SwitchStmt;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.EsExternalTable;
+import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import
org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -37,6 +43,7 @@ import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -95,6 +102,15 @@ public class CatalogMgrTest extends TestWithFeService {
                 rootCtx);
         env.getCatalogMgr().createCatalog(iceBergCatalog);
 
+        // create es catalog
+        CreateCatalogStmt esCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt(
+                "create catalog es properties('type' = 'es', 'elasticsearch.hosts' = 'http://192.168.0.1');",
+                rootCtx);
+        env.getCatalogMgr().createCatalog(esCatalog);
+
+        createDbAndTableForCatalog(env.getCatalogMgr().getCatalog("hive"));
+        createDbAndTableForCatalog(env.getCatalogMgr().getCatalog("es"));
+
         // switch to hive.
         SwitchStmt switchHive = (SwitchStmt) parseAndAnalyzeStmt("switch hive;", rootCtx);
         env.changeCatalog(rootCtx, switchHive.getCatalogName());
@@ -109,6 +125,26 @@ public class CatalogMgrTest extends TestWithFeService {
         user2.analyze(SystemInfoService.DEFAULT_CLUSTER);
     }
 
+    private void createDbAndTableForCatalog(CatalogIf catalog) { // test fixture: one db with one table
+        List<Column> schema = Lists.newArrayList();
+        schema.add(new Column("k1", PrimitiveType.INT));
+        if (catalog instanceof HMSExternalCatalog) {
+            HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
+            HMSExternalDatabase db = new HMSExternalDatabase(hmsCatalog, 10000, "hive_db1");
+            HMSExternalTable tbl = new HMSExternalTable(10001, "hive_tbl1", "hive_db1", hmsCatalog);
+            tbl.setNewFullSchema(schema);
+            db.addTableForTest(tbl);
+            hmsCatalog.addDatabaseForTest(db);
+        } else if (catalog instanceof EsExternalCatalog) { // guard must match the cast on the next line
+            EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;
+            EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, "es_db1");
+            EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", "es_db1", esCatalog);
+            tbl.setNewFullSchema(schema);
+            db.addTableForTest(tbl);
+            esCatalog.addDatabaseForTest(db);
+        }
+    }
+
     @Test
     public void testNormalCase() throws Exception {
         String createCatalogSql = "CREATE CATALOG hms_catalog "
@@ -119,7 +155,7 @@ public class CatalogMgrTest extends TestWithFeService {
         String showCatalogSql = "SHOW CATALOGS";
         ShowCatalogStmt showStmt = (ShowCatalogStmt) parseAndAnalyzeStmt(showCatalogSql);
         ShowResultSet showResultSet = mgr.showCatalogs(showStmt);
-        Assertions.assertEquals(4, showResultSet.getResultRows().size());
+        Assertions.assertEquals(5, showResultSet.getResultRows().size());
 
         String alterCatalogNameSql = "ALTER CATALOG hms_catalog RENAME " + MY_CATALOG + ";";
         AlterCatalogNameStmt alterNameStmt = (AlterCatalogNameStmt) parseAndAnalyzeStmt(alterCatalogNameSql);
@@ -151,7 +187,7 @@ public class CatalogMgrTest extends TestWithFeService {
         DropCatalogStmt dropCatalogStmt = (DropCatalogStmt) parseAndAnalyzeStmt(dropCatalogSql);
         mgr.dropCatalog(dropCatalogStmt);
         showResultSet = mgr.showCatalogs(showStmt);
-        Assertions.assertEquals(3, showResultSet.getResultRows().size());
+        Assertions.assertEquals(4, showResultSet.getResultRows().size());
     }
 
     private void testCatalogMgrPersist() throws Exception {
@@ -173,7 +209,7 @@ public class CatalogMgrTest extends TestWithFeService {
         DataInputStream dis = new DataInputStream(new FileInputStream(file));
         CatalogMgr mgr2 = CatalogMgr.read(dis);
 
-        Assert.assertEquals(4, mgr2.listCatalogs().size());
+        Assert.assertEquals(5, mgr2.listCatalogs().size());
         Assert.assertEquals(myCatalog.getId(), mgr2.getCatalog(MY_CATALOG).getId());
         Assert.assertEquals(0, mgr2.getInternalCatalog().getId());
         Assert.assertEquals(0, mgr2.getCatalog(InternalCatalog.INTERNAL_DS_ID).getId());
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index b3c5fe7e29..5d48c1ae7e 100644
--- a/gensrc/thrift/FrontendService.thrift
+++
b/gensrc/thrift/FrontendService.thrift @@ -678,6 +678,17 @@ struct TWaitingTxnStatusResult { 2: optional i32 txn_status_id } +struct TInitExternalCtlMetaRequest { + 1: optional i64 catalogId + 2: optional i64 dbId + 3: optional i64 tableId +} + +struct TInitExternalCtlMetaResult { + 1: optional i64 maxJournalId; + 2: optional string status; +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -713,4 +724,5 @@ service FrontendService { TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request) AgentService.TGetStoragePolicyResult refreshStoragePolicy() + TInitExternalCtlMetaResult initExternalCtlMeta(1: TInitExternalCtlMetaRequest request) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org