This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f2d84d81e6 [feature-wip][refactor](multi-catalog) Persist external 
catalog related metadata. (#13746)
f2d84d81e6 is described below

commit f2d84d81e651d3de8e4c67a0adbf927a7ca04268
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Fri Nov 4 09:04:00 2022 +0800

    [feature-wip][refactor](multi-catalog) Persist external catalog related 
metadata. (#13746)
    
    Persist external catalog/db/table, including the columns of external tables.
    After this change, external objects could have their own uniq ID through 
their lifetime,
    this is required for the statistic information collection.
---
 .../main/java/org/apache/doris/catalog/Column.java |   5 +-
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +-
 .../org/apache/doris/catalog/RefreshManager.java   |  10 ++
 .../doris/catalog/external/EsExternalDatabase.java |  79 +++++++++++-
 .../doris/catalog/external/EsExternalTable.java    |  79 ++++++++----
 .../doris/catalog/external/ExternalDatabase.java   |  63 +++++++++-
 .../doris/catalog/external/ExternalTable.java      |  81 ++++++++++---
 .../catalog/external/HMSExternalDatabase.java      |  81 ++++++++++++-
 .../doris/catalog/external/HMSExternalTable.java   | 133 +++++++++++++--------
 .../org/apache/doris/datasource/CatalogMgr.java    |  50 ++++++++
 .../apache/doris/datasource/EsExternalCatalog.java |  49 ++++++--
 .../apache/doris/datasource/ExternalCatalog.java   |  78 +++++++++++-
 .../apache/doris/datasource/ExternalObjectLog.java |  56 +++++++++
 .../doris/datasource/HMSExternalCatalog.java       |  72 +++++++----
 .../apache/doris/datasource/InitCatalogLog.java    |  92 ++++++++++++++
 .../apache/doris/datasource/InitDatabaseLog.java   |  96 +++++++++++++++
 .../org/apache/doris/datasource/InitTableLog.java  |  67 +++++++++++
 .../doris/external/elasticsearch/EsUtil.java       |   2 +
 .../org/apache/doris/journal/JournalEntity.java    |  25 ++++
 .../java/org/apache/doris/persist/EditLog.java     |  49 ++++++++
 .../org/apache/doris/persist/OperationType.java    |   5 +
 .../org/apache/doris/persist/gson/GsonUtils.java   |  26 +++-
 .../org/apache/doris/qe/MasterCatalogExecutor.java |  84 +++++++++++++
 .../apache/doris/service/FrontendServiceImpl.java  |  76 ++++++++++++
 .../apache/doris/datasource/CatalogMgrTest.java    |  42 ++++++-
 gensrc/thrift/FrontendService.thrift               |  12 ++
 26 files changed, 1273 insertions(+), 141 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index a3d529d718..532a49b0eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -597,10 +597,9 @@ public class Column implements Writable {
                 && getStrLen() == other.getStrLen()
                 && getPrecision() == other.getPrecision()
                 && getScale() == other.getScale()
-                && comment.equals(other.comment)
+                && Objects.equals(comment, other.comment)
                 && visible == other.visible
-                && children.size() == other.children.size()
-                && children.equals(other.children);
+                && Objects.equals(children, other.children);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index e665064a35..05bfc80efa 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1937,7 +1937,7 @@ public class Env {
      **/
     public long loadCatalog(DataInputStream in, long checksum) throws 
IOException {
         CatalogMgr mgr = CatalogMgr.read(in);
-        // When enable the multi catalog in the first time, the mgr will be a 
null value.
+        // When enable the multi catalog in the first time, the "mgr" will be 
a null value.
         // So ignore it to use default catalog manager.
         if (mgr != null) {
             this.catalogMgr = mgr;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 31cbc266f5..bb37a9b1b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InternalCatalog;
 
 import org.apache.logging.log4j.LogManager;
@@ -117,6 +118,10 @@ public class RefreshManager {
             throw new DdlException("Database " + dbName + " does not exist in 
catalog " + catalog.getName());
         }
         ((ExternalDatabase) db).setUnInitialized();
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log);
     }
 
     private void refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env 
env) throws UserException {
@@ -156,5 +161,10 @@ public class RefreshManager {
             throw new DdlException("Table " + tableName + " does not exist in 
db " + dbName);
         }
         ((ExternalTable) table).setUnInitialized();
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        log.setTableId(table.getId());
+        Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
index 8898ede84d..f9ba015826 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
@@ -20,27 +20,34 @@ package org.apache.doris.catalog.external;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.datasource.EsExternalCatalog;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Elasticsearch metastore external database.
  */
-public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> {
+public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> 
implements GsonPostProcessable {
 
     private static final Logger LOG = 
LogManager.getLogger(EsExternalDatabase.class);
 
     // Cache of table name to table id.
     private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
-    private Map<Long, EsExternalTable> idToTbl = Maps.newHashMap();
+    @SerializedName(value = "idToTbl")
+    private Map<Long, EsExternalTable> idToTbl = Maps.newConcurrentMap();
 
     /**
      * Create Elasticsearch external database.
@@ -53,14 +60,53 @@ public class EsExternalDatabase extends 
ExternalDatabase<EsExternalTable> {
         super(extCatalog, id, name);
     }
 
-    private synchronized void makeSureInitialized() {
+    public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+        Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
+        Map<Long, EsExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
+        for (int i = 0; i < log.getRefreshCount(); i++) {
+            EsExternalTable table = 
getTableForReplay(log.getRefreshTableIds().get(i));
+            table.setUnInitialized();
+            tmpTableNameToId.put(table.getName(), table.getId());
+            tmpIdToTbl.put(table.getId(), table);
+        }
+        for (int i = 0; i < log.getCreateCount(); i++) {
+            EsExternalTable table = new 
EsExternalTable(log.getCreateTableIds().get(i),
+                    log.getCreateTableNames().get(i), name, 
(EsExternalCatalog) catalog);
+            tmpTableNameToId.put(table.getName(), table.getId());
+            tmpIdToTbl.put(table.getId(), table);
+        }
+        tableNameToId = tmpTableNameToId;
+        idToTbl = tmpIdToTbl;
+        initialized = true;
+    }
+
+    public void setTableExtCatalog(ExternalCatalog extCatalog) {
+        for (EsExternalTable table : idToTbl.values()) {
+            table.setCatalog(extCatalog);
+        }
+    }
+
+    public synchronized void makeSureInitialized() {
         if (!initialized) {
+            if (!Env.getCurrentEnv().isMaster()) {
+                // Forward to master and wait the journal to replay.
+                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
+                try {
+                    remoteExecutor.forward(extCatalog.getId(), id, -1);
+                } catch (Exception e) {
+                    LOG.warn("Failed to forward init db {} operation to 
master. {}", name, e.getMessage());
+                }
+                return;
+            }
             init();
-            initialized = true;
         }
     }
 
     private void init() {
+        InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
+        initDatabaseLog.setType(InitDatabaseLog.Type.ES);
+        initDatabaseLog.setCatalogId(extCatalog.getId());
+        initDatabaseLog.setDbId(id);
         List<String> tableNames = extCatalog.listTableNames(null, name);
         if (tableNames != null) {
             Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
@@ -73,15 +119,20 @@ public class EsExternalDatabase extends 
ExternalDatabase<EsExternalTable> {
                     EsExternalTable table = idToTbl.get(tblId);
                     table.setUnInitialized();
                     tmpIdToTbl.put(tblId, table);
+                    initDatabaseLog.addRefreshTable(tblId);
                 } else {
                     tblId = Env.getCurrentEnv().getNextId();
                     tmpTableNameToId.put(tableName, tblId);
-                    tmpIdToTbl.put(tblId, new EsExternalTable(tblId, 
tableName, name, (EsExternalCatalog) extCatalog));
+                    EsExternalTable table = new EsExternalTable(tblId, 
tableName, name, (EsExternalCatalog) extCatalog);
+                    tmpIdToTbl.put(tblId, table);
+                    initDatabaseLog.addCreateTable(tblId, tableName);
                 }
             }
             tableNameToId = tmpTableNameToId;
             idToTbl = tmpIdToTbl;
         }
+        initialized = true;
+        Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
     }
 
     @Override
@@ -110,4 +161,22 @@ public class EsExternalDatabase extends 
ExternalDatabase<EsExternalTable> {
         makeSureInitialized();
         return idToTbl.get(tableId);
     }
+
+    public EsExternalTable getTableForReplay(long tableId) {
+        return idToTbl.get(tableId);
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        tableNameToId = Maps.newConcurrentMap();
+        for (EsExternalTable tbl : idToTbl.values()) {
+            tableNameToId.put(tbl.getName(), tbl.getId());
+        }
+        rwLock = new ReentrantReadWriteLock(true);
+    }
+
+    public void addTableForTest(EsExternalTable tbl) {
+        idToTbl.put(tbl.getId(), tbl);
+        tableNameToId.put(tbl.getName(), tbl.getId());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
index ea44552896..15d1384744 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
@@ -18,9 +18,12 @@
 package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EsTable;
 import org.apache.doris.datasource.EsExternalCatalog;
+import org.apache.doris.datasource.InitTableLog;
 import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.qe.MasterCatalogExecutor;
 import org.apache.doris.thrift.TEsTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -36,9 +39,6 @@ import java.util.List;
 public class EsExternalTable extends ExternalTable {
 
     private static final Logger LOG = 
LogManager.getLogger(EsExternalTable.class);
-
-    private final EsExternalCatalog catalog;
-    private final String dbName;
     private EsTable esTable;
 
     /**
@@ -50,23 +50,57 @@ public class EsExternalTable extends ExternalTable {
      * @param catalog HMSExternalDataSource.
      */
     public EsExternalTable(long id, String name, String dbName, 
EsExternalCatalog catalog) {
-        super(id, name);
-        this.dbName = dbName;
-        this.catalog = catalog;
-        this.type = TableType.ES_EXTERNAL_TABLE;
+        super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE);
     }
 
 
-    private synchronized void makeSureInitialized() {
+    public synchronized void makeSureInitialized() {
         if (!initialized) {
-            init();
-            initialized = true;
+            if (!Env.getCurrentEnv().isMaster()) {
+                fullSchema = null;
+                // Forward to master and wait the journal to replay.
+                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
+                try {
+                    remoteExecutor.forward(catalog.getId(), 
catalog.getDbNullable(dbName).getId(), id);
+                } catch (Exception e) {
+                    LOG.warn("Failed to forward init table {} operation to 
master. {}", name, e.getMessage());
+                }
+            } else {
+                init();
+            }
+        }
+        if (!objectCreated) {
+            esTable = toEsTable();
+            objectCreated = true;
         }
     }
 
     private void init() {
-        fullSchema = EsUtil.genColumnsFromEs(catalog.getEsRestClient(), name, 
null);
-        esTable = toEsTable();
+        boolean schemaChanged = false;
+        List<Column> tmpSchema = EsUtil.genColumnsFromEs(
+                ((EsExternalCatalog) catalog).getEsRestClient(), name, null);
+        if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
+            schemaChanged = true;
+        } else {
+            for (int i = 0; i < fullSchema.size(); i++) {
+                if (!fullSchema.get(i).equals(tmpSchema.get(i))) {
+                    schemaChanged = true;
+                    break;
+                }
+            }
+        }
+        if (schemaChanged) {
+            timestamp = System.currentTimeMillis();
+            fullSchema = tmpSchema;
+            esTable = toEsTable();
+        }
+        initialized = true;
+        InitTableLog initTableLog = new InitTableLog();
+        initTableLog.setCatalogId(catalog.getId());
+        initTableLog.setDbId(catalog.getDbNameToId().get(dbName));
+        initTableLog.setTableId(id);
+        initTableLog.setSchema(fullSchema);
+        Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog);
     }
 
     @Override
@@ -107,7 +141,7 @@ public class EsExternalTable extends ExternalTable {
     }
 
     /**
-     * get database name of hms table.
+     * get database name of es table.
      */
     public String getDbName() {
         return dbName;
@@ -123,17 +157,18 @@ public class EsExternalTable extends ExternalTable {
     }
 
     private EsTable toEsTable() {
+        EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;
         EsTable esTable = new EsTable(this.id, this.name, this.fullSchema, 
TableType.ES_EXTERNAL_TABLE);
         esTable.setIndexName(name);
-        esTable.setClient(catalog.getEsRestClient());
-        esTable.setUserName(catalog.getUsername());
-        esTable.setPasswd(catalog.getPassword());
-        esTable.setEnableDocValueScan(catalog.isEnableDocValueScan());
-        esTable.setEnableKeywordSniff(catalog.isEnableKeywordSniff());
-        esTable.setNodesDiscovery(catalog.isEnableNodesDiscovery());
-        esTable.setHttpSslEnabled(catalog.isEnableSsl());
-        esTable.setSeeds(catalog.getNodes());
-        esTable.setHosts(String.join(",", catalog.getNodes()));
+        esTable.setClient(esCatalog.getEsRestClient());
+        esTable.setUserName(esCatalog.getUsername());
+        esTable.setPasswd(esCatalog.getPassword());
+        esTable.setEnableDocValueScan(esCatalog.isEnableDocValueScan());
+        esTable.setEnableKeywordSniff(esCatalog.isEnableKeywordSniff());
+        esTable.setNodesDiscovery(esCatalog.isEnableNodesDiscovery());
+        esTable.setHttpSslEnabled(esCatalog.isEnableSsl());
+        esTable.setSeeds(esCatalog.getNodes());
+        esTable.setHosts(String.join(",", esCatalog.getNodes()));
         esTable.syncTableMetaData();
         return esTable;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index d76ed380a7..c5fa7b9875 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -21,13 +21,22 @@ import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.DatabaseProperty;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -38,17 +47,28 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  *
  * @param <T> External table type is ExternalTable or its subclass.
  */
-public class ExternalDatabase<T extends ExternalTable> implements 
DatabaseIf<T> {
+public class ExternalDatabase<T extends ExternalTable> implements 
DatabaseIf<T>, Writable, GsonPostProcessable {
 
     private static final Logger LOG = 
LogManager.getLogger(ExternalDatabase.class);
 
-    private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+    protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
+    @SerializedName(value = "id")
     protected long id;
+    @SerializedName(value = "name")
     protected String name;
-    protected ExternalCatalog extCatalog;
-    protected DatabaseProperty dbProperties;
+    @SerializedName(value = "dbProperties")
+    protected DatabaseProperty dbProperties = new DatabaseProperty();
+    @SerializedName(value = "initialized")
     protected boolean initialized = false;
+    protected ExternalCatalog extCatalog;
+
+    /**
+     * No args constructor for persist.
+     */
+    public ExternalDatabase() {
+        initialized = false;
+    }
 
     /**
      * Create external database.
@@ -63,10 +83,30 @@ public class ExternalDatabase<T extends ExternalTable> 
implements DatabaseIf<T>
         this.name = name;
     }
 
-    public synchronized void setUnInitialized() {
+    public void setExtCatalog(ExternalCatalog extCatalog) {
+        this.extCatalog = extCatalog;
+    }
+
+    public void setTableExtCatalog(ExternalCatalog extCatalog) {}
+
+    public void setUnInitialized() {
         this.initialized = false;
     }
 
+    public boolean isInitialized() {
+        return initialized;
+    }
+
+    public void makeSureInitialized() {}
+
+    public T getTableForReplay(long tableId) {
+        throw new NotImplementedException();
+    }
+
+    public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+        throw new NotImplementedException();
+    }
+
     @Override
     public void readLock() {
         this.rwLock.readLock().lock();
@@ -177,4 +217,17 @@ public class ExternalDatabase<T extends ExternalTable> 
implements DatabaseIf<T>
     public T getTableNullable(long tableId) {
         throw new NotImplementedException();
     }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static ExternalDatabase read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, ExternalDatabase.class);
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {}
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index 33acbc0ec7..f713de7419 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -22,12 +22,22 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TTableDescriptor;
 
+import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -36,26 +46,36 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * External table represent tables that are not self-managed by Doris.
  * Such as tables from hive, iceberg, es, etc.
  */
-public class ExternalTable implements TableIf {
-
+@Getter
+public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
     private static final Logger LOG = 
LogManager.getLogger(ExternalTable.class);
 
+    @SerializedName(value = "id")
     protected long id;
+    @SerializedName(value = "name")
     protected String name;
-    protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+    @SerializedName(value = "type")
     protected TableType type = null;
+    @SerializedName(value = "fullSchema")
     protected volatile List<Column> fullSchema = null;
+    @SerializedName(value = "initialized")
     protected boolean initialized = false;
+    @SerializedName(value = "timestamp")
+    protected long timestamp;
+
+    protected ExternalCatalog catalog;
+    @SerializedName(value = "dbName")
+    protected String dbName;
+    protected boolean objectCreated = false;
+    protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Create external table.
-     *
-     * @param id Table id.
-     * @param name Table name.
+     * No args constructor for persist.
      */
-    public ExternalTable(long id, String name) {
-        this.id = id;
-        this.name = name;
+    public ExternalTable() {
+        this.initialized = false;
+        this.objectCreated = false;
+        this.fullSchema = null;
     }
 
     /**
@@ -63,23 +83,40 @@ public class ExternalTable implements TableIf {
      *
      * @param id Table id.
      * @param name Table name.
+     * @param catalog ExternalCatalog this table belongs to.
+     * @param dbName Name of the db the this table belongs to.
      * @param type Table type.
      */
-    public ExternalTable(long id, String name, TableType type) {
+    public ExternalTable(long id, String name, ExternalCatalog catalog, String 
dbName, TableType type) {
         this.id = id;
         this.name = name;
+        this.catalog = catalog;
+        this.dbName = dbName;
         this.type = type;
+        this.initialized = false;
+        this.objectCreated = false;
+        this.fullSchema = null;
+    }
+
+    public void setCatalog(ExternalCatalog catalog) {
+        this.catalog = catalog;
     }
 
     public boolean isView() {
         return false;
     }
 
-    public synchronized void setUnInitialized() {
+    public void setUnInitialized() {
         this.initialized = false;
-        this.fullSchema = null;
     }
 
+    public void replayInitTable(List<Column> schema) {
+        fullSchema = schema;
+        initialized = true;
+    }
+
+    public void makeSureInitialized() {}
+
     @Override
     public void readLock() {
         this.rwLock.readLock().lock();
@@ -260,10 +297,26 @@ public class ExternalTable implements TableIf {
     @Override
     public String getComment(boolean escapeQuota) {
         return "";
-
     }
 
     public TTableDescriptor toThrift() {
         return null;
     }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static ExternalTable read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, ExternalTable.class);
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        rwLock = new ReentrantReadWriteLock(true);
+        objectCreated = false;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index b8d9287982..dc79a679f6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -21,28 +21,35 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
  * Hive metastore external database.
  */
-public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> {
+public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> 
implements GsonPostProcessable {
     private static final Logger LOG = 
LogManager.getLogger(HMSExternalDatabase.class);
 
     // Cache of table name to table id.
     private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
-    private Map<Long, HMSExternalTable> idToTbl = Maps.newHashMap();
+    @SerializedName(value = "idToTbl")
+    private Map<Long, HMSExternalTable> idToTbl = Maps.newConcurrentMap();
 
     /**
      * Create HMS external database.
@@ -55,14 +62,53 @@ public class HMSExternalDatabase extends 
ExternalDatabase<HMSExternalTable> {
         super(extCatalog, id, name);
     }
 
-    private synchronized void makeSureInitialized() {
+    public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+        Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
+        Map<Long, HMSExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
+        for (int i = 0; i < log.getRefreshCount(); i++) {
+            HMSExternalTable table = 
getTableForReplay(log.getRefreshTableIds().get(i));
+            table.setUnInitialized();
+            tmpTableNameToId.put(table.getName(), table.getId());
+            tmpIdToTbl.put(table.getId(), table);
+        }
+        for (int i = 0; i < log.getCreateCount(); i++) {
+            HMSExternalTable table = new 
HMSExternalTable(log.getCreateTableIds().get(i),
+                    log.getCreateTableNames().get(i), name, 
(HMSExternalCatalog) catalog);
+            tmpTableNameToId.put(table.getName(), table.getId());
+            tmpIdToTbl.put(table.getId(), table);
+        }
+        tableNameToId = tmpTableNameToId;
+        idToTbl = tmpIdToTbl;
+        initialized = true;
+    }
+
+    public void setTableExtCatalog(ExternalCatalog extCatalog) {
+        for (HMSExternalTable table : idToTbl.values()) {
+            table.setCatalog(extCatalog);
+        }
+    }
+
+    public synchronized void makeSureInitialized() {
         if (!initialized) {
+            if (!Env.getCurrentEnv().isMaster()) {
+                // Forward to master and wait the journal to replay.
+                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
+                try {
+                    remoteExecutor.forward(extCatalog.getId(), id, -1);
+                } catch (Exception e) {
+                    LOG.warn("Failed to forward init db {} operation to 
master. {}", name, e.getMessage());
+                }
+                return;
+            }
             init();
-            initialized = true;
         }
     }
 
     private void init() {
+        InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
+        initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
+        initDatabaseLog.setCatalogId(extCatalog.getId());
+        initDatabaseLog.setDbId(id);
         List<String> tableNames = extCatalog.listTableNames(null, name);
         if (tableNames != null) {
             Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
@@ -75,16 +121,21 @@ public class HMSExternalDatabase extends 
ExternalDatabase<HMSExternalTable> {
                     HMSExternalTable table = idToTbl.get(tblId);
                     table.setUnInitialized();
                     tmpIdToTbl.put(tblId, table);
+                    initDatabaseLog.addRefreshTable(tblId);
                 } else {
                     tblId = Env.getCurrentEnv().getNextId();
                     tmpTableNameToId.put(tableName, tblId);
-                    tmpIdToTbl.put(tblId,
-                        new HMSExternalTable(tblId, tableName, name, 
(HMSExternalCatalog) extCatalog));
+                    HMSExternalTable table = new HMSExternalTable(tblId, 
tableName, name,
+                            (HMSExternalCatalog) extCatalog);
+                    tmpIdToTbl.put(tblId, table);
+                    initDatabaseLog.addCreateTable(tblId, tableName);
                 }
             }
             tableNameToId = tmpTableNameToId;
             idToTbl = tmpIdToTbl;
         }
+        initialized = true;
+        Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
     }
 
     @Override
@@ -119,4 +170,22 @@ public class HMSExternalDatabase extends 
ExternalDatabase<HMSExternalTable> {
         makeSureInitialized();
         return idToTbl.get(tableId);
     }
+
+    public HMSExternalTable getTableForReplay(long tableId) {
+        return idToTbl.get(tableId);
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        tableNameToId = Maps.newConcurrentMap();
+        for (HMSExternalTable tbl : idToTbl.values()) {
+            tableNameToId.put(tbl.getName(), tbl.getId());
+        }
+        rwLock = new ReentrantReadWriteLock(true);
+    }
+
+    public void addTableForTest(HMSExternalTable tbl) {
+        idToTbl.put(tbl.getId(), tbl);
+        tableNameToId.put(tbl.getName(), tbl.getId());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index cbe84744ea..9d27cf0fb6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -18,10 +18,13 @@
 package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.InitTableLog;
+import org.apache.doris.qe.MasterCatalogExecutor;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -39,12 +42,9 @@ import java.util.Map;
  * Hive metastore external table.
  */
 public class HMSExternalTable extends ExternalTable {
-
     private static final Logger LOG = 
LogManager.getLogger(HMSExternalTable.class);
 
-    private final HMSExternalCatalog catalog;
-    private final String dbName;
-    private final List<String> supportedHiveFileFormats = Lists.newArrayList(
+    private List<String> supportedHiveFileFormats = Lists.newArrayList(
             "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
             "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
             "org.apache.hadoop.mapred.TextInputFormat");
@@ -65,10 +65,7 @@ public class HMSExternalTable extends ExternalTable {
      * @param catalog HMSExternalCatalog.
      */
     public HMSExternalTable(long id, String name, String dbName, 
HMSExternalCatalog catalog) {
-        super(id, name);
-        this.dbName = dbName;
-        this.catalog = catalog;
-        this.type = TableType.HMS_EXTERNAL_TABLE;
+        super(id, name, catalog, dbName, TableType.HMS_EXTERNAL_TABLE);
     }
 
     public boolean isSupportedHmsTable() {
@@ -76,34 +73,45 @@ public class HMSExternalTable extends ExternalTable {
         return dlaType != DLAType.UNKNOWN;
     }
 
-    private synchronized void makeSureInitialized() {
-        if (!initialized) {
-            init();
-            initialized = true;
-        }
-    }
-
-    private void init() {
-        try {
-            getRemoteTable();
-        } catch (MetaNotFoundException e) {
-            // CHECKSTYLE IGNORE THIS LINE
-        }
-        if (remoteTable == null) {
-            dlaType = DLAType.UNKNOWN;
-            fullSchema = Lists.newArrayList();
-        } else {
-            if (supportedIcebergTable()) {
-                dlaType = DLAType.ICEBERG;
-            } else if (supportedHoodieTable()) {
-                dlaType = DLAType.HUDI;
-            } else if (supportedHiveTable()) {
-                dlaType = DLAType.HIVE;
-            } else {
+    public synchronized void makeSureInitialized() {
+        if (!objectCreated) {
+            try {
+                getRemoteTable();
+            } catch (MetaNotFoundException e) {
+                // CHECKSTYLE IGNORE THIS LINE
+            }
+            supportedHiveFileFormats = Lists.newArrayList(
+                
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+                "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+                "org.apache.hadoop.mapred.TextInputFormat");
+            if (remoteTable == null) {
                 dlaType = DLAType.UNKNOWN;
-                fullSchema = Lists.newArrayList();
+            } else {
+                if (supportedIcebergTable()) {
+                    dlaType = DLAType.ICEBERG;
+                } else if (supportedHoodieTable()) {
+                    dlaType = DLAType.HUDI;
+                } else if (supportedHiveTable()) {
+                    dlaType = DLAType.HIVE;
+                } else {
+                    dlaType = DLAType.UNKNOWN;
+                }
             }
-            initSchema();
+            objectCreated = true;
+        }
+        if (!initialized) {
+            if (!Env.getCurrentEnv().isMaster()) {
+                fullSchema = null;
+                // Forward to master and wait the journal to replay.
+                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
+                try {
+                    remoteExecutor.forward(catalog.getId(), 
catalog.getDbNullable(dbName).getId(), id);
+                } catch (Exception e) {
+                    LOG.warn("Failed to forward init table {} operation to 
master. {}", name, e.getMessage());
+                }
+                return;
+            }
+            init();
         }
     }
 
@@ -150,24 +158,45 @@ public class HMSExternalTable extends ExternalTable {
         return isManagedTable && supportedFileFormat;
     }
 
-    private void initSchema() {
-        if (fullSchema == null) {
-            synchronized (this) {
-                if (fullSchema == null) {
-                    fullSchema = Lists.newArrayList();
-                    try {
-                        for (FieldSchema field : 
HiveMetaStoreClientHelper.getSchema(dbName, name,
-                                catalog.getHiveMetastoreUris())) {
-                            fullSchema.add(new Column(field.getName(),
-                                    
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, 
true,
-                                    null, field.getComment()));
-                        }
-                    } catch (DdlException e) {
-                        LOG.warn("Fail to get schema of hms table {}", name, 
e);
+    private void init() {
+        boolean schemaChanged = false;
+        List<Column> tmpSchema = Lists.newArrayList();
+        if (dlaType.equals(DLAType.UNKNOWN)) {
+            schemaChanged = true;
+        } else {
+            try {
+                for (FieldSchema field : 
HiveMetaStoreClientHelper.getSchema(dbName, name,
+                        ((HMSExternalCatalog) 
catalog).getHiveMetastoreUris())) {
+                    int columnId = (int) Env.getCurrentEnv().getNextId();
+                    tmpSchema.add(new Column(field.getName(),
+                            
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
+                            true, null, field.getComment(), true, null, 
columnId));
+                }
+            } catch (DdlException e) {
+                LOG.warn("Fail to get schema of hms table {}", name, e);
+            }
+            if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
+                schemaChanged = true;
+            } else {
+                for (int i = 0; i < fullSchema.size(); i++) {
+                    if (!fullSchema.get(i).equals(tmpSchema.get(i))) {
+                        schemaChanged = true;
+                        break;
                     }
                 }
             }
         }
+        if (schemaChanged) {
+            timestamp = System.currentTimeMillis();
+            fullSchema = tmpSchema;
+        }
+        initialized = true;
+        InitTableLog initTableLog = new InitTableLog();
+        initTableLog.setCatalogId(catalog.getId());
+        initTableLog.setDbId(catalog.getDbNameToId().get(dbName));
+        initTableLog.setTableId(id);
+        initTableLog.setSchema(fullSchema);
+        Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog);
     }
 
     /**
@@ -177,11 +206,11 @@ public class HMSExternalTable extends ExternalTable {
         if (remoteTable == null) {
             synchronized (this) {
                 if (remoteTable == null) {
+                    String uri = ((HMSExternalCatalog) 
catalog).getHiveMetastoreUris();
                     try {
-                        remoteTable = 
HiveMetaStoreClientHelper.getTable(dbName, name, 
catalog.getHiveMetastoreUris());
+                        remoteTable = 
HiveMetaStoreClientHelper.getTable(dbName, name, uri);
                     } catch (DdlException e) {
-                        LOG.warn("Fail to get remote hive table. db {}, table 
{}, uri {}", dbName, name,
-                                catalog.getHiveMetastoreUris());
+                        LOG.warn("Fail to get remote hive table. db {}, table 
{}, uri {}", dbName, name, uri);
                         throw new MetaNotFoundException(e);
                     }
                 }
@@ -300,7 +329,7 @@ public class HMSExternalTable extends ExternalTable {
     }
 
     public String getMetastoreUri() {
-        return catalog.getHiveMetastoreUris();
+        return ((HMSExternalCatalog) catalog).getHiveMetastoreUris();
     }
 
     public Map<String, String> getDfsProperties() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 5fd6079d5c..b92a515751 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -25,6 +25,8 @@ import org.apache.doris.analysis.RefreshCatalogStmt;
 import org.apache.doris.analysis.ShowCatalogStmt;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
@@ -39,6 +41,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSet;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
@@ -406,6 +409,53 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         }
     }
 
+    public void replayInitCatalog(InitCatalogLog log) {
+        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
+        Preconditions.checkArgument(catalog != null);
+        catalog.replayInitCatalog(log);
+    }
+
+    public void replayInitExternalDb(InitDatabaseLog log) {
+        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
+        Preconditions.checkArgument(catalog != null);
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        Preconditions.checkArgument(db != null);
+        db.replayInitDb(log, catalog);
+    }
+
+    public void replayInitExternalTable(InitTableLog log) {
+        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
+        Preconditions.checkArgument(catalog != null);
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        Preconditions.checkArgument(db != null);
+        ExternalTable table = db.getTableForReplay(log.getTableId());
+        Preconditions.checkArgument(table != null);
+        table.replayInitTable(log.getSchema());
+    }
+
+    public void replayRefreshExternalDb(ExternalObjectLog log) {
+        writeLock();
+        try {
+            ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
+            ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+            db.setUnInitialized();
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void replayRefreshExternalTable(ExternalObjectLog log) {
+        writeLock();
+        try {
+            ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
+            ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+            ExternalTable table = db.getTableForReplay(log.getTableId());
+            table.setUnInitialized();
+        } finally {
+            writeUnlock();
+        }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
index baa2c6e169..180d5730bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
@@ -25,6 +25,7 @@ import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.external.elasticsearch.EsRestClient;
 import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -54,10 +55,6 @@ public class EsExternalCatalog extends ExternalCatalog {
     private static final String PROP_NODES_DISCOVERY = 
"elasticsearch.nodes_discovery";
     private static final String PROP_SSL = "elasticsearch.ssl";
 
-    // Cache of db name to db id.
-    private Map<String, Long> dbNameToId;
-    private Map<Long, EsExternalDatabase> idToDb;
-
     private EsRestClient esRestClient;
 
     private String[] nodes;
@@ -138,29 +135,56 @@ public class EsExternalCatalog extends ExternalCatalog {
      * Datasource can't be init when creating because the external datasource 
may depend on third system.
      * So you have to make sure the client of third system is initialized 
before any method was called.
      */
-    private synchronized void makeSureInitialized() {
+    @Override
+    public synchronized void makeSureInitialized() {
+        if (!objectCreated) {
+            try {
+                validate(catalogProperty.getProperties());
+            } catch (DdlException e) {
+                LOG.warn("validate error", e);
+            }
+            esRestClient = new EsRestClient(this.nodes, this.username, 
this.password, this.enableSsl);
+            objectCreated = true;
+        }
         if (!initialized) {
+            if (!Env.getCurrentEnv().isMaster()) {
+                // Forward to master and wait the journal to replay.
+                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
+                try {
+                    remoteExecutor.forward(id, -1, -1);
+                } catch (Exception e) {
+                    LOG.warn("Failed to forward init catalog {} operation to 
master. {}", name, e.getMessage());
+                }
+                return;
+            }
             init();
-            initialized = true;
         }
     }
 
     private void init() {
+        InitCatalogLog initCatalogLog = new InitCatalogLog();
         try {
             validate(this.catalogProperty.getProperties());
         } catch (DdlException e) {
             LOG.warn("validate error", e);
         }
         this.esRestClient = new EsRestClient(this.nodes, this.username, 
this.password, this.enableSsl);
+        initCatalogLog.setCatalogId(id);
+        initCatalogLog.setType(InitCatalogLog.Type.ES);
         if (dbNameToId != null && dbNameToId.containsKey(DEFAULT_DB)) {
             idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized();
+            initCatalogLog.addRefreshDb(dbNameToId.get(DEFAULT_DB));
         } else {
             dbNameToId = Maps.newConcurrentMap();
             idToDb = Maps.newConcurrentMap();
             long defaultDbId = Env.getCurrentEnv().getNextId();
             dbNameToId.put(DEFAULT_DB, defaultDbId);
-            idToDb.put(defaultDbId, new EsExternalDatabase(this, defaultDbId, 
DEFAULT_DB));
+            EsExternalDatabase db = new EsExternalDatabase(this, defaultDbId, 
DEFAULT_DB);
+            idToDb.put(defaultDbId, db);
+            initCatalogLog.addCreateDb(defaultDbId, DEFAULT_DB);
         }
+        initialized = true;
+        Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
     }
 
     @Override
@@ -185,6 +209,13 @@ public class EsExternalCatalog extends ExternalCatalog {
         return idToDb.get(dbNameToId.get(realDbName));
     }
 
+    @Nullable
+    @Override
+    public ExternalDatabase getDbNullable(long dbId) {
+        makeSureInitialized();
+        return idToDb.get(dbId);
+    }
+
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
         return esRestClient.existIndex(this.esRestClient.getClient(), tblName);
@@ -194,4 +225,8 @@ public class EsExternalCatalog extends ExternalCatalog {
     public List<Long> getDbIds() {
         return Lists.newArrayList(dbNameToId.values());
     }
+
+    public ExternalDatabase getDbForReplay(long dbId) {
+        return idToDb.get(dbId);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index a89e53825e..e6e9609ab3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -17,15 +17,21 @@
 
 package org.apache.doris.datasource;
 
+import org.apache.doris.catalog.external.EsExternalDatabase;
 import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.HMSExternalDatabase;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import lombok.Data;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.DataInput;
@@ -38,7 +44,9 @@ import java.util.Map;
  * The abstract class for all types of external catalogs.
  */
 @Data
-public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, 
Writable {
+public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, 
Writable, GsonPostProcessable {
+    private static final Logger LOG = 
LogManager.getLogger(ExternalCatalog.class);
+
     // Unique id of this catalog, will be assigned after catalog is loaded.
     @SerializedName(value = "id")
     protected long id;
@@ -49,8 +57,16 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
     // save properties of this catalog, such as hive meta store url.
     @SerializedName(value = "catalogProperty")
     protected CatalogProperty catalogProperty = new CatalogProperty();
+    @SerializedName(value = "initialized")
     protected boolean initialized = false;
 
+    // Cache of db name to db id
+    @SerializedName(value = "idToDb")
+    protected Map<Long, ExternalDatabase> idToDb = Maps.newConcurrentMap();
+    // db name does not contains "default_cluster"
+    protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
+    protected boolean objectCreated = false;
+
     /**
      * @return names of database in this catalog.
      */
@@ -71,6 +87,16 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
      */
     public abstract boolean tableExist(SessionContext ctx, String dbName, 
String tblName);
 
+    public abstract void makeSureInitialized();
+
+    public void setInitialized(boolean initialized) {
+        this.initialized = initialized;
+    }
+
+    public ExternalDatabase getDbForReplay(long dbId) {
+        throw new NotImplementedException();
+    }
+
     @Override
     public long getId() {
         return id;
@@ -123,6 +149,40 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
         Text.writeString(out, GsonUtils.GSON.toJson(this));
     }
 
+    public void replayInitCatalog(InitCatalogLog log) {
+        Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
+        Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
+        for (int i = 0; i < log.getRefreshCount(); i++) {
+            ExternalDatabase db = getDbForReplay(log.getRefreshDbIds().get(i));
+            db.setUnInitialized();
+            tmpDbNameToId.put(db.getFullName(), db.getId());
+            tmpIdToDb.put(db.getId(), db);
+        }
+        switch (log.getType()) {
+            case HMS:
+                for (int i = 0; i < log.getCreateCount(); i++) {
+                    HMSExternalDatabase db = new HMSExternalDatabase(
+                            this, log.getCreateDbIds().get(i), 
log.getCreateDbNames().get(i));
+                    tmpDbNameToId.put(db.getFullName(), db.getId());
+                    tmpIdToDb.put(db.getId(), db);
+                }
+                break;
+            case ES:
+                for (int i = 0; i < log.getCreateCount(); i++) {
+                    EsExternalDatabase db = new EsExternalDatabase(
+                            this, log.getCreateDbIds().get(i), 
log.getCreateDbNames().get(i));
+                    tmpDbNameToId.put(db.getFullName(), db.getId());
+                    tmpIdToDb.put(db.getId(), db);
+                }
+                break;
+            default:
+                break;
+        }
+        dbNameToId = tmpDbNameToId;
+        idToDb = tmpIdToDb;
+        initialized = true;
+    }
+
     /**
      * External catalog has no cluster semantics.
      */
@@ -134,4 +194,20 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, ExternalCatalog.class);
     }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        dbNameToId = Maps.newConcurrentMap();
+        for (ExternalDatabase db : idToDb.values()) {
+            
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), 
db.getId());
+            db.setExtCatalog(this);
+            db.setTableExtCatalog(this);
+        }
+        objectCreated = false;
+    }
+
+    public void addDatabaseForTest(ExternalDatabase db) {
+        idToDb.put(db.getId(), db);
+        dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), 
db.getId());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
new file mode 100644
index 0000000000..cff446657e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+@NoArgsConstructor
+@Getter
+@Data
+public class ExternalObjectLog implements Writable {
+    @SerializedName(value = "catalogId")
+    private long catalogId;
+
+    @SerializedName(value = "dbId")
+    private long dbId;
+
+    @SerializedName(value = "tableId")
+    private long tableId;
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static ExternalObjectLog read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, ExternalObjectLog.class);
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 32847844dc..6154ddd4af 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
 import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -32,6 +33,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -41,9 +43,6 @@ import java.util.Map;
 public class HMSExternalCatalog extends ExternalCatalog {
     private static final Logger LOG = 
LogManager.getLogger(HMSExternalCatalog.class);
 
-    // Cache of db name to db id.
-    private Map<String, Long> dbNameToId = Maps.newConcurrentMap();
-    private Map<Long, HMSExternalDatabase> idToDb = Maps.newConcurrentMap();
     protected HiveMetaStoreClient client;
 
     /**
@@ -63,15 +62,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     private void init() {
         Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
-        Map<Long, HMSExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
-        HiveConf hiveConf = new HiveConf();
-        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
getHiveMetastoreUris());
-        try {
-            client = new HiveMetaStoreClient(hiveConf);
-        } catch (MetaException e) {
-            LOG.warn("Failed to create HiveMetaStoreClient: {}", 
e.getMessage());
-            return;
-        }
+        Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
+        InitCatalogLog initCatalogLog = new InitCatalogLog();
+        initCatalogLog.setCatalogId(id);
+        initCatalogLog.setType(InitCatalogLog.Type.HMS);
         List<String> allDatabases;
         try {
             allDatabases = client.getAllDatabases();
@@ -88,27 +82,53 @@ public class HMSExternalCatalog extends ExternalCatalog {
             if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
                 dbId = dbNameToId.get(dbName);
                 tmpDbNameToId.put(dbName, dbId);
-                HMSExternalDatabase db = idToDb.get(dbId);
+                ExternalDatabase db = idToDb.get(dbId);
                 db.setUnInitialized();
                 tmpIdToDb.put(dbId, db);
+                initCatalogLog.addRefreshDb(dbId);
             } else {
                 dbId = Env.getCurrentEnv().getNextId();
                 tmpDbNameToId.put(dbName, dbId);
-                tmpIdToDb.put(dbId, new HMSExternalDatabase(this, dbId, 
dbName));
+                HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, 
dbName);
+                tmpIdToDb.put(dbId, db);
+                initCatalogLog.addCreateDb(dbId, dbName);
             }
         }
         dbNameToId = tmpDbNameToId;
         idToDb = tmpIdToDb;
+        initialized = true;
+        Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
     }
 
     /**
      * Catalog can't be init when creating because the external catalog may 
depend on third system.
      * So you have to make sure the client of third system is initialized 
before any method was called.
      */
-    private synchronized void makeSureInitialized() {
+    @Override
+    public synchronized void makeSureInitialized() {
+        if (!objectCreated) {
+            try {
+                HiveConf hiveConf = new HiveConf();
+                hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
getHiveMetastoreUris());
+                client = new HiveMetaStoreClient(hiveConf);
+                objectCreated = true;
+            } catch (MetaException e) {
+                LOG.warn("Failed to create HiveMetaStoreClient: {}", 
e.getMessage());
+                return;
+            }
+        }
         if (!initialized) {
+            if (!Env.getCurrentEnv().isMaster()) {
+                // Forward to master and wait the journal to replay.
+                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
+                try {
+                    remoteExecutor.forward(id, -1, -1);
+                } catch (Exception e) {
+                    LOG.warn("Failed to forward init catalog {} operation to 
master. {}", name, e.getMessage());
+                }
+                return;
+            }
             init();
-            initialized = true;
         }
     }
 
@@ -120,10 +140,18 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
-        try {
-            return client.getAllTables(getRealTableName(dbName));
-        } catch (MetaException e) {
-            LOG.warn("List Table Names failed. {}", e.getMessage());
+        makeSureInitialized();
+        HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) 
idToDb.get(dbNameToId.get(dbName));
+        if (hmsExternalDatabase != null && 
hmsExternalDatabase.isInitialized()) {
+            ArrayList<String> names = Lists.newArrayList();
+            hmsExternalDatabase.getTables().stream().forEach(table -> 
names.add(table.getName()));
+            return names;
+        } else {
+            try {
+                return client.getAllTables(getRealTableName(dbName));
+            } catch (MetaException e) {
+                LOG.warn("List Table Names failed. {}", e.getMessage());
+            }
         }
         return Lists.newArrayList();
     }
@@ -161,4 +189,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
         makeSureInitialized();
         return Lists.newArrayList(dbNameToId.values());
     }
+
+    public ExternalDatabase getDbForReplay(long dbId) {
+        return idToDb.get(dbId);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
new file mode 100644
index 0000000000..48deec84e6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+@Data
+public class InitCatalogLog implements Writable {
+    enum Type {
+        HMS,
+        ES,
+        UNKNOWN;
+    }
+
+    @SerializedName(value = "catalogId")
+    private long catalogId;
+
+    @SerializedName(value = "refreshCount")
+    private int refreshCount;
+
+    @SerializedName(value = "createCount")
+    private int createCount;
+
+    @SerializedName(value = "refreshDbIds")
+    private List<Long> refreshDbIds;
+
+    @SerializedName(value = "createDbIds")
+    private List<Long> createDbIds;
+
+    @SerializedName(value = "createDbNames")
+    private List<String> createDbNames;
+
+    @SerializedName(value = "type")
+    private Type type;
+
+    public InitCatalogLog() {
+        refreshCount = 0;
+        createCount = 0;
+        catalogId = 0;
+        refreshDbIds = Lists.newArrayList();
+        createDbIds = Lists.newArrayList();
+        createDbNames = Lists.newArrayList();
+        type = Type.UNKNOWN;
+    }
+
+    public void addRefreshDb(long id) {
+        refreshCount += 1;
+        refreshDbIds.add(id);
+    }
+
+    public void addCreateDb(long id, String name) {
+        createCount += 1;
+        createDbIds.add(id);
+        createDbNames.add(name);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static InitCatalogLog read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, InitCatalogLog.class);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
new file mode 100644
index 0000000000..eade3d12b8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+@Data
+public class InitDatabaseLog implements Writable {
+    public enum Type {
+        HMS,
+        ES,
+        UNKNOWN;
+    }
+
+    @SerializedName(value = "catalogId")
+    private long catalogId;
+
+    @SerializedName(value = "dbId")
+    private long dbId;
+
+    @SerializedName(value = "refreshCount")
+    private int refreshCount;
+
+    @SerializedName(value = "createCount")
+    private int createCount;
+
+    @SerializedName(value = "refreshTableIds")
+    private List<Long> refreshTableIds;
+
+    @SerializedName(value = "createTableIds")
+    private List<Long> createTableIds;
+
+    @SerializedName(value = "createTableNames")
+    private List<String> createTableNames;
+
+    @SerializedName(value = "type")
+    private Type type;
+
+    public InitDatabaseLog() {
+        refreshCount = 0;
+        createCount = 0;
+        catalogId = 0;
+        dbId = 0;
+        refreshTableIds = Lists.newArrayList();
+        createTableIds = Lists.newArrayList();
+        createTableNames = Lists.newArrayList();
+        type = Type.UNKNOWN;
+    }
+
+    public void addRefreshTable(long id) {
+        refreshCount += 1;
+        refreshTableIds.add(id);
+    }
+
+    public void addCreateTable(long id, String name) {
+        createCount += 1;
+        createTableIds.add(id);
+        createTableNames.add(name);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static InitDatabaseLog read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, InitDatabaseLog.class);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java
new file mode 100644
index 0000000000..2f462b551c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+@Data
+public class InitTableLog implements Writable {
+    enum Type {
+        HMS,
+        ES,
+        UNKNOWN;
+    }
+
+    @SerializedName(value = "catalogId")
+    private long catalogId;
+
+    @SerializedName(value = "dbId")
+    private long dbId;
+
+    @SerializedName(value = "tableId")
+    private long tableId;
+
+    @SerializedName(value = "type")
+    private Type type;
+
+    @SerializedName(value = "schema")
+    protected volatile List<Column> schema;
+
+    public InitTableLog() {}
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static InitTableLog read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, InitTableLog.class);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index 5e9b25ffc5..cbf827b429 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -37,6 +37,7 @@ import org.apache.doris.analysis.RangePartitionDesc;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
@@ -370,6 +371,7 @@ public class EsUtil {
             column.setName(key);
             column.setIsKey(true);
             column.setIsAllowNull(true);
+            column.setUniqueId((int) Env.getCurrentEnv().getNextId());
             if (arrayFields.contains(key)) {
                 column.setType(ArrayType.create(type, true));
             } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 11e8ca03dd..dc48ed62a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -37,6 +37,10 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.datasource.CatalogLog;
+import org.apache.doris.datasource.ExternalObjectLog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.InitTableLog;
 import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.journal.bdbje.Timestamp;
 import org.apache.doris.load.DeleteInfo;
@@ -685,6 +689,27 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_INIT_CATALOG: {
+                data = InitCatalogLog.read(in);
+                isRead = true;
+                break;
+            }
+            case OperationType.OP_INIT_EXTERNAL_DB: {
+                data = InitDatabaseLog.read(in);
+                isRead = true;
+                break;
+            }
+            case OperationType.OP_INIT_EXTERNAL_TABLE: {
+                data = InitTableLog.read(in);
+                isRead = true;
+                break;
+            }
+            case OperationType.OP_REFRESH_EXTERNAL_DB:
+            case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
+                data = ExternalObjectLog.read(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS: {
                 data = TableAddOrDropColumnsInfo.read(in);
                 isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 09ec9c24ff..39bd5e7dfd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -42,6 +42,10 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.datasource.CatalogLog;
+import org.apache.doris.datasource.ExternalObjectLog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.InitTableLog;
 import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.journal.Journal;
 import org.apache.doris.journal.JournalCursor;
@@ -926,6 +930,31 @@ public class EditLog {
                     env.getAuth().replayAlterUser(log);
                     break;
                 }
+                case OperationType.OP_INIT_CATALOG: {
+                    final InitCatalogLog log = (InitCatalogLog) 
journal.getData();
+                    env.getCatalogMgr().replayInitCatalog(log);
+                    break;
+                }
+                case OperationType.OP_REFRESH_EXTERNAL_DB: {
+                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
+                    env.getCatalogMgr().replayRefreshExternalDb(log);
+                    break;
+                }
+                case OperationType.OP_INIT_EXTERNAL_DB: {
+                    final InitDatabaseLog log = (InitDatabaseLog) 
journal.getData();
+                    env.getCatalogMgr().replayInitExternalDb(log);
+                    break;
+                }
+                case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
+                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
+                    env.getCatalogMgr().replayRefreshExternalTable(log);
+                    break;
+                }
+                case OperationType.OP_INIT_EXTERNAL_TABLE: {
+                    final InitTableLog log = (InitTableLog) journal.getData();
+                    env.getCatalogMgr().replayInitExternalTable(log);
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1581,6 +1610,26 @@ public class EditLog {
         logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds));
     }
 
+    public void logInitCatalog(InitCatalogLog log) {
+        logEdit(OperationType.OP_INIT_CATALOG, log);
+    }
+
+    public void logRefreshExternalDb(ExternalObjectLog log) {
+        logEdit(OperationType.OP_REFRESH_EXTERNAL_DB, log);
+    }
+
+    public void logInitExternalDb(InitDatabaseLog log) {
+        logEdit(OperationType.OP_INIT_EXTERNAL_DB, log);
+    }
+
+    public void logRefreshExternalTable(ExternalObjectLog log) {
+        logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
+    }
+
+    public void logInitExternalTable(InitTableLog log) {
+        logEdit(OperationType.OP_INIT_EXTERNAL_TABLE, log);
+    }
+
     public Journal getJournal() {
         return this.journal;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 3ff1473071..82304acb23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -239,6 +239,11 @@ public class OperationType {
     public static final short OP_ALTER_CATALOG_NAME = 322;
     public static final short OP_ALTER_CATALOG_PROPS = 323;
     public static final short OP_REFRESH_CATALOG = 324;
+    public static final short OP_INIT_CATALOG = 325;
+    public static final short OP_REFRESH_EXTERNAL_DB = 326;
+    public static final short OP_INIT_EXTERNAL_DB = 327;
+    public static final short OP_REFRESH_EXTERNAL_TABLE = 328;
+    public static final short OP_INIT_EXTERNAL_TABLE = 329;
 
     // scheduler job and task 330-350
     public static final short OP_CREATE_MTMV_JOB = 330;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 3bee6d59ff..30fe5fdcc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -21,6 +21,7 @@ import org.apache.doris.alter.AlterJobV2;
 import org.apache.doris.alter.RollupJobV2;
 import org.apache.doris.alter.SchemaChangeJobV2;
 import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.JdbcResource;
@@ -32,6 +33,13 @@ import org.apache.doris.catalog.S3Resource;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.SparkResource;
 import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.EsExternalTable;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.EsExternalCatalog;
 import org.apache.doris.datasource.HMSExternalCatalog;
@@ -140,7 +148,6 @@ public class GsonUtils {
             = RuntimeTypeAdapterFactory.of(LoadJobStateUpdateInfo.class, 
"clazz")
             .registerSubtype(SparkLoadJobStateUpdateInfo.class, 
SparkLoadJobStateUpdateInfo.class.getSimpleName());
 
-
     // runtime adapter for class "Policy"
     private static RuntimeTypeAdapterFactory<Policy> policyTypeAdapterFactory 
= RuntimeTypeAdapterFactory.of(
                     Policy.class, "clazz").registerSubtype(RowPolicy.class, 
RowPolicy.class.getSimpleName())
@@ -152,6 +159,18 @@ public class GsonUtils {
             .registerSubtype(HMSExternalCatalog.class, 
HMSExternalCatalog.class.getSimpleName())
             .registerSubtype(EsExternalCatalog.class, 
EsExternalCatalog.class.getSimpleName());
 
+    private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory 
= RuntimeTypeAdapterFactory.of(
+                    DatabaseIf.class, "clazz")
+            .registerSubtype(ExternalDatabase.class, 
ExternalDatabase.class.getSimpleName())
+            .registerSubtype(EsExternalDatabase.class, 
EsExternalDatabase.class.getSimpleName())
+            .registerSubtype(HMSExternalDatabase.class, 
HMSExternalDatabase.class.getSimpleName());
+
+    private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = 
RuntimeTypeAdapterFactory.of(
+                    TableIf.class, "clazz")
+            .registerSubtype(ExternalTable.class, 
ExternalTable.class.getSimpleName())
+            .registerSubtype(EsExternalTable.class, 
EsExternalTable.class.getSimpleName())
+            .registerSubtype(HMSExternalTable.class, 
HMSExternalTable.class.getSimpleName());
+
     // the builder of GSON instance.
     // Add any other adapters if necessary.
     private static final GsonBuilder GSON_BUILDER = new 
GsonBuilder().addSerializationExclusionStrategy(
@@ -165,7 +184,10 @@ public class GsonUtils {
             .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
             .registerTypeAdapterFactory(syncJobTypeAdapterFactory)
             
.registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory)
-            
.registerTypeAdapterFactory(policyTypeAdapterFactory).registerTypeAdapterFactory(dsTypeAdapterFactory)
+            .registerTypeAdapterFactory(policyTypeAdapterFactory)
+            .registerTypeAdapterFactory(dsTypeAdapterFactory)
+            .registerTypeAdapterFactory(dbTypeAdapterFactory)
+            .registerTypeAdapterFactory(tblTypeAdapterFactory)
             .registerTypeAdapter(ImmutableMap.class, new 
ImmutableMapDeserializer())
             .registerTypeAdapter(AtomicBoolean.class, new 
AtomicBooleanAdapter());
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java
new file mode 100644
index 0000000000..c3a08d6d50
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
+import org.apache.doris.thrift.TInitExternalCtlMetaResult;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * The client for Observer FE to forward external datasource object init 
request to master.
+ * Including init ExternalCatalog, ExternalDatabase and ExternalTable.
+ * This client will wait for the journal ID replayed at this Observer FE 
before return.
+ */
+public class MasterCatalogExecutor {
+
+    private static final Logger LOG = 
LogManager.getLogger(MasterCatalogExecutor.class);
+
+    private final ConnectContext ctx;
+    private int waitTimeoutMs;
+
+    public MasterCatalogExecutor() {
+        ctx = ConnectContext.get();
+        waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
+    }
+
+    public void forward(long catalogId, long dbId, long tableId) throws 
Exception {
+        if (!ctx.getEnv().isReady()) {
+            throw new Exception("Current catalog is not ready, please wait for 
a while.");
+        }
+        String masterHost = ctx.getEnv().getMasterIp();
+        int masterRpcPort = ctx.getEnv().getMasterRpcPort();
+        TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, 
masterRpcPort);
+
+        FrontendService.Client client = null;
+        try {
+            client = ClientPool.frontendPool.borrowObject(thriftAddress, 
waitTimeoutMs);
+        } catch (Exception e) {
+            throw new Exception("Failed to get master client.", e);
+        }
+        TInitExternalCtlMetaRequest request = new 
TInitExternalCtlMetaRequest();
+        request.setCatalogId(catalogId);
+        if (dbId != -1) {
+            request.setDbId(dbId);
+        }
+        if (tableId != -1) {
+            request.setTableId(tableId);
+        }
+        boolean isReturnToPool = false;
+        try {
+            TInitExternalCtlMetaResult result = 
client.initExternalCtlMeta(request);
+            
ConnectContext.get().getEnv().getJournalObservable().waitOn(result.maxJournalId,
 waitTimeoutMs);
+            isReturnToPool = true;
+        } catch (Exception e) {
+            LOG.warn("Failed to finish forward init operation, please try 
again. ", e);
+            throw e;
+        } finally {
+            if (isReturnToPool) {
+                ClientPool.frontendPool.returnObject(thriftAddress, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(thriftAddress, 
client);
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8f6641ccb2..53ee7dd637 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.S3Resource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
@@ -43,6 +44,7 @@ import org.apache.doris.common.ThriftServerEventProcessor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.master.MasterImpl;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -77,6 +79,8 @@ import org.apache.doris.thrift.TGetStoragePolicy;
 import org.apache.doris.thrift.TGetStoragePolicyResult;
 import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
+import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
+import org.apache.doris.thrift.TInitExternalCtlMetaResult;
 import org.apache.doris.thrift.TListPrivilegesResult;
 import org.apache.doris.thrift.TListTableStatusResult;
 import org.apache.doris.thrift.TLoadTxn2PCRequest;
@@ -1061,4 +1065,76 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         LOG.debug("refresh storage policy request: {}", result);
         return result;
     }
+
+    @Override
+    public TInitExternalCtlMetaResult 
initExternalCtlMeta(TInitExternalCtlMetaRequest request) throws TException {
+        if (request.isSetCatalogId() && request.isSetDbId() && 
request.isSetTableId()) {
+            return initTable(request.catalogId, request.dbId, request.tableId);
+        } else if (request.isSetCatalogId() && request.isSetDbId()) {
+            return initDb(request.catalogId, request.dbId);
+        } else if (request.isSetCatalogId()) {
+            return initCatalog(request.catalogId);
+        } else {
+            throw new TException("Catalog name is not set. Init failed.");
+        }
+    }
+
+    private TInitExternalCtlMetaResult initCatalog(long catalogId) throws 
TException {
+        CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new TException("Only support forward ExternalCatalog init 
operation.");
+        }
+        ((ExternalCatalog) catalog).makeSureInitialized();
+        TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
+        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        result.setStatus("OK");
+        return result;
+    }
+
+    private TInitExternalCtlMetaResult initDb(long catalogId, long dbId) 
throws TException {
+        CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new TException("Only support forward ExternalCatalog init 
operation.");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbId);
+        if (db == null) {
+            throw new TException("database " + dbId + " is null");
+        }
+        if (!(db instanceof ExternalDatabase)) {
+            throw new TException("Only support forward ExternalDatabase init 
operation.");
+        }
+        ((ExternalDatabase) db).makeSureInitialized();
+        TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
+        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        result.setStatus("OK");
+        return result;
+    }
+
+    private TInitExternalCtlMetaResult initTable(long catalogId, long dbId, 
long tableId)
+            throws TException {
+        CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new TException("Only support forward ExternalCatalog init 
operation.");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbId);
+        if (db == null) {
+            throw new TException("database " + dbId + " is null");
+        }
+        if (!(db instanceof ExternalDatabase)) {
+            throw new TException("Only support forward ExternalDatabase init 
operation.");
+        }
+        TableIf table = db.getTableNullable(tableId);
+        if (table == null) {
+            throw new TException("table " + tableId + " is null");
+        }
+        if (!(table instanceof ExternalTable)) {
+            throw new TException("Only support forward ExternalTable init 
operation.");
+        }
+
+        ((ExternalTable) table).makeSureInitialized();
+        TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
+        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        result.setStatus("OK");
+        return result;
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index 871870178a..d1564261e6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -27,7 +27,13 @@ import org.apache.doris.analysis.GrantStmt;
 import org.apache.doris.analysis.ShowCatalogStmt;
 import org.apache.doris.analysis.SwitchStmt;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.EsExternalTable;
+import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -37,6 +43,7 @@ import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -95,6 +102,15 @@ public class CatalogMgrTest extends TestWithFeService {
                 rootCtx);
         env.getCatalogMgr().createCatalog(iceBergCatalog);
 
+        // create es catalog
+        CreateCatalogStmt esCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt(
+                "create catalog es properties('type' = 'es', 
'elasticsearch.hosts' = 'http://192.168.0.1');",
+                rootCtx);
+        env.getCatalogMgr().createCatalog(esCatalog);
+
+        createDbAndTableForCatalog(env.getCatalogMgr().getCatalog("hive"));
+        createDbAndTableForCatalog(env.getCatalogMgr().getCatalog("es"));
+
         // switch to hive.
         SwitchStmt switchHive = (SwitchStmt) parseAndAnalyzeStmt("switch 
hive;", rootCtx);
         env.changeCatalog(rootCtx, switchHive.getCatalogName());
@@ -109,6 +125,26 @@ public class CatalogMgrTest extends TestWithFeService {
         user2.analyze(SystemInfoService.DEFAULT_CLUSTER);
     }
 
+    private void createDbAndTableForCatalog(CatalogIf catalog) {
+        List<Column> schema = Lists.newArrayList();
+        schema.add(new Column("k1", PrimitiveType.INT));
+        if (catalog instanceof HMSExternalCatalog) {
+            HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
+            HMSExternalDatabase db = new HMSExternalDatabase(hmsCatalog, 
10000, "hive_db1");
+            HMSExternalTable tbl = new HMSExternalTable(10001, "hive_tbl1", 
"hive_db1", hmsCatalog);
+            tbl.setNewFullSchema(schema);
+            db.addTableForTest(tbl);
+            hmsCatalog.addDatabaseForTest(db);
+        } else if (catalog instanceof ExternalCatalog) {
+            EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;
+            EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, 
"es_db1");
+            EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", 
"es_tbl1", esCatalog);
+            tbl.setNewFullSchema(schema);
+            db.addTableForTest(tbl);
+            esCatalog.addDatabaseForTest(db);
+        }
+    }
+
     @Test
     public void testNormalCase() throws Exception {
         String createCatalogSql = "CREATE CATALOG hms_catalog "
@@ -119,7 +155,7 @@ public class CatalogMgrTest extends TestWithFeService {
         String showCatalogSql = "SHOW CATALOGS";
         ShowCatalogStmt showStmt = (ShowCatalogStmt) 
parseAndAnalyzeStmt(showCatalogSql);
         ShowResultSet showResultSet = mgr.showCatalogs(showStmt);
-        Assertions.assertEquals(4, showResultSet.getResultRows().size());
+        Assertions.assertEquals(5, showResultSet.getResultRows().size());
 
         String alterCatalogNameSql = "ALTER CATALOG hms_catalog RENAME " + 
MY_CATALOG + ";";
         AlterCatalogNameStmt alterNameStmt = (AlterCatalogNameStmt) 
parseAndAnalyzeStmt(alterCatalogNameSql);
@@ -151,7 +187,7 @@ public class CatalogMgrTest extends TestWithFeService {
         DropCatalogStmt dropCatalogStmt = (DropCatalogStmt) 
parseAndAnalyzeStmt(dropCatalogSql);
         mgr.dropCatalog(dropCatalogStmt);
         showResultSet = mgr.showCatalogs(showStmt);
-        Assertions.assertEquals(3, showResultSet.getResultRows().size());
+        Assertions.assertEquals(4, showResultSet.getResultRows().size());
     }
 
     private void testCatalogMgrPersist() throws Exception {
@@ -173,7 +209,7 @@ public class CatalogMgrTest extends TestWithFeService {
         DataInputStream dis = new DataInputStream(new FileInputStream(file));
         CatalogMgr mgr2 = CatalogMgr.read(dis);
 
-        Assert.assertEquals(4, mgr2.listCatalogs().size());
+        Assert.assertEquals(5, mgr2.listCatalogs().size());
         Assert.assertEquals(myCatalog.getId(), 
mgr2.getCatalog(MY_CATALOG).getId());
         Assert.assertEquals(0, mgr2.getInternalCatalog().getId());
         Assert.assertEquals(0, 
mgr2.getCatalog(InternalCatalog.INTERNAL_DS_ID).getId());
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index b3c5fe7e29..5d48c1ae7e 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -678,6 +678,17 @@ struct TWaitingTxnStatusResult {
     2: optional i32 txn_status_id
 }
 
+struct TInitExternalCtlMetaRequest {
+    1: optional i64 catalogId
+    2: optional i64 dbId
+    3: optional i64 tableId
+}
+
+struct TInitExternalCtlMetaResult {
+    1: optional i64 maxJournalId;
+    2: optional string status;
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -713,4 +724,5 @@ service FrontendService {
     TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)
 
     AgentService.TGetStoragePolicyResult refreshStoragePolicy()
+    TInitExternalCtlMetaResult initExternalCtlMeta(1: 
TInitExternalCtlMetaRequest request)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to