This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 81a9284305 [improvement][refactor](image) refactor the read and load method of meta image #10005 81a9284305 is described below commit 81a92843051f7a2a4102a1e1f95bca3b42513115 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Fri Jun 10 14:56:14 2022 +0800 [improvement][refactor](image) refactor the read and load method of meta image #10005 --- .../src/main/java/org/apache/doris/PaloFe.java | 2 +- .../java/org/apache/doris/catalog/Catalog.java | 17 +- .../java/org/apache/doris/common/FeConstants.java | 2 + .../doris/datasource/InternalDataSource.java | 4 + .../{common => persist/meta}/FeMetaFormat.java | 2 +- .../doris/{common => persist/meta}/MetaFooter.java | 13 +- .../doris/{common => persist/meta}/MetaHeader.java | 2 +- .../doris/{common => persist/meta}/MetaIndex.java | 2 +- .../{common => persist/meta}/MetaJsonHeader.java | 3 +- .../{common => persist/meta}/MetaMagicNumber.java | 4 +- .../doris/persist/meta/MetaPersistMethod.java | 203 +++++++++++++++++++++ .../doris/{common => persist/meta}/MetaReader.java | 54 +++--- .../doris/{common => persist/meta}/MetaWriter.java | 52 +++--- .../doris/persist/meta/PersistMetaModules.java | 56 ++++++ .../java/org/apache/doris/catalog/CatalogTest.java | 2 +- .../java/org/apache/doris/qe/VariableMgrTest.java | 1 + 16 files changed, 344 insertions(+), 75 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java index 32ebb5d691..0d1ec31434 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java +++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java @@ -22,7 +22,6 @@ import org.apache.doris.common.CommandLineOptions; import org.apache.doris.common.Config; import org.apache.doris.common.LdapConfig; import org.apache.doris.common.Log4jConfig; -import org.apache.doris.common.MetaReader; import org.apache.doris.common.ThreadPoolManager; import 
org.apache.doris.common.Version; import org.apache.doris.common.util.JdkUtils; @@ -31,6 +30,7 @@ import org.apache.doris.httpv2.HttpServer; import org.apache.doris.journal.bdbje.BDBDebugger; import org.apache.doris.journal.bdbje.BDBTool; import org.apache.doris.journal.bdbje.BDBToolOptions; +import org.apache.doris.persist.meta.MetaReader; import org.apache.doris.qe.QeService; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FeServer; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index a42add1079..d051e8afbf 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -98,10 +98,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; -import org.apache.doris.common.MetaHeader; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.MetaReader; -import org.apache.doris.common.MetaWriter; import org.apache.doris.common.Pair; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; @@ -187,6 +184,9 @@ import org.apache.doris.persist.StorageInfoV2; import org.apache.doris.persist.TableInfo; import org.apache.doris.persist.TablePropertyInfo; import org.apache.doris.persist.TruncateTableInfo; +import org.apache.doris.persist.meta.MetaHeader; +import org.apache.doris.persist.meta.MetaReader; +import org.apache.doris.persist.meta.MetaWriter; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.plugin.PluginMgr; import org.apache.doris.policy.PolicyMgr; @@ -237,7 +237,6 @@ import org.codehaus.jackson.map.ObjectMapper; import java.io.BufferedReader; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.IOException; 
import java.io.InputStreamReader; @@ -1631,6 +1630,10 @@ public class Catalog { return newChecksum; } + public long loadBackends(DataInputStream dis, long checksum) throws IOException { + return systemInfo.loadBackends(dis, checksum); + } + public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlException { return getInternalDataSource().loadDb(dis, checksum); } @@ -1940,6 +1943,10 @@ public class Catalog { return checksum; } + public long saveBackends(CountingDataOutputStream dos, long checksum) throws IOException { + return systemInfo.saveBackends(dos, checksum); + } + public long saveDb(CountingDataOutputStream dos, long checksum) throws IOException { return getInternalDataSource().saveDb(dos, checksum); } @@ -4668,7 +4675,7 @@ public class Catalog { pluginMgr.installPlugin(stmt); } - public long savePlugins(DataOutputStream dos, long checksum) throws IOException { + public long savePlugins(CountingDataOutputStream dos, long checksum) throws IOException { Catalog.getCurrentPluginMgr().write(dos); return checksum; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 27ffda6d87..4cb2bd877b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -17,6 +17,8 @@ package org.apache.doris.common; +import org.apache.doris.persist.meta.FeMetaFormat; + public class FeConstants { // Database and table's default configurations, we will never change them public static short default_replication_num = 3; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java index f421c59e9d..7068cd92d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java @@ -3208,6 +3208,10 @@ public class InternalDataSource implements DataSourceIf { } Catalog.getCurrentGlobalTransactionMgr().addDatabaseTransactionMgr(db.getId()); } + // ATTN: this should be done after load Db, and before loadAlterJob + recreateTabletInvertIndex(); + // rebuild es state + getEsRepository().loadTableFromCatalog(); LOG.info("finished replay databases from image"); return newChecksum; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaFormat.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/FeMetaFormat.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/common/FeMetaFormat.java rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/FeMetaFormat.java index 7c216e82c0..1c0eb8fc73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaFormat.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/FeMetaFormat.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common; +package org.apache.doris.persist.meta; public enum FeMetaFormat { COR1("COR1", "v1"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaFooter.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaFooter.java similarity index 91% rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaFooter.java rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaFooter.java index 1e9dcace46..bc35027208 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaFooter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaFooter.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.common; +package org.apache.doris.persist.meta; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; @@ -107,4 +107,15 @@ public class MetaFooter { this.length = length; } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("checksum: ").append(checksum); + sb.append("\nlength: ").append(length); + sb.append("\nindices:"); + for (MetaIndex metaIndex : metaIndices) { + sb.append("\n\t").append(metaIndex.toString()); + } + return sb.toString(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaHeader.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaHeader.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaHeader.java rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaHeader.java index f96bfc7f9e..0528f2dc2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaHeader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaHeader.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common; +package org.apache.doris.persist.meta; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaIndex.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaIndex.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaIndex.java rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaIndex.java index 399634ea28..2c5a8ace51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaIndex.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.common; +package org.apache.doris.persist.meta; import org.apache.doris.common.io.Text; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaJsonHeader.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaJsonHeader.java similarity index 96% rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaJsonHeader.java rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaJsonHeader.java index 527af9558f..c59b47ad73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaJsonHeader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaJsonHeader.java @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common; +package org.apache.doris.persist.meta; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.codehaus.jackson.map.ObjectMapper; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaMagicNumber.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaMagicNumber.java similarity index 95% rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaMagicNumber.java rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaMagicNumber.java index 204c5bb588..6e8af5912c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaMagicNumber.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaMagicNumber.java @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.common; +package org.apache.doris.persist.meta; + +import org.apache.doris.common.FeConstants; import java.io.IOException; import java.io.RandomAccessFile; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java new file mode 100644 index 0000000000..16e9de59e0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist.meta; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.io.CountingDataOutputStream; + +import java.io.DataInputStream; +import java.lang.reflect.Method; + +/** + * Defines a write and read method for the metadata module + * that needs to be persisted to the image. + */ +public class MetaPersistMethod { + public String name; + public Method readMethod; + public Method writeMethod; + + public MetaPersistMethod(String name) { + this.name = name; + } + + /** + * All meta modules should be added to this method. 
+ * Modules' names are defined in {@link PersistMetaModules} + * + * @param name + * @return + * @throws NoSuchMethodException + */ + public static MetaPersistMethod create(String name) throws NoSuchMethodException { + MetaPersistMethod metaPersistMethod = new MetaPersistMethod(name); + switch (name) { + case "masterInfo": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadMasterInfo", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveMasterInfo", CountingDataOutputStream.class, long.class); + break; + case "frontends": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadFrontends", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveFrontends", CountingDataOutputStream.class, long.class); + break; + case "backends": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadBackends", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveBackends", CountingDataOutputStream.class, long.class); + break; + case "db": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadDb", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveDb", CountingDataOutputStream.class, long.class); + break; + case "loadJob": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadLoadJob", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveLoadJob", CountingDataOutputStream.class, long.class); + break; + case "alterJob": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadAlterJob", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveAlterJob", CountingDataOutputStream.class, long.class); + break; + case "recycleBin": + metaPersistMethod.readMethod 
= + Catalog.class.getDeclaredMethod("loadRecycleBin", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveRecycleBin", CountingDataOutputStream.class, long.class); + break; + case "globalVariable": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadGlobalVariable", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveGlobalVariable", CountingDataOutputStream.class, + long.class); + break; + case "cluster": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadCluster", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveCluster", CountingDataOutputStream.class, long.class); + break; + case "broker": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadBrokers", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveBrokers", CountingDataOutputStream.class, long.class); + break; + case "resources": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadResources", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveResources", CountingDataOutputStream.class, long.class); + break; + case "exportJob": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadExportJob", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveExportJob", CountingDataOutputStream.class, long.class); + break; + case "syncJob": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadSyncJobs", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveSyncJobs", CountingDataOutputStream.class, long.class); + break; + case "backupHandler": + metaPersistMethod.readMethod = + 
Catalog.class.getDeclaredMethod("loadBackupHandler", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveBackupHandler", CountingDataOutputStream.class, long.class); + break; + case "paloAuth": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadPaloAuth", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("savePaloAuth", CountingDataOutputStream.class, long.class); + break; + case "transactionState": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadTransactionState", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveTransactionState", CountingDataOutputStream.class, + long.class); + break; + case "colocateTableIndex": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadColocateTableIndex", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveColocateTableIndex", CountingDataOutputStream.class, + long.class); + break; + case "routineLoadJobs": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadRoutineLoadJobs", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveRoutineLoadJobs", CountingDataOutputStream.class, + long.class); + break; + case "loadJobV2": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadLoadJobsV2", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveLoadJobsV2", CountingDataOutputStream.class, long.class); + break; + case "smallFiles": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadSmallFiles", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveSmallFiles", CountingDataOutputStream.class, long.class); + break; + case "plugins": + 
metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadPlugins", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("savePlugins", CountingDataOutputStream.class, long.class); + break; + case "deleteHandler": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadDeleteHandler", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveDeleteHandler", CountingDataOutputStream.class, long.class); + break; + case "sqlBlockRule": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadSqlBlockRule", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("saveSqlBlockRule", CountingDataOutputStream.class, long.class); + break; + case "policy": + metaPersistMethod.readMethod = + Catalog.class.getDeclaredMethod("loadPolicy", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Catalog.class.getDeclaredMethod("savePolicy", CountingDataOutputStream.class, long.class); + break; + default: + break; + } + return metaPersistMethod; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java similarity index 66% rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java index bbe474f0df..df2c29cd53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.common; +package org.apache.doris.persist.meta; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.DdlException; import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; @@ -29,6 +30,7 @@ import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; /** * Image Format: @@ -69,48 +71,36 @@ public class MetaReader { LOG.info("start load image from {}. is ckpt: {}", imageFile.getAbsolutePath(), Catalog.isCheckpointThread()); long loadImageStartTime = System.currentTimeMillis(); MetaHeader metaHeader = MetaHeader.read(imageFile); + MetaFooter metaFooter = MetaFooter.read(imageFile); long checksum = 0; try (DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(imageFile)))) { + // 1. Skip image file header IOUtils.skipFully(dis, metaHeader.getEnd()); + // 2. Read meta header first checksum = catalog.loadHeader(dis, metaHeader, checksum); - checksum = catalog.loadMasterInfo(dis, checksum); - checksum = catalog.loadFrontends(dis, checksum); - checksum = Catalog.getCurrentSystemInfo().loadBackends(dis, checksum); - checksum = catalog.loadDb(dis, checksum); - // ATTN: this should be done after load Db, and before loadAlterJob - catalog.getInternalDataSource().recreateTabletInvertIndex(); - // rebuild es state state - catalog.getEsRepository().loadTableFromCatalog(); - checksum = catalog.loadLoadJob(dis, checksum); - checksum = catalog.loadAlterJob(dis, checksum); - checksum = catalog.loadRecycleBin(dis, checksum); - checksum = catalog.loadGlobalVariable(dis, checksum); - checksum = catalog.loadCluster(dis, checksum); - checksum = catalog.loadBrokers(dis, checksum); - checksum = catalog.loadResources(dis, checksum); - checksum = catalog.loadExportJob(dis, checksum); - checksum = catalog.loadSyncJobs(dis, checksum); - checksum = catalog.loadBackupHandler(dis, checksum); 
- checksum = catalog.loadPaloAuth(dis, checksum); - // global transaction must be replayed before load jobs v2 - checksum = catalog.loadTransactionState(dis, checksum); - checksum = catalog.loadColocateTableIndex(dis, checksum); - checksum = catalog.loadRoutineLoadJobs(dis, checksum); - checksum = catalog.loadLoadJobsV2(dis, checksum); - checksum = catalog.loadSmallFiles(dis, checksum); - checksum = catalog.loadPlugins(dis, checksum); - checksum = catalog.loadDeleteHandler(dis, checksum); - checksum = catalog.loadSqlBlockRule(dis, checksum); - checksum = catalog.loadPolicy(dis, checksum); + // 3. Read other meta modules + // Modules must be read in the order in which the metadata was written + for (MetaIndex metaIndex : metaFooter.metaIndices) { + if (metaIndex.name.equals("header")) { + // skip meta header, which has been read before. + continue; + } + MetaPersistMethod persistMethod = PersistMetaModules.MODULES_MAP.get(metaIndex.name); + if (persistMethod == null) { + throw new IOException("Unknown meta module: " + metaIndex.name + ". Known moduels: " + + PersistMetaModules.MODULE_NAMES); + } + checksum = (long) persistMethod.readMethod.invoke(catalog, dis, checksum); + } + } catch (InvocationTargetException | IllegalAccessException e) { + throw new IOException(e); } - MetaFooter metaFooter = MetaFooter.read(imageFile); long remoteChecksum = metaFooter.checksum; Preconditions.checkState(remoteChecksum == checksum, remoteChecksum + " vs. 
" + checksum); long loadImageEndTime = System.currentTimeMillis(); LOG.info("finished to load image in " + (loadImageEndTime - loadImageStartTime) + " ms"); } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaWriter.java similarity index 60% rename from fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java rename to fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaWriter.java index 553a022cf9..781394f078 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaWriter.java @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common; +package org.apache.doris.persist.meta; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.Reference; import org.apache.doris.common.io.CountingDataOutputStream; import com.google.common.collect.Lists; @@ -28,6 +29,7 @@ import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.List; /** @@ -100,42 +102,32 @@ public class MetaWriter { long startPosition = MetaHeader.write(imageFile); List<MetaIndex> metaIndices = Lists.newArrayList(); FileOutputStream imageFileOut = new FileOutputStream(imageFile, true); - try (CountingDataOutputStream dos = new CountingDataOutputStream(new BufferedOutputStream( - imageFileOut), startPosition)) { + try (CountingDataOutputStream dos = new CountingDataOutputStream(new BufferedOutputStream(imageFileOut), + startPosition)) { writer.setDelegate(dos, metaIndices); long replayedJournalId = catalog.getReplayedJournalId(); - checksum.setRef(writer.doWork("header", () -> catalog.saveHeader(dos, replayedJournalId, checksum.getRef()))); - checksum.setRef(writer.doWork("masterInfo", () -> 
catalog.saveMasterInfo(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("frontends", () -> catalog.saveFrontends(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("backends", () -> Catalog.getCurrentSystemInfo().saveBackends(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("db", () -> catalog.saveDb(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("loadJob", () -> catalog.saveLoadJob(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("alterJob", () -> catalog.saveAlterJob(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("recycleBin", () -> catalog.saveRecycleBin(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("globalVariable", () -> catalog.saveGlobalVariable(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("cluster", () -> catalog.saveCluster(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("broker", () -> catalog.saveBrokers(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("resources", () -> catalog.saveResources(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("exportJob", () -> catalog.saveExportJob(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("syncJob", () -> catalog.saveSyncJobs(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("backupHandler", () -> catalog.saveBackupHandler(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("paloAuth", () -> catalog.savePaloAuth(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("transactionState", () -> catalog.saveTransactionState(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("colocateTableIndex", () -> catalog.saveColocateTableIndex(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("routineLoadJobs", () -> catalog.saveRoutineLoadJobs(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("loadJobV2", () -> catalog.saveLoadJobsV2(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("smallFiles", () -> catalog.saveSmallFiles(dos, 
checksum.getRef()))); - checksum.setRef(writer.doWork("plugins", () -> catalog.savePlugins(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("deleteHandler", () -> catalog.saveDeleteHandler(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("sqlBlockRule", () -> catalog.saveSqlBlockRule(dos, checksum.getRef()))); - checksum.setRef(writer.doWork("policy", () -> catalog.savePolicy(dos, checksum.getRef()))); + // 1. write header first + checksum.setRef( + writer.doWork("header", () -> catalog.saveHeader(dos, replayedJournalId, checksum.getRef()))); + // 2. write other modules + for (MetaPersistMethod m : PersistMetaModules.MODULES_IN_ORDER) { + checksum.setRef(writer.doWork(m.name, () -> { + try { + return (long) m.writeMethod.invoke(catalog, dos, checksum.getRef()); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.warn("failed to write meta module: {}", m.name, e); + throw new RuntimeException(e); + } + })); + } + // 3. force sync to disk imageFileOut.getChannel().force(true); } MetaFooter.write(imageFile, metaIndices, checksum.getRef()); long saveImageEndTime = System.currentTimeMillis(); - LOG.info("finished save image {} in {} ms. checksum is {}", - imageFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum.getRef()); + LOG.info("finished save image {} in {} ms. checksum is {}", imageFile.getAbsolutePath(), + (saveImageEndTime - saveImageStartTime), checksum.getRef()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java new file mode 100644 index 0000000000..6e01f13af9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist.meta; + +import com.clearspring.analytics.util.Lists; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; + +/** + * Save all MetaPersistMethods. + */ +public class PersistMetaModules { + // module name -> MetaPersistMethod + public static final Map<String, MetaPersistMethod> MODULES_MAP; + // Save MetaPersistMethod in order. + // The write and read of meta modules should be in same order. 
+ public static final List<MetaPersistMethod> MODULES_IN_ORDER; + + public static final ImmutableList<String> MODULE_NAMES = ImmutableList.copyOf( + new String[] {"masterInfo", "frontends", "backends", "db", "loadJob", "alterJob", "recycleBin", + "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler", + "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles", + "plugins", "deleteHandler", "sqlBlockRule", "policy"}); + + static { + MODULES_MAP = Maps.newHashMap(); + MODULES_IN_ORDER = Lists.newArrayList(); + try { + for (String name : MODULE_NAMES) { + MetaPersistMethod persistMethod = MetaPersistMethod.create(name); + MODULES_MAP.put(name, persistMethod); + MODULES_IN_ORDER.add(persistMethod); + } + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTest.java index d86c8e3455..e6d70de68f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTest.java @@ -18,11 +18,11 @@ package org.apache.doris.catalog; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.MetaHeader; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.load.Load; import org.apache.doris.load.LoadJob; import org.apache.doris.meta.MetaContext; +import org.apache.doris.persist.meta.MetaHeader; import mockit.Expectations; import org.junit.Assert; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java index 5d9156c520..5654d44ee6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java @@ -179,6 +179,7 @@ public class VariableMgrTest { 
Deencapsulation.setField(Catalog.class, "checkpointThreadId", Thread.currentThread().getId()); currentCatalog.getCheckpointer().doCheckpoint(); } catch (Throwable e) { + e.printStackTrace(); Assert.fail(e.getMessage()); } finally { // Restore the ckptThreadId --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org