This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b2b9e22  [CreateTable] Check backend disk has available capacity by 
storage medium before create table (#3519)
b2b9e22 is described below

commit b2b9e22b240ad150ac40ceeff3e8a7ee0580b696
Author: WingC <[email protected]>
AuthorDate: Sat Jun 27 20:36:31 2020 -0500

    [CreateTable] Check backend disk has available capacity by storage medium 
before create table (#3519)
    
    Currently we choose BEs at random without checking whether their disks are
    available, so a create table statement only fails after the create tablet
    task has been sent to the BE and the BE finds it has no capacity to create
    the tablet. Checking backend disk availability by storage medium on the FE
    side avoids these unnecessary RPC calls.
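
Below is a minimal, self-contained Java sketch of the idea, with hypothetical
names: Backend here is a stand-in for org.apache.doris.system.Backend, and
chooseBackends() loosely simplifies the FE's backend selection; only the name
diskExceedLimitByStorageMedium mirrors a method this patch actually adds.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StorageMediumCheckSketch {
    enum StorageMedium { SSD, HDD }

    // Hypothetical stand-in for org.apache.doris.system.Backend: it only knows
    // which storage media it has a usable (online, non-full) disk for.
    static class Backend {
        final long id;
        final List<StorageMedium> usableMedia;

        Backend(long id, List<StorageMedium> usableMedia) {
            this.id = id;
            this.usableMedia = usableMedia;
        }

        // true means: this backend has no usable disk of the given medium
        boolean diskExceedLimitByStorageMedium(StorageMedium medium) {
            return !usableMedia.contains(medium);
        }
    }

    // Simplified selection: when the strict check is on, filter candidates by
    // storage medium before any create-tablet task would be sent, and fail
    // fast (return null) when not enough backends qualify.
    static List<Long> chooseBackends(List<Backend> all, int replicationNum,
                                     StorageMedium medium, boolean strictCheck) {
        List<Backend> candidates = strictCheck
                ? all.stream()
                     .filter(b -> !b.diskExceedLimitByStorageMedium(medium))
                     .collect(Collectors.toList())
                : all;
        if (candidates.size() < replicationNum) {
            return null; // caller turns this into "Failed to find enough host ..."
        }
        return candidates.stream().limit(replicationNum)
                         .map(b -> b.id).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Backend> hddOnlyCluster = Arrays.asList(
                new Backend(1, Arrays.asList(StorageMedium.HDD)),
                new Backend(2, Arrays.asList(StorageMedium.HDD)),
                new Backend(3, Arrays.asList(StorageMedium.HDD)));
        // Strict check on: asking for 3 SSD-capable backends fails up front (null).
        System.out.println(chooseBackends(hddOnlyCluster, 3, StorageMedium.SSD, true));
        // Strict check off: selection proceeds and the tablets land on HDD disks.
        System.out.println(chooseBackends(hddOnlyCluster, 3, StorageMedium.SSD, false));
    }
}
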
---
 docs/en/administrator-guide/config/fe_config.md    |   7 +
 docs/en/getting-started/data-partition.md          |   4 +-
 .../sql-statements/Data Definition/CREATE TABLE.md |   1 +
 docs/zh-CN/administrator-guide/config/fe_config.md |   7 +
 docs/zh-CN/getting-started/data-partition.md       |   4 +-
 .../sql-statements/Data Definition/CREATE TABLE.md |   9 +-
 .../java/org/apache/doris/catalog/Catalog.java     |  31 +-
 .../main/java/org/apache/doris/common/Config.java  | 318 +++++++++--------
 .../main/java/org/apache/doris/system/Backend.java |  41 ++-
 .../org/apache/doris/system/SystemInfoService.java |  21 +-
 .../apache/doris/catalog/ColocateTableTest.java    | 397 +++++++++------------
 .../org/apache/doris/catalog/CreateTableTest.java  |  18 +-
 .../org/apache/doris/utframe/AnotherDemoTest.java  |  20 +-
 .../org/apache/doris/utframe/UtFrameUtils.java     |  18 +-
 14 files changed, 492 insertions(+), 404 deletions(-)

diff --git a/docs/en/administrator-guide/config/fe_config.md 
b/docs/en/administrator-guide/config/fe_config.md
index 03b93ed..75ad3fe 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -591,3 +591,10 @@ The value for thrift_client_timeout_ms is set to be larger 
than zero to prevent
 
 ### `with_k8s_certs`
 
+### `enable_strict_storage_medium_check`
+
+This configuration controls whether, when a table is created, FE checks that 
the cluster contains the requested storage medium. For example, suppose the 
user specifies `SSD` as the storage medium when creating the table, but only 
`HDD` disks exist in the cluster:
+
+If this parameter is `True`, the create table statement fails with the error 
`Failed to find enough host in all backends with storage medium is SSD, need 3`.
+
+If this parameter is `False`, no error is reported when the table is created. 
Instead, the table is created on disks whose storage medium is `HDD`.
diff --git a/docs/en/getting-started/data-partition.md 
b/docs/en/getting-started/data-partition.md
index b0f6e1e..9b5090e 100644
--- a/docs/en/getting-started/data-partition.md
+++ b/docs/en/getting-started/data-partition.md
@@ -251,7 +251,9 @@ Replication_num
     * The BE data storage directory can be explicitly specified as SSD or HDD 
(differentiated by .SSD or .HDD suffix). When you build a table, you can 
uniformly specify the media for all Partition initial storage. Note that the 
suffix is to explicitly specify the disk media without checking to see if it 
matches the actual media type.
     * The default initial storage media can be specified by 
`default_storage_medium= XXX` in the fe configuration file `fe.conf`, or, if 
not, by default, HDD. If specified as an SSD, the data is initially stored on 
the SSD.
     * If storage\_cooldown\_time is not specified, the data is automatically 
migrated from the SSD to the HDD after 30 days by default. If 
storage\_cooldown\_time is specified, the data will not migrate until the 
storage_cooldown_time time is reached.
-    * Note that this parameter is just a "best effort" setting when 
storage_medium is specified. Even if no SSD storage media is set in the 
cluster, no error is reported and it is automatically stored in the available 
data directory. Similarly, if the SSD media is inaccessible and out of space, 
the data may initially be stored directly on other available media. When the 
data expires and is migrated to the HDD, if the HDD media is inaccessible and 
there is not enough space, the migration  [...]
+    * Note that when storage_medium is specified, this parameter is only a 
"best effort" setting if the FE parameter `enable_strict_storage_medium_check` 
is `False`. Even if no SSD storage medium is set up in the cluster, no error is 
reported and the data is automatically stored in an available data directory.
+      Similarly, if the SSD media is inaccessible or out of space, the data 
may initially be stored directly on other available media. When the data 
expires and is migrated to HDD, the migration may also fail (but will be 
retried continuously) if the HDD media is inaccessible or out of space.
+      If the FE parameter `enable_strict_storage_medium_check` is `True`, the 
error `Failed to find enough host in all backends with storage medium is SSD` 
is reported when no SSD storage medium is set in the cluster.
 
 ### ENGINE
 
diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE 
TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
index 945c726..de59bdd 100644
--- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md      
+++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md      
@@ -222,6 +222,7 @@ Syntax:
         ```
 
         storage_medium:         SSD or HDD, The default initial storage media 
can be specified by `default_storage_medium= XXX` in the fe configuration file 
`fe.conf`, or, if not, by default, HDD.
+                                Note: when the FE configuration 
`enable_strict_storage_medium_check` is `True`, if the corresponding storage 
medium is not set in the cluster, the create table statement reports the error 
`Failed to find enough host in all backends with storage medium is SSD|HDD`.
         storage_cooldown_time:  If storage_medium is SSD, data will be 
automatically moved to HDD   when timeout.
                                 Default is 30 days.
                                 Format: "yyyy-MM-dd HH:mm:ss"
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md 
b/docs/zh-CN/administrator-guide/config/fe_config.md
index 671d7cf..5e4519e 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -589,3 +589,10 @@ thrift_client_timeout_ms 的值被设置为大于0来避免线程卡在java.net.
 
 ### `with_k8s_certs`
 
+### `enable_strict_storage_medium_check`
+
+该配置表示在建表时,检查集群中是否存在相应的存储介质。例如当用户指定建表时存储介质为`SSD`,但此时集群中只存在`HDD`的磁盘时,
+
+若该参数为`True`,则建表时会报错 `Failed to find enough host in all backends with storage 
medium is SSD, need 3`。
+
+若该参数为`False`,则建表时不会报错,而是将表建立在存储介质为`HDD`的磁盘上。
diff --git a/docs/zh-CN/getting-started/data-partition.md 
b/docs/zh-CN/getting-started/data-partition.md
index 9b7346b..0e69b70 100644
--- a/docs/zh-CN/getting-started/data-partition.md
+++ b/docs/zh-CN/getting-started/data-partition.md
@@ -254,7 +254,9 @@ PARTITION BY RANGE(`date`, `id`)
     * BE 的数据存储目录可以显式的指定为 SSD 或者 HDD(通过 .SSD 或者 .HDD 后缀区分)。建表时,可以统一指定所有 
Partition 初始存储的介质。注意,后缀作用是显式指定磁盘介质,而不会检查是否与实际介质类型相符。
     * 默认初始存储介质可通过fe的配置文件 `fe.conf` 中指定 
`default_storage_medium=xxx`,如果没有指定,则默认为 HDD。如果指定为 SSD,则数据初始存放在 SSD 上。
     * 如果没有指定 storage\_cooldown\_time,则默认 30 天后,数据会从 SSD 自动迁移到 HDD 上。如果指定了 
storage\_cooldown\_time,则在到达 storage_cooldown_time 时间后,数据才会迁移。
-    * 注意,当指定 storage_medium 时,该参数只是一个“尽力而为”的设置。即使集群内没有设置 SSD 
存储介质,也不会报错,而是自动存储在可用的数据目录中。同样,如果 SSD 介质不可访问、空间不足,都可能导致数据初始直接存储在其他可用介质上。而数据到期迁移到 
HDD 时,如果 HDD 介质不可访问、空间不足,也可能迁移失败(但是会不断尝试)。
+    * 注意,当指定 storage_medium 时,如果FE参数 `enable_strict_storage_medium_check` 为 
`False` 时,该参数只是一个“尽力而为”的设置。即使集群内没有设置 SSD 存储介质,也不会报错,而是自动存储在可用的数据目录中。
+      同样,如果 SSD 介质不可访问、空间不足,都可能导致数据初始直接存储在其他可用介质上。而数据到期迁移到 HDD 时,如果 HDD 
介质不可访问、空间不足,也可能迁移失败(但是会不断尝试)。
+      如果FE参数 `enable_strict_storage_medium_check` 为 `True`,则当集群内没有设置 SSD 
存储介质时,会报错 `Failed to find enough host in all backends with storage medium is 
SSD`。
 
 ### ENGINE
 
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE 
TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE 
TABLE.md
index 1513fb7..69baa3e 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md   
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md   
@@ -243,10 +243,11 @@ under the License.
     ```
 
        storage_medium:        用于指定该分区的初始存储介质,可选择 SSD 或 HDD。默认初始存储介质可通过fe的配置文件 
`fe.conf` 中指定 `default_storage_medium=xxx`,如果没有指定,则默认为 HDD。
-           storage_cooldown_time: 当设置存储介质为 SSD 时,指定该分区在 SSD 上的存储到期时间。
-                                   默认存放 30 天。
-                                   格式为:"yyyy-MM-dd HH:mm:ss"
-           replication_num:        指定分区的副本数。默认为 3
+                               注意:当FE配置项 `enable_strict_storage_medium_check` 
为 `True` 时,若集群中没有设置对应的存储介质,建表语句会报错 `Failed to find enough host in all backends 
with storage medium is SSD|HDD`. 
+       storage_cooldown_time: 当设置存储介质为 SSD 时,指定该分区在 SSD 上的存储到期时间。
+                               默认存放 30 天。
+                               格式为:"yyyy-MM-dd HH:mm:ss"
+       replication_num:        指定分区的副本数。默认为 3
     
        当表为单分区表时,这些属性为表的属性。
            当表为两级分区时,这些属性为附属于每一个分区。
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 4fb3361..cc355c0 100755
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -436,6 +436,11 @@ public class Catalog {
         return this.tabletInvertedIndex;
     }
 
+    // only for test
+    public void setColocateTableIndex(ColocateTableIndex colocateTableIndex) {
+        this.colocateTableIndex = colocateTableIndex;
+    }
+
     public ColocateTableIndex getColocateTableIndex() {
         return this.colocateTableIndex;
     }
@@ -4282,7 +4287,11 @@ public class Catalog {
                 if (chooseBackendsArbitrary) {
                     // This is the first colocate table in the group, or just 
a normal table,
                     // randomly choose backends
-                    chosenBackendIds = chosenBackendIdBySeq(replicationNum, 
clusterName);
+                    if (Config.enable_strict_storage_medium_check) {
+                        chosenBackendIds = 
chosenBackendIdBySeq(replicationNum, clusterName, 
tabletMeta.getStorageMedium());
+                    } else {
+                        chosenBackendIds = 
chosenBackendIdBySeq(replicationNum, clusterName);
+                    }
                     backendsPerBucketSeq.add(chosenBackendIds);
                 } else {
                     // get backends from existing backend sequence
@@ -4299,7 +4308,7 @@ public class Catalog {
                 Preconditions.checkState(chosenBackendIds.size() == 
replicationNum, chosenBackendIds.size() + " vs. "+ replicationNum);
             }
 
-            if (backendsPerBucketSeq != null && groupId != null) {
+            if (groupId != null) {
                 colocateIndex.addBackendsPerBucketSeq(groupId, 
backendsPerBucketSeq);
             }
 
@@ -4309,6 +4318,15 @@ public class Catalog {
     }
 
     // create replicas for tablet with random chosen backends
+    private List<Long> chosenBackendIdBySeq(int replicationNum, String 
clusterName, TStorageMedium storageMedium) throws DdlException {
+        List<Long> chosenBackendIds = 
Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMedium(replicationNum,
+                true, true, clusterName, storageMedium);
+        if (chosenBackendIds == null) {
+            throw new DdlException("Failed to find enough host with storage 
medium is " + storageMedium + " in all backends. need: " + replicationNum);
+        }
+        return chosenBackendIds;
+    }
+
     private List<Long> chosenBackendIdBySeq(int replicationNum, String 
clusterName) throws DdlException {
         List<Long> chosenBackendIds = 
Catalog.getCurrentSystemInfo().seqChooseBackendIds(replicationNum, true, true, 
clusterName);
         if (chosenBackendIds == null) {
@@ -5003,12 +5021,9 @@ public class Catalog {
         }
 
         // check if rollup has same name
-        if (table.getType() == TableType.OLAP) {
-            OlapTable olapTable = (OlapTable) table;
-            for (String idxName: olapTable.getIndexNameToId().keySet()) {
-                if (idxName.equals(newTableName)) {
-                    throw new DdlException("New name conflicts with rollup 
index name: " + idxName);
-                }
+        for (String idxName : table.getIndexNameToId().keySet()) {
+            if (idxName.equals(newTableName)) {
+                throw new DdlException("New name conflicts with rollup index 
name: " + idxName);
             }
         }
 
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java 
b/fe/src/main/java/org/apache/doris/common/Config.java
index 075bec5..7164a9f 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -21,12 +21,12 @@ import org.apache.doris.PaloFe;
 
 public class Config extends ConfigBase {
     
-    /*
+    /**
      * The max size of one sys log and audit log
      */
     @ConfField public static int log_roll_size_mb = 1024; // 1 GB
 
-    /*
+    /**
      * sys_log_dir:
      *      This specifies FE log dir. FE will produces 2 log files:
      *      fe.log:      all logs of FE process.
@@ -67,7 +67,7 @@ public class Config extends ConfigBase {
     @Deprecated
     @ConfField public static String sys_log_roll_mode = "SIZE-MB-1024";
 
-    /*
+    /**
      * audit_log_dir:
      *      This specifies FE audit log dir.
      *      Audit log fe.audit.log contains all requests with related infos 
such as user, host, cost, status, etc.
@@ -102,7 +102,7 @@ public class Config extends ConfigBase {
     @Deprecated
     @ConfField public static String audit_log_roll_mode = "TIME-DAY";
 
-    /*
+    /**
      * plugin_dir:
      *      plugin install directory
      */
@@ -111,7 +111,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static boolean plugin_enable = false;
 
-    /*
+    /**
      * Labels of finished or cancelled load jobs will be removed after 
*label_keep_max_second*
      * The removed labels can be reused.
      * Set a short time will lower the FE memory usage.
@@ -119,24 +119,27 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int label_keep_max_second = 3 * 24 * 3600; // 3 days
-    /*
+  
+    /**
      * The max keep time of some kind of jobs.
      * like schema change job and rollup job.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int history_job_keep_max_second = 7 * 24 * 3600; // 7 days
-    /*
+  
+    /**
      * Load label cleaner will run every *label_clean_interval_second* to 
clean the outdated jobs.
      */
     @ConfField public static int label_clean_interval_second = 4 * 3600; // 4 
hours
-    /*
+  
+    /**
      * the transaction will be cleaned after transaction_clean_interval_second 
seconds if the transaction is visible or aborted
      * we should make this interval as short as possible and each clean cycle 
as soon as possible
      */
     @ConfField public static int transaction_clean_interval_second = 30;
 
     // Configurations for meta data durability
-    /*
+    /**
      * Doris meta data will be saved here.
      * The storage of this dir is highly recommended as to be:
      * 1. High write performance (SSD)
@@ -144,52 +147,58 @@ public class Config extends ConfigBase {
      */
     @ConfField public static String meta_dir = PaloFe.DORIS_HOME_DIR + 
"/doris-meta";
     
-    /*
+    /**
      * temp dir is used to save intermediate results of some process, such as 
backup and restore process.
      * file in this dir will be cleaned after these process is finished.
      */
     @ConfField public static String tmp_dir = PaloFe.DORIS_HOME_DIR + 
"/temp_dir";
     
-    /*
+    /**
      * Edit log type.
      * BDB: write log to bdbje
      * LOCAL: deprecated.
      */
     @ConfField
     public static String edit_log_type = "BDB";
-    /*
+  
+    /**
      * bdbje port
      */
     @ConfField
     public static int edit_log_port = 9010;
-    /*
+  
+    /**
      * Master FE will save image every *edit_log_roll_num* meta journals.
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static int edit_log_roll_num = 50000;
-    /*
+    public static int edit_log_roll_num = 50000;
+      
+    /**
      * Non-master FE will stop offering service
      * if meta data delay gap exceeds *meta_delay_toleration_second*
      */
     @ConfField public static int meta_delay_toleration_second = 300;    // 5 
min
-    /*
+  
+    /**
      * Master FE sync policy of bdbje.
      * If you only deploy one Follower FE, set this to 'SYNC'. If you deploy 
more than 3 Follower FE,
      * you can set this and the following 'replica_sync_policy' to 
WRITE_NO_SYNC.
      * more info, see: 
http://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.SyncPolicy.html
      */
     @ConfField public static String master_sync_policy = "SYNC"; // SYNC, 
NO_SYNC, WRITE_NO_SYNC
-    /*
+  
+    /**
      * Follower FE sync policy of bdbje.
      */
     @ConfField public static String replica_sync_policy = "SYNC"; // SYNC, 
NO_SYNC, WRITE_NO_SYNC
-    /*
+  
+    /**
      * Replica ack policy of bdbje.
      * more info, see: 
http://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.ReplicaAckPolicy.html
      */
     @ConfField public static String replica_ack_policy = "SIMPLE_MAJORITY"; // 
ALL, NONE, SIMPLE_MAJORITY
     
-    /*
+    /**
      * The heartbeat timeout of bdbje between master and follower.
      * the default is 30 seconds, which is same as default value in bdbje.
      * If the network is experiencing transient problems, of some unexpected 
long java GC annoying you,
@@ -197,37 +206,37 @@ public class Config extends ConfigBase {
      */
     @ConfField public static int bdbje_heartbeat_timeout_second = 30;
 
-    /*
+    /**
      * The lock timeout of bdbje operation
      * If there are many LockTimeoutException in FE WARN log, you can try to 
increase this value
      */
     @ConfField
     public static int bdbje_lock_timeout_second = 1;
 
-    /*
+    /**
      * num of thread to handle heartbeat events in heartbeat_mgr.
      */
     @ConfField(masterOnly = true)
     public static int heartbeat_mgr_threads_num = 8;
 
-     /*
+    /**
      * blocking queue size to store heartbeat task in heartbeat_mgr.
      */
     @ConfField(masterOnly = true)
     public static int heartbeat_mgr_blocking_queue_size = 1024;
 
-    /*
-    * max num of thread to handle agent task in agent task thread-pool.
-    */
+    /**
+     * max num of thread to handle agent task in agent task thread-pool.
+     */
     @ConfField(masterOnly = true)
     public static int max_agent_task_threads_num = 4096;
 
-    /*
+    /**
      * the max txn number which bdbje can rollback when trying to rejoin the 
group
      */
     @ConfField public static int txn_rollback_limit = 100;
 
-    /*
+    /**
      * Specified an IP for frontend, instead of the ip get by 
*InetAddress.getByName*.
      * This can be used when *InetAddress.getByName* get an unexpected IP 
address.
      * Default is "0.0.0.0", which means not set.
@@ -235,7 +244,7 @@ public class Config extends ConfigBase {
      */
     @ConfField public static String frontend_address = "0.0.0.0";
 
-    /*
+    /**
      * Declare a selection strategy for those servers have many ips.
      * Note that there should at most one ip match this list.
      * this is a list in semicolon-delimited format, in CIDR notation, e.g. 
10.10.10.0/24
@@ -243,7 +252,7 @@ public class Config extends ConfigBase {
      */
     @ConfField public static String priority_networks = "";
 
-    /*
+    /**
      * If true, FE will reset bdbje replication group(that is, to remove all 
electable nodes info)
      * and is supposed to start as Master.
      * If all the electable nodes can not start, we can copy the meta data
@@ -251,7 +260,7 @@ public class Config extends ConfigBase {
      */
     @ConfField public static String metadata_failure_recovery = "false";
 
-    /*
+    /**
      * If true, non-master FE will ignore the meta data delay gap between 
Master FE and its self,
      * even if the metadata delay gap exceeds *meta_delay_toleration_second*.
      * Non-master FE will still offer read service.
@@ -262,81 +271,84 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean ignore_meta_check = false;
 
-    /*
+    /**
      * Set the maximum acceptable clock skew between non-master FE to Master 
FE host.
      * This value is checked whenever a non-master FE establishes a connection 
to master FE via BDBJE.
      * The connection is abandoned if the clock skew is larger than this value.
      */
     @ConfField public static long max_bdbje_clock_delta_ms = 5000; // 5s
 
-    /*
+    /**
      * Fe http port
      * Currently, all FEs' http port must be same.
      */
     @ConfField public static int http_port = 8030;
 
-    /*
+    /**
      * The backlog_num for netty http server
      * When you enlarge this backlog_num, you should ensure it's value larger 
than
      * the linux /proc/sys/net/core/somaxconn config
      */
     @ConfField public static int http_backlog_num = 1024;
 
-    /*
+    /**
      * The connection timeout and socket timeout config for thrift server
      * The value for thrift_client_timeout_ms is set to be larger than zero to 
prevent
      * some hang up problems in java.net.SocketInputStream.socketRead0
      */
     @ConfField public static int thrift_client_timeout_ms = 30000;
 
-    /*
+    /**
      * The backlog_num for thrift server
      * When you enlarge this backlog_num, you should ensure it's value larger 
than
      * the linux /proc/sys/net/core/somaxconn config
      */
     @ConfField public static int thrift_backlog_num = 1024;
 
-    /*
+    /**
      * FE thrift server port
      */
     @ConfField public static int rpc_port = 9020;
-    /*
+  
+    /**
      * FE mysql server port
      */
     @ConfField public static int query_port = 9030;
 
-    /*
-    * mysql service nio option.
+    /**
+     * mysql service nio option.
      */
     @ConfField public static boolean mysql_service_nio_enabled = false;
 
-    /*
+    /**
      * num of thread to handle io events in mysql.
      */
     @ConfField public static int mysql_service_io_threads_num = 4;
 
-    /*
+    /**
      * max num of thread to handle task in mysql.
      */
     @ConfField public static int max_mysql_service_task_threads_num = 4096;
 
-    /*
+    /**
      * Cluster name will be shown as the title of web page
      */
     @ConfField public static String cluster_name = "Baidu Palo";
-    /*
+  
+    /**
      * node(FE or BE) will be considered belonging to the same Palo cluster if 
they have same cluster id.
      * Cluster id is usually a random integer generated when master FE start 
at first time.
      * You can also sepecify one.
      */
     @ConfField public static int cluster_id = -1;
-    /*
+  
+    /**
      * Cluster token used for internal authentication.
      */
     @ConfField public static String auth_token = "";
 
     // Configurations for load, clone, create table, alter table etc. We will 
rarely change them
-    /*
+    /**
      * Maximal waiting time for creating a single replica.
      * eg.
      *      if you create a table with #m tablets and #n replicas for each 
tablet,
@@ -344,30 +356,30 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int tablet_create_timeout_second = 1;
-    /*
+  
+    /**
      * In order not to wait too long for create table(index), set a max 
timeout.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_create_table_timeout_second = 60;
     
-    /*
+    /**
      * Maximal waiting time for all publish version tasks of one transaction 
to be finished
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int publish_version_timeout_second = 30; // 30 seconds
     
-    /*
+    /**
      * minimal intervals between two publish version action
      */
     @ConfField public static int publish_version_interval_ms = 10;
 
-
-    /*
+    /**
      * The thrift server max worker threads
      */
     @ConfField public static int thrift_server_max_worker_threads = 4096;
 
-    /*
+    /**
      * Maximal wait seconds for straggler node in load
      * eg.
      *      there are 3 replicas A, B, C
@@ -383,7 +395,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int load_straggler_wait_second = 300;
     
-    /*
+    /**
      * Maximal memory layout length of a row. default is 100 KB.
      * In BE, the maximal size of a RowBlock is 100MB(Configure as 
max_unpacked_row_block_size in be.conf).
      * And each RowBlock contains 1024 rows. So the maximal size of a row is 
approximately 100 KB.
@@ -400,7 +412,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_layout_length_per_row = 100000; // 100k
 
-    /*
+    /**
      * The load scheduler running interval.
      * A load job will transfer its state from PENDING to LOADING to FINISHED.
      * The load scheduler will transfer load job from PENDING to LOADING
@@ -409,7 +421,7 @@ public class Config extends ConfigBase {
      */
     @ConfField public static int load_checker_interval_second = 5;
 
-    /*
+    /**
      * Concurrency of HIGH priority pending load jobs.
      * Load job priority is defined as HIGH or NORMAL.
      * All mini batch load jobs are HIGH priority, other types of load jobs 
are NORMAL priority.
@@ -419,107 +431,106 @@ public class Config extends ConfigBase {
      * and do not change this if you know what you are doing.
      */
     @ConfField public static int load_pending_thread_num_high_priority = 3;
-    /*
+    /**
      * Concurrency of NORMAL priority pending load jobs.
      * Do not change this if you know what you are doing.
      */
     @ConfField public static int load_pending_thread_num_normal_priority = 10;
-    /*
+    /**
      * Concurrency of HIGH priority etl load jobs.
      * Do not change this if you know what you are doing.
      */
     @ConfField public static int load_etl_thread_num_high_priority = 3;
-    /*
+    /**
      * Concurrency of NORMAL priority etl load jobs.
      * Do not change this if you know what you are doing.
      */
     @ConfField public static int load_etl_thread_num_normal_priority = 10;
-    /*
+    /**
      * Concurrency of delete jobs.
      */
     @ConfField public static int delete_thread_num = 10;
-    /*
+    /**
      * Not available.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int load_input_size_limit_gb = 0; // GB, 0 is no limit
-    /*
+    /**
      * Not available.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int load_running_job_num_limit = 0; // 0 is no limit
-    /*
+    /**
      * Default broker load timeout
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int broker_load_default_timeout_second = 14400; // 4 hour
 
-    /*
+    /**
      * Default non-streaming mini load timeout
      */
     @Deprecated
     @ConfField(mutable = true, masterOnly = true)
     public static int mini_load_default_timeout_second = 3600; // 1 hour
     
-    /*
+    /**
      * Default insert load timeout
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int insert_load_default_timeout_second = 3600; // 1 hour
     
-    /*
+    /**
      * Default stream load and streaming mini load timeout
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int stream_load_default_timeout_second = 600; // 600s
 
-    /*
+    /**
      * Max load timeout applicable to all type of load except for stream load
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_load_timeout_second = 259200; // 3days
 
-    /*
+    /**
      * Max stream load and streaming mini load timeout
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_stream_load_timeout_second = 259200; // 3days
 
-    /*
-    * Min stream load timeout applicable to all type of load
-    */
+    /**
+     * Min stream load timeout applicable to all type of load
+     */
     @ConfField(mutable = true, masterOnly = true)
     public static int min_load_timeout_second = 1; // 1s
 
-    /*
+    /**
      * Default hadoop load timeout
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int hadoop_load_default_timeout_second = 86400 * 3; // 3 day
 
-    /*
+    /**
      * Default spark load timeout
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int spark_load_default_timeout_second = 86400; // 1 day
 
-    /*
+    /**
      * Default number of waiting jobs for routine load and version 2 of load
      * This is a desired number.
      * In some situation, such as switch the master, the current number is 
maybe more then desired_max_waiting_jobs
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int desired_max_waiting_jobs = 100;
-
-
-    /*
+  
+    /**
      * maximun concurrent running txn num including prepare, commit txns under 
a single db
      * txn manager will reject coming txns
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_running_txn_num_per_db = 100;
 
-    /*
+    /**
      * The load task executor pool size. This pool size limits the max running 
load tasks.
      * Currently, it only limits the load task of broker load, pending and 
loading phases.
      * It should be less than 'max_running_txn_num_per_db'
@@ -527,28 +538,28 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = true)
     public static int async_load_task_pool_size = 10;
 
-    /*
+    /**
      * Same meaning as *tablet_create_timeout_second*, but used when delete a 
tablet.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int tablet_delete_timeout_second = 2;
-    /*
+    /**
      * Clone checker's running interval.
      */
     @ConfField public static int clone_checker_interval_second = 300;
-    /*
+    /**
      * Default timeout of a single clone job. Set long enough to fit your 
replica size.
      * The larger the replica data size is, the more time is will cost to 
finish clone.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int clone_job_timeout_second = 7200; // 2h
-    /*
+    /**
      * Concurrency of LOW priority clone jobs.
      * Concurrency of High priority clone jobs is currently unlimit.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int clone_max_job_num = 100;
-    /*
+    /**
      * LOW priority clone job's delay trigger time.
      * A clone job contains a tablet which need to be cloned(recovery or 
migration).
      * If the priority is LOW, it will be delayed 
*clone_low_priority_delay_second*
@@ -560,22 +571,22 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int clone_low_priority_delay_second = 600;
-    /*
+    /**
      * NORMAL priority clone job's delay trigger time.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int clone_normal_priority_delay_second = 300;
-    /*
+    /**
      * HIGH priority clone job's delay trigger time.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int clone_high_priority_delay_second = 0;
-    /*
+    /**
      * the minimal delay seconds between a replica is failed and fe try to 
recovery it using clone.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int replica_delay_recovery_second = 0;
-    /*
+    /**
      * Balance threshold of data size in BE.
      * The balance algorithm is:
      * 1. Calculate the average used capacity(AUC) of the entire cluster. 
(total data size / total backends num)
@@ -585,83 +596,83 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static double clone_capacity_balance_threshold = 0.2;
-    /*
+    /**
      * Balance threshold of num of replicas in Backends.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static double clone_distribution_balance_threshold = 0.2;
-    /*
+    /**
      * The high water of disk capacity used percent.
      * This is used for calculating load score of a backend.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static double capacity_used_percent_high_water = 0.75;
-    /*
+    /**
      * Maximal timeout of ALTER TABLE request. Set long enough to fit your 
table data size.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int alter_table_timeout_second = 86400; // 1day
-    /*
+    /**
      * If a backend is down for *max_backend_down_time_second*, a BACKEND_DOWN 
event will be triggered.
      * Do not set this if you know what you are doing.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_backend_down_time_second = 3600; // 1h
-    /*
+    /**
      * When create a table(or partition), you can specify its storage 
medium(HDD or SSD).
      * If not set, this specifies the default medium when creat.
      */
     @ConfField public static String default_storage_medium = "HDD";
-    /*
+    /**
      * When create a table(or partition), you can specify its storage 
medium(HDD or SSD).
      * If set to SSD, this specifies the default duration that tablets will 
stay on SSD.
      * After that, tablets will be moved to HDD automatically.
      * You can set storage cooldown time in CREATE TABLE stmt.
      */
     @ConfField public static long storage_cooldown_second = 30 * 24 * 3600L; 
// 30 days
-    /*
+    /**
      * After dropping database(table/partition), you can recover it by using 
RECOVER stmt.
      * And this specifies the maximal data retention time. After time, the 
data will be deleted permanently.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long catalog_trash_expire_second = 86400L; // 1day
-    /*
+    /**
      * Maximal bytes that a single broker scanner will read.
      * Do not set this if you know what you are doing.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long min_bytes_per_broker_scanner = 67108864L; // 64MB
-    /*
+    /**
      * Maximal concurrency of broker scanners.
      * Do not set this if you know what you are doing.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_broker_concurrency = 10;
 
-    /*
+    /**
      * Export checker's running interval.
      */
     @ConfField public static int export_checker_interval_second = 5;
-    /*
+    /**
      * Limitation of the concurrency of running export jobs.
      * Default is 5.
      * 0 is unlimited
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int export_running_job_num_limit = 5;
-    /*
+    /**
      * Default timeout of export jobs.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int export_task_default_timeout_second = 2 * 3600; // 2h
-    /*
+    /**
      * Number of tablets per export query plan
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int export_tablet_num_per_task = 5;
 
     // Configurations for consistency check
-    /*
+    /**
      * Consistency checker will run from *consistency_check_start_time* to 
*consistency_check_end_time*.
      * Default is from 23:00 to 04:00
      */
@@ -669,49 +680,49 @@ public class Config extends ConfigBase {
     public static String consistency_check_start_time = "23";
     @ConfField(mutable = true, masterOnly = true)
     public static String consistency_check_end_time = "4";
-    /*
+    /**
      * Default timeout of a single consistency check task. Set long enough to 
fit your tablet size.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long check_consistency_default_timeout_second = 600; // 10 
min
 
     // Configurations for query engine
-    /*
+    /**
      * Maximal number of connections per FE.
      */
     @ConfField public static int qe_max_connection = 1024;
 
-    /*
+    /**
      * Maximal number of thread in connection-scheduler-pool.
      */
     @ConfField public static int max_connection_scheduler_threads_num = 4096;
 
-    /*
-    * The memory_limit for colocote join PlanFragment instance =
-    * exec_mem_limit / min (query_colocate_join_memory_limit_penalty_factor, 
instance_num)
-    */
+    /**
+     * The memory_limit for colocote join PlanFragment instance =
+     * exec_mem_limit / min (query_colocate_join_memory_limit_penalty_factor, 
instance_num)
+     */
     @ConfField(mutable = true)
     public static int query_colocate_join_memory_limit_penalty_factor = 8;
 
-    /*
+    /**
      * Deprecated after 0.10
      */
     @ConfField
     public static boolean disable_colocate_join = false;
-    /*
+    /**
      * The default user resource publishing timeout.
      */
     @ConfField public static int meta_publish_timeout_ms = 1000;
     @ConfField public static boolean proxy_auth_enable = false;
     @ConfField public static String proxy_auth_magic_prefix = "x@8";
-    /*
+    /**
      * Limit on the number of expr children of an expr tree.
      * Exceed this limit may cause long analysis time while holding database 
read lock.
      * Do not set this if you know what you are doing.
      */
     @ConfField(mutable = true)
     public static int expr_children_limit = 10000;
-    /*
+    /**
      * Limit on the depth of an expr tree.
      * Exceed this limit may cause long analysis time while holding db read 
lock.
      * Do not set this if you know what you are doing.
@@ -720,14 +731,14 @@ public class Config extends ConfigBase {
     public static int expr_depth_limit = 3000;
 
     // Configurations for backup and restore
-    /*
+    /**
      * Plugins' path for BACKUP and RESTORE operations. Currently deprecated.
      */
     @Deprecated
     @ConfField public static String backup_plugin_path = 
"/tools/trans_file_tool/trans_files.sh";
 
     // Configurations for hadoop dpp
-    /*
+    /**
      * The following configurations are not available.
      */
     @ConfField public static String dpp_hadoop_client_path = 
"/lib/hadoop-client/hadoop/bin/hadoop";
@@ -759,7 +770,7 @@ public class Config extends ConfigBase {
     // check token when download image file.
     @ConfField public static boolean enable_token_check = true;
 
-    /*
+    /**
      * Set to true if you deploy Palo using thirdparty deploy manager
      * Valid options are:
      *      disable:    no deploy manager
@@ -779,7 +790,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int backup_job_default_timeout_ms = 86400 * 1000; // 1 day
     
-    /*
+    /**
      * 'storage_high_watermark_usage_percent' limit the max capacity usage 
percent of a Backend storage path.
      * 'storage_min_left_capacity_bytes' limit the minimum left capacity of a 
Backend storage path.
      * If both limitations are reached, this storage path can not be chose as 
tablet balance destination.
@@ -790,7 +801,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static long storage_min_left_capacity_bytes = 2 * 1024 * 1024 * 
1024; // 2G
 
-    /*
+    /**
      * If capacity of disk reach the 'storage_flood_stage_usage_percent' and 
'storage_flood_stage_left_capacity_bytes',
      * the following operation will be rejected:
      * 1. load job
@@ -812,26 +823,26 @@ public class Config extends ConfigBase {
     // BRPC idle wait time (ms)
     @ConfField public static int brpc_idle_wait_max_time = 10000;
     
-    /*
+    /**
      * if set to false, auth check will be disable, in case some goes wrong 
with the new privilege system. 
      */
     @ConfField public static boolean enable_auth_check = true;
     
-    /*
+    /**
      * Max bytes a broker scanner can process in one broker load job.
      * Commonly, each Backends has one broker scanner.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long max_bytes_per_broker_scanner = 3 * 1024 * 1024 * 1024L; 
// 3G
     
-    /*
+    /**
      * Max number of load jobs, include PENDING、ETL、LOADING、QUORUM_FINISHED.
      * If exceed this number, load job is not allowed to be submitted.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long max_unfinished_load_job = 1000;
     
-    /*
+    /**
      * If set to true, Planner will try to select replica of tablet on same 
host as this Frontend.
      * This may reduce network transmission in following case:
      * 1. N hosts with N Backends and N Frontends deployed.
@@ -842,7 +853,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean enable_local_replica_selection = false;
     
-    /*
+    /**
      * The timeout of executing async remote fragment.
      * In normal case, the async remote fragment will be executed in a short 
time. If system are under high load
      * condition,try to set this timeout longer.
@@ -850,7 +861,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec
     
-    /*
+    /**
      * The number of query retries. 
      * A query may retry if we encounter RPC exception and no result has been 
sent to user.
      * You may reduce this number to void Avalanche disaster.
@@ -858,14 +869,14 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int max_query_retry_time = 2;
 
-    /*
+    /**
      * The tryLock timeout configuration of catalog lock.
      * Normally it does not need to change, unless you need to test something.
      */
     @ConfField(mutable = true)
     public static long catalog_try_lock_timeout_ms = 5000; // 5 sec
     
-    /*
+    /**
      * if this is set to true
      *    all pending load job will failed when call begin txn api
      *    all prepare load job will failed when call commit txn api
@@ -874,20 +885,20 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_load_job = false;
     
-    /*
+    /**
      * Load using hadoop cluster will be deprecated in future.
      * Set to true to disable this kind of load.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_hadoop_load = false;
     
-    /*
+    /**
      * fe will call es api to get es index shard info every 
es_state_sync_interval_secs
      */
     @ConfField
     public static long es_state_sync_interval_second = 10;
     
-    /*
+    /**
      * the factor of delay time before deciding to repair tablet.
      * if priority is VERY_HIGH, repair it immediately.
      * HIGH, delay tablet_repair_delay_factor_second * 1;
@@ -897,18 +908,18 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static long tablet_repair_delay_factor_second = 60;
     
-    /*
+    /**
      * the default slot number per path in tablet scheduler
      * TODO(cmy): remove this config and dynamically adjust it by clone task 
statistic
      */
     @ConfField public static int schedule_slot_num_per_path = 2;
     
-    /*
+    /**
      * Deprecated after 0.10
      */
     @ConfField public static boolean use_new_tablet_scheduler = true;
 
-    /*
+    /**
      * the threshold of cluster balance score, if a backend's load score is 
10% lower than average score,
      * this backend will be marked as LOW load, if load score is 10% higher 
than average score, HIGH load
      * will be marked.
@@ -916,7 +927,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static double balance_load_score_threshold = 0.1; // 10%
 
-    /*
+    /**
      * if set to true, TabletScheduler will not do balance.
      */
     @ConfField(mutable = true, masterOnly = true)
@@ -946,24 +957,24 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int report_queue_size = 100;
     
-    /*
+    /**
      * If set to true, metric collector will be run as a daemon timer to 
collect metrics at fix interval
      */
     @ConfField public static boolean enable_metric_calculator = true;
 
-    /*
+    /**
      * the max routine load job num, including NEED_SCHEDULED, RUNNING, PAUSE
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_job_num = 100;
 
-    /*
+    /**
      * the max concurrent routine load task num of a single routine load job
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_concurrent_num = 5;
 
-    /*
+    /**
      * the max concurrent routine load task num per BE.
      * This is to limit the num of routine load tasks sending to a BE, and it 
should also less
      * than BE config 'routine_load_thread_pool_size'(default 10),
@@ -972,24 +983,24 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_num_per_be = 5;
 
-    /*
+    /**
      * The max number of files store in SmallFileMgr 
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_small_file_number = 100;
 
-    /*
+    /**
      * The max size of a single file store in SmallFileMgr 
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_small_file_size_bytes = 1024 * 1024; // 1MB
 
-    /*
+    /**
      * Save small files
      */
     @ConfField public static String small_file_dir = PaloFe.DORIS_HOME_DIR + 
"/small_files";
     
-    /*
+    /**
      * The following 2 configs can set to true to disable the automatic 
colocate tables's relocate and balance.
      * if 'disable_colocate_relocate' is set to true, ColocateTableBalancer 
will not relocate colocate tables when Backend unavailable.
      * if 'disable_colocate_balance' is set to true, ColocateTableBalancer 
will not balance colocate tables.
@@ -997,7 +1008,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true) public static boolean 
disable_colocate_relocate = false;
     @ConfField(mutable = true, masterOnly = true) public static boolean 
disable_colocate_balance = false;
 
-    /*
+    /**
      * If set to true, the insert stmt with processing error will still return 
a label to user.
      * And user can use this label to check the load job's status.
      * The default value is false, which means if insert operation encounter 
errors,
@@ -1005,7 +1016,7 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true) public static boolean 
using_old_load_usage_pattern = false;
 
-    /*
+    /**
      * This will limit the max recursion depth of hash distribution pruner.
      * eg: where a in (5 elements) and b in (4 elements) and c in (3 elements) 
and d in (2 elements).
      * a/b/c/d are distribution columns, so the recursion depth will be 5 * 4 
* 3 * 2 = 120, larger than 100,
@@ -1016,20 +1027,20 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static int max_distribution_pruner_recursion_depth = 100;
 
-    /*
+    /**
      * If the jvm memory used percent(heap or old mem pool) exceed this 
threshold, checkpoint thread will
      * not work to avoid OOM.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long metadata_checkopoint_memory_threshold = 60;
 
-    /*
+    /**
      * If set to true, the checkpoint thread will make the checkpoint 
regardless of the jvm memory used percent.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean force_do_metadata_checkpoint = false;
 
-    /*
+    /**
      * The multi cluster feature will be deprecated in version 0.12
      * set this config to true will disable all operations related to cluster 
feature, include:
      *   create/drop cluster
@@ -1040,31 +1051,31 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean disable_cluster_feature = true;
 
-    /*
+    /**
      * Decide how often to check dynamic partition
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long dynamic_partition_check_interval_seconds = 600;
 
-    /*
+    /**
      * If set to true, dynamic partition feature will open
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean dynamic_partition_enable = false;
 
-    /*
+    /**
      * control rollup job concurrent limit
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_running_rollup_job_num_per_table = 1;
 
-    /*
+    /**
      * If set to true, Doris will check if the compiled and running versions 
of Java are compatible
      */
     @ConfField
     public static boolean check_java_version = true;
 
-    /*
+    /**
      * control materialized view
      */
     @ConfField(mutable = true, masterOnly = true)
@@ -1082,17 +1093,24 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int period_of_auto_resume_min = 5;
 
-    /*
+    /**
      * If set to true, the backend will be automatically dropped after 
finishing decommission.
      * If set to false, the backend will not be dropped and remaining in 
DECOMMISSION state.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean drop_backend_after_decommission = true;
 
-    /*
+    /**
+     * If set to true, FE will check whether backends have available capacity 
for the requested storage medium when creating a table
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_strict_storage_medium_check = false;
+
+    /**
      * enable spark load for temporary use
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean enable_spark_load = false;
+
 }
 
diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java 
b/fe/src/main/java/org/apache/doris/system/Backend.java
index 83d4514..2144c84 100644
--- a/fe/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/src/main/java/org/apache/doris/system/Backend.java
@@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.doris.thrift.TStorageMedium;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -303,7 +304,7 @@ public class Backend implements Writable {
     }
 
     public boolean hasPathHash() {
-        return disksRef.get().values().stream().allMatch(v -> v.hasPathHash());
+        return 
disksRef.get().values().stream().allMatch(DiskInfo::hasPathHash);
     }
 
     public long getTotalCapacityB() {
@@ -354,6 +355,36 @@ public class Backend implements Writable {
         return maxPct;
     }
 
+    public boolean diskExceedLimitByStorageMedium(TStorageMedium 
storageMedium) {
+        if (getDiskNumByStorageMedium(storageMedium) <= 0) {
+            return true;
+        }
+        ImmutableMap<String, DiskInfo> diskInfos = disksRef.get();
+        boolean exceedLimit = true;
+        for (DiskInfo diskInfo : diskInfos.values()) {
+            if (diskInfo.getState() == DiskState.ONLINE && 
diskInfo.getStorageMedium() == storageMedium && !diskInfo.exceedLimit(true)) {
+                exceedLimit = false;
+                break;
+            }
+        }
+        return exceedLimit;
+    }
+
+    public boolean diskExceedLimit() {
+        if (getDiskNum() <= 0) {
+            return true;
+        }
+        ImmutableMap<String, DiskInfo> diskInfos = disksRef.get();
+        boolean exceedLimit = true;
+        for (DiskInfo diskInfo : diskInfos.values()) {
+            if (diskInfo.getState() == DiskState.ONLINE && 
!diskInfo.exceedLimit(true)) {
+                exceedLimit = false;
+                break;
+            }
+        }
+        return exceedLimit;
+    }
+
     public String getPathByPathHash(long pathHash) {
         for (DiskInfo diskInfo : disksRef.get().values()) {
             if (diskInfo.getPathHash() == pathHash) {
@@ -640,6 +671,14 @@ public class Backend implements Writable {
         return tabletMaxCompactionScore.get();
     }
 
+    private long getDiskNumByStorageMedium(TStorageMedium storageMedium) {
+        return disksRef.get().values().stream().filter(v -> 
v.getStorageMedium() == storageMedium).count();
+    }
+
+    private int getDiskNum() {
+        return disksRef.get().size();
+    }
+
     /**
      * Note: This class must be a POJO in order to display in JSON format
      * Add additional information in the class to show in `show backends`
diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
index 2619e5b..844da0b 100644
--- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -40,6 +40,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import org.apache.commons.validator.routines.InetAddressValidator;
+import org.apache.doris.thrift.TStorageMedium;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -58,6 +59,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 public class SystemInfoService {
     private static final Logger LOG = 
LogManager.getLogger(SystemInfoService.class);
@@ -421,6 +423,7 @@ public class SystemInfoService {
      * @param shrinkNum
      * @return
      */
+    @Deprecated
     public List<Long> calculateDecommissionBackends(String clusterName, int 
shrinkNum) {
         LOG.info("calculate decommission backend in cluster: {}. decommission 
num: {}", clusterName, shrinkNum);
 
@@ -725,12 +728,24 @@ public class SystemInfoService {
         return classMap;
     }
 
+    public List<Long> seqChooseBackendIdsByStorageMedium(int backendNum, 
boolean needAlive, boolean isCreate,
+                                                                      String 
clusterName, TStorageMedium storageMedium) {
+        final List<Backend> backends = 
getClusterBackends(clusterName).stream().filter(v -> 
!v.diskExceedLimitByStorageMedium(storageMedium)).collect(Collectors.toList());
+        return seqChooseBackendIds(backendNum, needAlive, isCreate, 
clusterName, backends);
+    }
+
+    public List<Long> seqChooseBackendIds(int backendNum, boolean needAlive, 
boolean isCreate,
+                                                       String clusterName) {
+        final List<Backend> backends = 
getClusterBackends(clusterName).stream().filter(v -> 
!v.diskExceedLimit()).collect(Collectors.toList());
+        return seqChooseBackendIds(backendNum, needAlive, isCreate, 
clusterName, backends);
+    }
+
     // choose backends by round robin
     // return null if not enough backend
     // use synchronized to run serially
     public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean 
needAlive, boolean isCreate,
-            String clusterName) {
-        long lastBackendId = -1L;
+                                                       String clusterName, 
final List<Backend> srcBackends) {
+        long lastBackendId;
 
         if (clusterName.equals(DEFAULT_CLUSTER)) {
             if (isCreate) {
@@ -756,8 +771,6 @@ public class SystemInfoService {
             }
         }
 
-        // put backend with same host in same list
-        final List<Backend> srcBackends = getClusterBackends(clusterName);
         // host -> BE list
         Map<String, List<Backend>> backendMaps = Maps.newHashMap();
         for (Backend backend : srcBackends) {
diff --git a/fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java 
b/fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java
index 6210e30..483832e 100644
--- a/fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java
@@ -17,205 +17,94 @@
 
 package org.apache.doris.catalog;
 
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.ColumnDef;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
-import org.apache.doris.analysis.HashDistributionDesc;
-import org.apache.doris.analysis.KeysDesc;
-import org.apache.doris.analysis.TableName;
-import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.analysis.DropDbStmt;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
-import org.apache.doris.cluster.Cluster;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.jmockit.Deencapsulation;
-import org.apache.doris.common.util.PropertyAnalyzer;
-import org.apache.doris.mysql.privilege.PaloAuth;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.task.AgentBatchTask;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
+import org.apache.doris.utframe.UtFrameUtils;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.HashMap;
+import java.io.File;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
+import java.util.UUID;
 
 public class ColocateTableTest {
-    private TableName dbTableName1;
-    private TableName dbTableName2;
-    private TableName dbTableName3;
-    private String dbName = "default:testDb";
-    private String groupName1 = "group1";
-    private String tableName1 = "t1";
-    private String tableName2 = "t2";
-    private String tableName3 = "t3";
-    private String clusterName = "default";
-    private List<Long> beIds = Lists.newArrayList();
-    private List<String> columnNames = Lists.newArrayList();
-    private List<ColumnDef> columnDefs = Lists.newArrayList();
-    private Map<String, String> properties = new HashMap<String, String>();
-
-    private Catalog catalog;
-    private Database db;
-    private Analyzer analyzer;
-
-    @Injectable
-    private ConnectContext connectContext;
-    @Injectable
-    private SystemInfoService systemInfoService;
-    @Injectable
-    private PaloAuth paloAuth;
-    @Injectable
-    private EditLog editLog;
+    private static String runningDir = "fe/mocked/ColocateTableTest" + 
UUID.randomUUID().toString() + "/";
+
+    private static ConnectContext connectContext;
+    private static String dbName = "testDb";
+    private static String fullDbName = "default_cluster:" + dbName;
+    private static String tableName1 = "t1";
+    private static String tableName2 = "t2";
+    private static String groupName = "group1";
 
     @Rule
     public ExpectedException expectedEx = ExpectedException.none();
 
-    @Before
-    public void setUp() throws Exception {
-        dbTableName1 = new TableName(dbName, tableName1);
-        dbTableName2 = new TableName(dbName, tableName2);
-        dbTableName3 = new TableName(dbName, tableName3);
-
-        beIds.clear();
-        beIds.add(1L);
-        beIds.add(2L);
-        beIds.add(3L);
-
-        columnNames.clear();
-        columnNames.add("key1");
-        columnNames.add("key2");
-
-        columnDefs.clear();
-        columnDefs.add(new ColumnDef("key1", new 
TypeDef(ScalarType.createType(PrimitiveType.INT))));
-        columnDefs.add(new ColumnDef("key2", new 
TypeDef(ScalarType.createVarchar(10))));
-
-        catalog = Deencapsulation.newInstance(Catalog.class);
-        analyzer = new Analyzer(catalog, connectContext);
-
-        new Expectations(analyzer) {
-            {
-                analyzer.getClusterName();
-                result = clusterName;
-            }
-        };
-
-        dbTableName1.analyze(analyzer);
-        dbTableName2.analyze(analyzer);
-        dbTableName3.analyze(analyzer);
-
-        Config.disable_colocate_join = false;
-
-        new Expectations(catalog) {
-            {
-                Catalog.getCurrentCatalog();
-                result = catalog;
-
-                Catalog.getCurrentCatalog();
-                result = catalog;
-
-                Catalog.getCurrentSystemInfo();
-                result = systemInfoService;
-
-                systemInfoService.checkClusterCapacity(anyString);
-                systemInfoService.seqChooseBackendIds(anyInt, true, true, 
anyString);
-                result = beIds;
-
-                catalog.getAuth();
-                result = paloAuth;
-                paloAuth.checkTblPriv((ConnectContext) any, anyString, 
anyString, PrivPredicate.CREATE);
-                result = true;
-                paloAuth.checkTblPriv((ConnectContext) any, anyString, 
anyString, PrivPredicate.DROP);
-                result = true; minTimes = 0; maxTimes = 1;
-            }
-        };
-
-        new Expectations() {
-            {
-                Deencapsulation.setField(catalog, "editLog", editLog);
-            }
-        };
-
-        initDatabase();
-        db = catalog.getDb(dbName);
-
-        new MockUp<AgentBatchTask>() {
-            @Mock
-            void run() {
-                return;
-            }
-        };
-
-        new MockUp<CountDownLatch>() {
-            @Mock
-            boolean await(long timeout, TimeUnit unit) {
-                return true;
-            }
-        };
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        UtFrameUtils.createMinDorisCluster(runningDir);
+        connectContext = UtFrameUtils.createDefaultCtx();
     }
 
-    private void initDatabase() throws Exception {
-        CreateDbStmt dbStmt = new CreateDbStmt(true, dbName);
-        new Expectations(dbStmt) {
-            {
-                dbStmt.getClusterName();
-                result = clusterName;
-            }
-        };
-
-        ConcurrentHashMap<String, Cluster> nameToCluster =  new 
ConcurrentHashMap<>();
-        nameToCluster.put(clusterName, new Cluster(clusterName, 1));
-        new Expectations() {
-            {
-                Deencapsulation.setField(catalog, "nameToCluster", 
nameToCluster);
-            }
-        };
-
-        catalog.createDb(dbStmt);
+    @AfterClass
+    public static void tearDown() {
+        File file = new File(runningDir);
+        file.delete();
     }
 
-    @After
-    public void tearDown() throws Exception {
-        catalog.clear();
+    @Before
+    public void createDb() throws Exception {
+        String createDbStmtStr = "create database " + dbName;
+        CreateDbStmt createDbStmt = (CreateDbStmt) 
UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+        Catalog.getCurrentCatalog().createDb(createDbStmt);
+        Catalog.getCurrentCatalog().setColocateTableIndex(new 
ColocateTableIndex());
     }
 
-    private void createOneTable(int numBucket, Map<String, String> properties) 
throws Exception {
-        properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
+    @After
+    public void dropDb() throws Exception {
+        String dropDbStmtStr = "drop database " + dbName;
+        DropDbStmt dropDbStmt = (DropDbStmt) 
UtFrameUtils.parseAndAnalyzeStmt(dropDbStmtStr, connectContext);
+        Catalog.getCurrentCatalog().dropDb(dropDbStmt);
+    }
 
-        CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName1, 
columnDefs, "olap",
-                new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
-                new HashDistributionDesc(numBucket, 
Lists.newArrayList("key1")), properties, null, "");
-        stmt.analyze(analyzer);
-        catalog.createTable(stmt);
+    private static void createTable(String sql) throws Exception {
+        CreateTableStmt createTableStmt = (CreateTableStmt) 
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Catalog.getCurrentCatalog().createTable(createTableStmt);
     }
 
     @Test
     public void testCreateOneTable() throws Exception {
-        int numBucket = 1;
-
-        createOneTable(numBucket, properties);
+        createTable("create table " + dbName + "." + tableName1 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
 
         ColocateTableIndex index = Catalog.getCurrentColocateIndex();
+        Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
         long tableId = db.getTable(tableName1).getId();
 
         Assert.assertEquals(1, Deencapsulation.<Multimap<GroupId, 
Long>>getField(index, "group2Tables").size());
@@ -232,35 +121,49 @@ public class ColocateTableTest {
 
         GroupId groupId = index.getGroup(tableId);
         List<Long> backendIds = index.getBackendsPerBucketSeq(groupId).get(0);
-        Assert.assertEquals(beIds, backendIds);
+        Assert.assertEquals(Collections.singletonList(10001L), backendIds);
 
-        String fullGroupName = dbId + "_" + groupName1;
+        String fullGroupName = dbId + "_" + groupName;
         Assert.assertEquals(tableId, index.getTableIdByGroup(fullGroupName));
         ColocateGroupSchema groupSchema = index.getGroupSchema(fullGroupName);
         Assert.assertNotNull(groupSchema);
         Assert.assertEquals(dbId, groupSchema.getGroupId().dbId);
-        Assert.assertEquals(numBucket, groupSchema.getBucketsNum());
-        Assert.assertEquals(3, groupSchema.getReplicationNum());
+        Assert.assertEquals(1, groupSchema.getBucketsNum());
+        Assert.assertEquals(1, groupSchema.getReplicationNum());
     }
 
     @Test
     public void testCreateTwoTableWithSameGroup() throws Exception {
-        int numBucket = 1;
-
-        createOneTable(numBucket, properties);
-
-        // create second table
-        properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
-        CreateTableStmt secondStmt = new CreateTableStmt(false, false, 
dbTableName2, columnDefs, "olap",
-                new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
-                new HashDistributionDesc(numBucket, 
Lists.newArrayList("key1")), properties, null, "");
-        secondStmt.analyze(analyzer);
-        catalog.createTable(secondStmt);
+        createTable("create table " + dbName + "." + tableName1 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
+
+        createTable("create table " + dbName + "." + tableName2 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
 
         ColocateTableIndex index = Catalog.getCurrentColocateIndex();
+        Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
         long firstTblId = db.getTable(tableName1).getId();
         long secondTblId = db.getTable(tableName2).getId();
-        
+
         Assert.assertEquals(2, Deencapsulation.<Multimap<GroupId, 
Long>>getField(index, "group2Tables").size());
         Assert.assertEquals(1, index.getAllGroupIds().size());
         Assert.assertEquals(2, Deencapsulation.<Map<Long, 
GroupId>>getField(index, "table2Group").size());
@@ -301,74 +204,118 @@ public class ColocateTableTest {
 
     @Test
     public void testBucketNum() throws Exception {
-        int firstBucketNum = 1;
-        createOneTable(firstBucketNum, properties);
-
-        properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
-        int secondBucketNum = 2;
-        CreateTableStmt secondStmt = new CreateTableStmt(false, false, 
dbTableName2, columnDefs, "olap",
-                new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
-                new HashDistributionDesc(secondBucketNum, 
Lists.newArrayList("key1")), properties, null, "");
-        secondStmt.analyze(analyzer);
+        createTable("create table " + dbName + "." + tableName1 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
 
         expectedEx.expect(DdlException.class);
         expectedEx.expectMessage("Colocate tables must have same bucket num: 
1");
+        createTable("create table " + dbName + "." + tableName2 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 2\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
 
-        catalog.createTable(secondStmt);
     }
 
     @Test
     public void testReplicationNum() throws Exception {
-        int bucketNum = 1;
-
-        createOneTable(bucketNum, properties);
-
-        properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
-        properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM, "2");
-        CreateTableStmt secondStmt = new CreateTableStmt(false, false, 
dbTableName2, columnDefs, "olap",
-                new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
-                new HashDistributionDesc(bucketNum, 
Lists.newArrayList("key1")), properties, null, "");
-        secondStmt.analyze(analyzer);
+        createTable("create table " + dbName + "." + tableName1 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
 
         expectedEx.expect(DdlException.class);
-        expectedEx.expectMessage("Colocate tables must have same replication 
num: 3");
-
-        catalog.createTable(secondStmt);
+        expectedEx.expectMessage("Colocate tables must have same replication 
num: 1");
+        createTable("create table " + dbName + "." + tableName2 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"2\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
     }
 
     @Test
     public void testDistributionColumnsSize() throws Exception {
-        int bucketNum = 1;
-        createOneTable(bucketNum, properties);
-
-        properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
-        CreateTableStmt childStmt = new CreateTableStmt(false, false, 
dbTableName2, columnDefs, "olap",
-                new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
-                new HashDistributionDesc(bucketNum, Lists.newArrayList("key1", 
"key2")), properties, null, "");
-        childStmt.analyze(analyzer);
+        createTable("create table " + dbName + "." + tableName1 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
 
         expectedEx.expect(DdlException.class);
-        expectedEx.expectMessage("Colocate tables distribution columns size 
must be same : 1");
-
-        catalog.createTable(childStmt);
+        expectedEx.expectMessage("Colocate tables distribution columns size 
must be same : 2");
+        createTable("create table " + dbName + "." + tableName2 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
     }
 
     @Test
     public void testDistributionColumnsType() throws Exception {
-        int bucketNum = 1;
-
-        createOneTable(bucketNum, properties);
-
-        properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
-        CreateTableStmt childStmt = new CreateTableStmt(false, false, 
dbTableName2, columnDefs, "olap",
-                new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
-                new HashDistributionDesc(bucketNum, 
Lists.newArrayList("key2")), properties, null, "");
-        childStmt.analyze(analyzer);
+        createTable("create table " + dbName + "." + tableName1 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` int NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
 
         expectedEx.expect(DdlException.class);
-        expectedEx.expectMessage(
-                "Colocate tables distribution columns must have the same data 
type: key2 should be INT");
-
-        catalog.createTable(childStmt);
+        expectedEx.expectMessage("Colocate tables distribution columns must 
have the same data type: k2 should be INT");
+        createTable("create table " + dbName + "." + tableName2 + " (\n" +
+                " `k1` int NULL COMMENT \"\",\n" +
+                " `k2` varchar(10) NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`k1`, `k2`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
+                "PROPERTIES (\n" +
+                " \"replication_num\" = \"1\",\n" +
+                " \"colocate_with\" = \"" + groupName + "\"\n" +
+                ");");
     }
 }
diff --git a/fe/src/test/java/org/apache/doris/catalog/CreateTableTest.java 
b/fe/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index 4ee95a5..df61e6a 100644
--- a/fe/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ConfigBase;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.qe.ConnectContext;
@@ -49,7 +50,7 @@ public class CreateTableTest {
         CreateDbStmt createDbStmt = (CreateDbStmt) 
UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
         Catalog.getCurrentCatalog().createDb(createDbStmt);
     }
-    
+
     @AfterClass
     public static void tearDown() {
         File file = new File(runningDir);
@@ -62,7 +63,7 @@ public class CreateTableTest {
     }
 
     @Test
-    public void testNormal() {
+    public void testNormal() throws DdlException {
         ExceptionChecker.expectThrowsNoException(
                 () -> createTable("create table test.tbl1\n" + "(k1 int, k2 
int)\n" + "duplicate key(k1)\n"
                         + "distributed by hash(k2) buckets 1\n" + 
"properties('replication_num' = '1'); "));
@@ -96,6 +97,11 @@ public class CreateTableTest {
                         + "partition by range(k2)\n" + "(partition p1 values 
less than(\"10\"))\n"
                         + "distributed by hash(k2) buckets 1\n" + 
"properties('replication_num' = '1');"));
 
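+        // with the strict storage medium check disabled, a table that requests SSD can still be created on HDD disks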
+        ConfigBase.setMutableConfig("enable_strict_storage_medium_check", 
"false");
+        ExceptionChecker
+                .expectThrowsNoException(() -> createTable("create table 
test.tb7(key1 int, key2 varchar(10)) \n"
+                        + "distributed by hash(key1) buckets 1 
properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
+
         Database db = 
Catalog.getCurrentCatalog().getDb("default_cluster:test");
         OlapTable tbl6 = (OlapTable) db.getTable("tbl6");
         Assert.assertTrue(tbl6.getColumn("k1").isKey());
@@ -109,7 +115,7 @@ public class CreateTableTest {
     }
 
     @Test
-    public void testAbormal() {
+    public void testAbormal() throws DdlException {
         ExceptionChecker.expectThrowsWithMsg(DdlException.class,
                 "Floating point type column can not be distribution column",
                 () -> createTable("create table test.atbl1\n" + "(k1 int, k2 
float)\n" + "duplicate key(k1)\n"
@@ -147,5 +153,11 @@ public class CreateTableTest {
                         () -> createTable("create table test.atbl6\n" + "(k1 
int, k2 int, k3 int)\n"
                                 + "duplicate key(k1, k2, k3)\n" + "distributed 
by hash(k1) buckets 1\n"
                                 + "properties('replication_num' = '1');"));
+
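+        // with the strict check enabled, requesting SSD when only HDD disks exist should be rejected before any create-tablet task is sent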
+        ConfigBase.setMutableConfig("enable_strict_storage_medium_check", 
"true");
+        ExceptionChecker
+                .expectThrowsWithMsg(DdlException.class, "Failed to find 
enough host with storage medium is SSD in all backends. need: 1",
+                        () -> createTable("create table test.tb7(key1 int, 
key2 varchar(10)) distributed by hash(key1) \n"
+                                + "buckets 1 properties('replication_num' = 
'1', 'storage_medium' = 'ssd');"));
     }
 }
diff --git a/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java 
b/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java
index 01d7aff..0a91f40 100644
--- a/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java
+++ b/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.doris.utframe;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
@@ -30,6 +32,8 @@ import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TNetworkAddress;
 import 
org.apache.doris.utframe.MockedBackendFactory.DefaultBeThriftServiceImpl;
 import 
org.apache.doris.utframe.MockedBackendFactory.DefaultHeartbeatServiceImpl;
@@ -107,12 +111,20 @@ public class AnotherDemoTest {
         backend.start();
 
         // add be
-        List<Pair<String, Integer>> bes = Lists.newArrayList();
-        bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort()));
-        Catalog.getCurrentSystemInfo().addBackends(bes, false, 
"default_cluster");
+        Backend be = new Backend(10001, backend.getHost(), 
backend.getHeartbeatPort());
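+        // give the mocked backend a disk with free capacity so it passes the disk capacity check when backends are chosen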
+        Map<String, DiskInfo> disks = Maps.newHashMap();
+        DiskInfo diskInfo1 = new DiskInfo("/path1");
+        diskInfo1.setTotalCapacityB(1000000);
+        diskInfo1.setAvailableCapacityB(500000);
+        diskInfo1.setDataUsedCapacityB(480000);
+        disks.put(diskInfo1.getRootPath(), diskInfo1);
+        be.setDisks(ImmutableMap.copyOf(disks));
+        be.setAlive(true);
+        be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
+        Catalog.getCurrentSystemInfo().addBackend(be);
 
         // sleep to wait first heartbeat
-        Thread.sleep(5000);
+        Thread.sleep(6000);
     }
 
     @AfterClass
diff --git a/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java 
b/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index ac538e7..a0905e8 100644
--- a/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -17,12 +17,14 @@
 
 package org.apache.doris.utframe;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -33,6 +35,7 @@ import org.apache.doris.planner.Planner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -55,6 +58,7 @@ import java.io.StringReader;
 import java.net.ServerSocket;
 import java.nio.channels.SocketChannel;
 import java.nio.file.Files;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -166,9 +170,17 @@ public class UtFrameUtils {
         backend.start();
 
         // add be
-        List<Pair<String, Integer>> bes = Lists.newArrayList();
-        bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort()));
-        Catalog.getCurrentSystemInfo().addBackends(bes, false, 
"default_cluster");
+        Backend be = new Backend(10001, backend.getHost(), 
backend.getHeartbeatPort());
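+        // give the mocked backend a disk with free capacity so it passes the disk capacity check when backends are chosen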
+        Map<String, DiskInfo> disks = Maps.newHashMap();
+        DiskInfo diskInfo1 = new DiskInfo("/path1");
+        diskInfo1.setTotalCapacityB(1000000);
+        diskInfo1.setAvailableCapacityB(500000);
+        diskInfo1.setDataUsedCapacityB(480000);
+        disks.put(diskInfo1.getRootPath(), diskInfo1);
+        be.setDisks(ImmutableMap.copyOf(disks));
+        be.setAlive(true);
+        be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
+        Catalog.getCurrentSystemInfo().addBackend(be);
 
         // sleep to wait first heartbeat
         Thread.sleep(6000);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
