This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9ebe3cdb8ba74eae5106ee753197b434be400d47
Author: Xiangyu Wang <dut.xian...@gmail.com>
AuthorDate: Sun Sep 10 18:29:54 2023 +0800

    [Enhancement](multi-catalog) merge hms partition events. (#22869)
    
    This PR mainly has two changes:
    
    1. Add merge handling for partition events.
    2. Add a unit test for `MetastoreEventFactory`. First, add some mock classes 
(`MockCatalog`/`MockDatabase` ...) to simulate the real HMS 
catalogs/databases/tables/partitions, then create an event producer which can 
randomly produce every kind of `MetastoreEvent`. Two catalogs are used for the 
test: one is named `testCatalog` and the other `validateCatalog`. The event 
producer generates many events; `validateCatalog` handles all of the 
events, but `testCatalog` just handles the events [...]
---
 .../datasource/hive/event/AddPartitionEvent.java   |  18 +-
 .../datasource/hive/event/AlterDatabaseEvent.java  |  10 +-
 .../datasource/hive/event/AlterPartitionEvent.java |  58 ++-
 .../datasource/hive/event/AlterTableEvent.java     |  41 +-
 .../datasource/hive/event/CreateDatabaseEvent.java |   2 +-
 .../datasource/hive/event/CreateTableEvent.java    |   7 +-
 .../datasource/hive/event/DropDatabaseEvent.java   |   5 +
 .../datasource/hive/event/DropPartitionEvent.java  |  52 +-
 .../datasource/hive/event/DropTableEvent.java      |  23 +-
 .../doris/datasource/hive/event/InsertEvent.java   |  16 +-
 .../datasource/hive/event/MetastoreEvent.java      |   6 +-
 .../hive/event/MetastoreEventFactory.java          |   8 +-
 .../hive/event/MetastorePartitionEvent.java        |  18 +-
 .../datasource/hive/event/MetastoreTableEvent.java |  10 +-
 .../external/hms/MetastoreEventFactoryTest.java    | 577 +++++++++++++++------
 15 files changed, 653 insertions(+), 198 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
index b94333e41b..8872c9b596 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -32,6 +33,7 @@ import 
org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -44,7 +46,7 @@ public class AddPartitionEvent extends 
MetastorePartitionEvent {
     // for test
     public AddPartitionEvent(long eventId, String catalogName, String dbName,
                              String tblName, List<String> partitionNames) {
-        super(eventId, catalogName, dbName, tblName);
+        super(eventId, catalogName, dbName, tblName, 
MetastoreEventType.ADD_PARTITION);
         this.partitionNames = partitionNames;
         this.hmsTbl = null;
     }
@@ -71,6 +73,20 @@ public class AddPartitionEvent extends 
MetastorePartitionEvent {
         }
     }
 
+    @Override
+    protected boolean willChangePartitionName() {
+        return false;
+    }
+
+    @Override
+    public Set<String> getAllPartitionNames() {
+        return ImmutableSet.copyOf(partitionNames);
+    }
+
+    public void removePartition(String partitionName) {
+        partitionNames.remove(partitionName);
+    }
+
     protected static List<MetastoreEvent> getEvents(NotificationEvent event,
             String catalogName) {
         return Lists.newArrayList(new AddPartitionEvent(event, catalogName));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
index d56eb52fad..fd31701f71 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import 
org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
 
 import java.util.List;
+import java.util.Random;
 
 /**
  * MetastoreEvent for ALTER_DATABASE event type
@@ -41,13 +42,15 @@ public class AlterDatabaseEvent extends MetastoreEvent {
 
     // true if this alter event was due to a rename operation
     private final boolean isRename;
+    private final String dbNameAfter;
 
     // for test
     public AlterDatabaseEvent(long eventId, String catalogName, String dbName, 
boolean isRename) {
-        super(eventId, catalogName, dbName, null);
+        super(eventId, catalogName, dbName, null, 
MetastoreEventType.ALTER_DATABASE);
         this.isRename = isRename;
         this.dbBefore = null;
         this.dbAfter = null;
+        this.dbNameAfter = isRename ? (dbName + new Random().nextInt(10)) : 
dbName;
     }
 
     private AlterDatabaseEvent(NotificationEvent event,
@@ -61,6 +64,7 @@ public class AlterDatabaseEvent extends MetastoreEvent {
                                 .getAlterDatabaseMessage(event.getMessage());
             dbBefore = 
Preconditions.checkNotNull(alterDatabaseMessage.getDbObjBefore());
             dbAfter = 
Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter());
+            dbNameAfter = dbAfter.getName();
         } catch (Exception e) {
             throw new MetastoreNotificationException(
                     debugString("Unable to parse the alter database message"), 
e);
@@ -97,6 +101,10 @@ public class AlterDatabaseEvent extends MetastoreEvent {
         return isRename;
     }
 
+    public String getDbNameAfter() {
+        return dbNameAfter;
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
index 0fc6be375d..eb278fe479 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -30,7 +31,8 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 
 import java.util.List;
-import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -47,14 +49,14 @@ public class AlterPartitionEvent extends 
MetastorePartitionEvent {
 
     // for test
     public AlterPartitionEvent(long eventId, String catalogName, String 
dbName, String tblName,
-                                String partitionNameBefore, String 
partitionNameAfter) {
-        super(eventId, catalogName, dbName, tblName);
+                                String partitionNameBefore, boolean isRename) {
+        super(eventId, catalogName, dbName, tblName, 
MetastoreEventType.ALTER_PARTITION);
         this.partitionNameBefore = partitionNameBefore;
-        this.partitionNameAfter = partitionNameAfter;
+        this.partitionNameAfter = isRename ? (partitionNameBefore + new 
Random().nextInt(100)) : partitionNameBefore;
         this.hmsTbl = null;
         this.partitionAfter = null;
         this.partitionBefore = null;
-        isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
+        this.isRename = isRename;
     }
 
     private AlterPartitionEvent(NotificationEvent event,
@@ -80,6 +82,24 @@ public class AlterPartitionEvent extends 
MetastorePartitionEvent {
         }
     }
 
+    @Override
+    protected boolean willChangePartitionName() {
+        return isRename;
+    }
+
+    @Override
+    public Set<String> getAllPartitionNames() {
+        return ImmutableSet.of(partitionNameBefore);
+    }
+
+    public String getPartitionNameAfter() {
+        return partitionNameAfter;
+    }
+
+    public boolean isRename() {
+        return isRename;
+    }
+
     protected static List<MetastoreEvent> getEvents(NotificationEvent event,
             String catalogName) {
         return Lists.newArrayList(new AlterPartitionEvent(event, catalogName));
@@ -109,10 +129,28 @@ public class AlterPartitionEvent extends 
MetastorePartitionEvent {
     }
 
     @Override
-    protected boolean canBeBatched(MetastoreEvent event) {
-        return isSameTable(event)
-                    && event instanceof AlterPartitionEvent
-                    && Objects.equals(partitionBefore, ((AlterPartitionEvent) 
event).partitionBefore)
-                    && Objects.equals(partitionAfter, ((AlterPartitionEvent) 
event).partitionAfter);
+    protected boolean canBeBatched(MetastoreEvent that) {
+        if (!isSameTable(that) || !(that instanceof MetastorePartitionEvent)) {
+            return false;
+        }
+
+        // Check if `that` event is a rename event, a rename event can not be 
batched
+        // because the process of `that` event will change the reference 
relation of this partition
+        MetastorePartitionEvent thatPartitionEvent = (MetastorePartitionEvent) 
that;
+        if (thatPartitionEvent.willChangePartitionName()) {
+            return false;
+        }
+
+        // `that` event can be batched if this event's partitions contains all 
of the partitions which `that` event has
+        // else just remove `that` event's relevant partitions
+        for (String partitionName : getAllPartitionNames()) {
+            if (thatPartitionEvent instanceof AddPartitionEvent) {
+                ((AddPartitionEvent) 
thatPartitionEvent).removePartition(partitionName);
+            } else if (thatPartitionEvent instanceof DropPartitionEvent) {
+                ((DropPartitionEvent) 
thatPartitionEvent).removePartition(partitionName);
+            }
+        }
+
+        return 
getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
index bc09d6ef2c..8c79b6d248 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
 
 import java.util.List;
+import java.util.Random;
 
 /**
  * MetastoreEvent for ALTER_TABLE event type
@@ -41,17 +42,17 @@ public class AlterTableEvent extends MetastoreTableEvent {
     // true if this alter event was due to a rename operation
     private final boolean isRename;
     private final boolean isView;
-    private final boolean willCreateOrDropTable;
+    private final String tblNameAfter;
 
     // for test
     public AlterTableEvent(long eventId, String catalogName, String dbName,
                            String tblName, boolean isRename, boolean isView) {
-        super(eventId, catalogName, dbName, tblName);
+        super(eventId, catalogName, dbName, tblName, 
MetastoreEventType.ALTER_TABLE);
         this.isRename = isRename;
         this.isView = isView;
         this.tableBefore = null;
         this.tableAfter = null;
-        this.willCreateOrDropTable = isRename || isView;
+        this.tblNameAfter = isRename ? (tblName + new Random().nextInt(10)) : 
tblName;
     }
 
     private AlterTableEvent(NotificationEvent event, String catalogName) {
@@ -65,6 +66,7 @@ public class AlterTableEvent extends MetastoreTableEvent {
                             .getAlterTableMessage(event.getMessage());
             tableAfter = 
Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
             tableBefore = 
Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
+            tblNameAfter = tableAfter.getTableName();
         } catch (Exception e) {
             throw new MetastoreNotificationException(
                     debugString("Unable to parse the alter table message"), e);
@@ -73,7 +75,6 @@ public class AlterTableEvent extends MetastoreTableEvent {
         isRename = 
!tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
                 || 
!tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
         isView = tableBefore.isSetViewExpandedText() || 
tableBefore.isSetViewOriginalText();
-        this.willCreateOrDropTable = isRename || isView;
     }
 
     public static List<MetastoreEvent> getEvents(NotificationEvent event,
@@ -83,7 +84,12 @@ public class AlterTableEvent extends MetastoreTableEvent {
 
     @Override
     protected boolean willCreateOrDropTable() {
-        return willCreateOrDropTable;
+        return isRename || isView;
+    }
+
+    @Override
+    protected boolean willChangeTableName() {
+        return isRename;
     }
 
     private void processRecreateTable() throws DdlException {
@@ -123,6 +129,10 @@ public class AlterTableEvent extends MetastoreTableEvent {
         return isView;
     }
 
+    public String getTblNameAfter() {
+        return tblNameAfter;
+    }
+
     /**
      * If the ALTER_TABLE event is due a table rename, this method removes the 
old table
      * and creates a new table with the new name. Else, we just refresh table
@@ -157,15 +167,22 @@ public class AlterTableEvent extends MetastoreTableEvent {
             return false;
         }
 
-        // `that` event must not be a rename table event
-        // so if the process of this event will drop this table,
-        // it can merge all the table's events before
-        if (willCreateOrDropTable) {
+        // First check if `that` event is a rename event, a rename event can 
not be batched
+        // because the process of `that` event will change the reference 
relation of this table
+        // `that` event must be a MetastoreTableEvent event otherwise 
`isSameTable` will return false
+        MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that;
+        if (thatTblEvent.willChangeTableName()) {
+            return false;
+        }
+
+        // Then check if the process of this event will create or drop this 
table,
+        // if true then `that` event can be batched
+        if (willCreateOrDropTable()) {
             return true;
         }
 
-        // that event must be a MetastoreTableEvent event
-        // otherwise `isSameTable` will return false
-        return !((MetastoreTableEvent) that).willCreateOrDropTable();
+        // Last, check if the process of `that` event will create or drop this 
table
+        // if false then `that` event can be batched
+        return !thatTblEvent.willCreateOrDropTable();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
index eb8da00cfe..42d813319c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
@@ -34,7 +34,7 @@ public class CreateDatabaseEvent extends MetastoreEvent {
 
     // for test
     public CreateDatabaseEvent(long eventId, String catalogName, String 
dbName) {
-        super(eventId, catalogName, dbName, null);
+        super(eventId, catalogName, dbName, null, 
MetastoreEventType.CREATE_DATABASE);
     }
 
     private CreateDatabaseEvent(NotificationEvent event,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
index 1ec8cbfde5..3dff6420a8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
@@ -37,7 +37,7 @@ public class CreateTableEvent extends MetastoreTableEvent {
 
     // for test
     public CreateTableEvent(long eventId, String catalogName, String dbName, 
String tblName) {
-        super(eventId, catalogName, dbName, tblName);
+        super(eventId, catalogName, dbName, tblName, 
MetastoreEventType.CREATE_TABLE);
         this.hmsTbl = null;
     }
 
@@ -66,6 +66,11 @@ public class CreateTableEvent extends MetastoreTableEvent {
         return true;
     }
 
+    @Override
+    protected boolean willChangeTableName() {
+        return false;
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
index 6f6364657b..3481f832fe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
@@ -32,6 +32,11 @@ import java.util.List;
  */
 public class DropDatabaseEvent extends MetastoreEvent {
 
+    // for test
+    public DropDatabaseEvent(long eventId, String catalogName, String dbName) {
+        super(eventId, catalogName, dbName, null, 
MetastoreEventType.DROP_DATABASE);
+    }
+
     private DropDatabaseEvent(NotificationEvent event,
             String catalogName) {
         super(event, catalogName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
index 7f8ade0819..738f113f0e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -31,15 +32,24 @@ import 
org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * MetastoreEvent for ADD_PARTITION event type
+ * MetastoreEvent for DROP_PARTITION event type
  */
 public class DropPartitionEvent extends MetastorePartitionEvent {
     private final Table hmsTbl;
     private final List<String> partitionNames;
 
+    // for test
+    public DropPartitionEvent(long eventId, String catalogName, String dbName,
+                              String tblName, List<String> partitionNames) {
+        super(eventId, catalogName, dbName, tblName, 
MetastoreEventType.DROP_PARTITION);
+        this.partitionNames = partitionNames;
+        this.hmsTbl = null;
+    }
+
     private DropPartitionEvent(NotificationEvent event,
             String catalogName) {
         super(event, catalogName);
@@ -62,6 +72,20 @@ public class DropPartitionEvent extends 
MetastorePartitionEvent {
         }
     }
 
+    @Override
+    protected boolean willChangePartitionName() {
+        return false;
+    }
+
+    @Override
+    public Set<String> getAllPartitionNames() {
+        return ImmutableSet.copyOf(partitionNames);
+    }
+
+    public void removePartition(String partitionName) {
+        partitionNames.remove(partitionName);
+    }
+
     protected static List<MetastoreEvent> getEvents(NotificationEvent event,
             String catalogName) {
         return Lists.newArrayList(
@@ -85,4 +109,30 @@ public class DropPartitionEvent extends 
MetastorePartitionEvent {
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected boolean canBeBatched(MetastoreEvent that) {
+        if (!isSameTable(that) || !(that instanceof MetastorePartitionEvent)) {
+            return false;
+        }
+
+        MetastorePartitionEvent thatPartitionEvent = (MetastorePartitionEvent) 
that;
+        // Check if `that` event is a rename event, a rename event can not be 
batched
+        // because the process of `that` event will change the reference 
relation of this partition
+        if (thatPartitionEvent.willChangePartitionName()) {
+            return false;
+        }
+
+        // `that` event can be batched if this event's partitions contains all 
of the partitions which `that` event has
+        // else just remove `that` event's relevant partitions
+        for (String partitionName : getAllPartitionNames()) {
+            if (thatPartitionEvent instanceof AddPartitionEvent) {
+                ((AddPartitionEvent) 
thatPartitionEvent).removePartition(partitionName);
+            } else if (thatPartitionEvent instanceof DropPartitionEvent) {
+                ((DropPartitionEvent) 
thatPartitionEvent).removePartition(partitionName);
+            }
+        }
+
+        return 
getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
index 7b43a09666..c333506cad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -37,7 +37,7 @@ public class DropTableEvent extends MetastoreTableEvent {
     // for test
     public DropTableEvent(long eventId, String catalogName, String dbName,
                            String tblName) {
-        super(eventId, catalogName, dbName, tblName);
+        super(eventId, catalogName, dbName, tblName, 
MetastoreEventType.DROP_TABLE);
         this.tableName = tblName;
     }
 
@@ -67,6 +67,11 @@ public class DropTableEvent extends MetastoreTableEvent {
         return true;
     }
 
+    @Override
+    protected boolean willChangeTableName() {
+        return false;
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
@@ -80,8 +85,18 @@ public class DropTableEvent extends MetastoreTableEvent {
 
     @Override
     protected boolean canBeBatched(MetastoreEvent that) {
-        // `that` event must not be a rename table event
-        // so merge all events which belong to this table before is ok
-        return isSameTable(that);
+        if (!isSameTable(that)) {
+            return false;
+        }
+
+        /**
+         * Check if `that` event is a rename event, a rename event can not be 
batched
+         * because the process of `that` event will change the reference 
relation of this table,
+         * otherwise it can be batched because this event is a drop-table event
+         * and the process of this event will drop the whole table,
+         * and `that` event must be a MetastoreTableEvent event otherwise 
`isSameTable` will return false
+         * */
+        MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that;
+        return !thatTblEvent.willChangeTableName();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
index 3b5650ade4..7436d57c98 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
@@ -38,7 +38,7 @@ public class InsertEvent extends MetastoreTableEvent {
     // for test
     public InsertEvent(long eventId, String catalogName, String dbName,
                        String tblName) {
-        super(eventId, catalogName, dbName, tblName);
+        super(eventId, catalogName, dbName, tblName, 
MetastoreEventType.INSERT);
         this.hmsTbl = null;
     }
 
@@ -66,11 +66,16 @@ public class InsertEvent extends MetastoreTableEvent {
         return false;
     }
 
+    @Override
+    protected boolean willChangeTableName() {
+        return false;
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
             infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", 
catalogName, dbName, tblName);
-            /***
+            /**
              *  Only when we use hive client to execute a `INSERT INTO TBL 
SELECT * ...` or `INSERT INTO TBL ...` sql
              *  to a non-partitioned table then the hms will generate an 
insert event, and there is not
              *  any partition event occurs, but the file cache may has been 
changed, so we need handle this.
@@ -91,8 +96,11 @@ public class InsertEvent extends MetastoreTableEvent {
             return false;
         }
 
-        // that event must be a MetastoreTableEvent event
-        // otherwise `isSameTable` will return false
+        /**
+         * Because the cache of this table will be cleared when handling 
`InsertEvent`,
+         * so `that` event can be batched if `that` event will not create or 
drop this table,
+         * and `that` event must be a MetastoreTableEvent event otherwise 
`isSameTable` will return false
+         */
         return !((MetastoreTableEvent) that).willCreateOrDropTable();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
index 08aff93dda..b4fc963d98 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -58,12 +58,13 @@ public abstract class MetastoreEvent {
     protected final String catalogName;
 
     // for test
-    protected MetastoreEvent(long eventId, String catalogName, String dbName, 
String tblName) {
+    protected MetastoreEvent(long eventId, String catalogName, String dbName,
+                             String tblName, MetastoreEventType eventType) {
         this.eventId = eventId;
         this.catalogName = catalogName;
         this.dbName = dbName;
         this.tblName = tblName;
-        this.eventType = null;
+        this.eventType = eventType;
         this.metastoreNotificationEvent = null;
         this.event = null;
     }
@@ -97,7 +98,6 @@ public abstract class MetastoreEvent {
     /**
      * Checks if the given event can be batched into this event. Default 
behavior is
      * to return false which can be overridden by a sub-class.
-     * The current version is relatively simple to process batch events, so 
all that need to be processed are true.
      *
      * @param event The event under consideration to be batched into this 
event.
      * @return false if event cannot be batched into this event; otherwise 
true.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
index a5bf0d953c..aabc562dba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -99,7 +99,7 @@ public class MetastoreEventFactory implements EventFactory {
         for (int i = 0; i < events.size(); i++) {
             MetastoreEvent event = events.get(i);
 
-            // if the event is a rename event, just clear indexMap
+            // if the event is a rename db event, just clear indexMap
             // to make sure the table references of these events in indexMap 
will not change
             if (event instanceof AlterDatabaseEvent && ((AlterDatabaseEvent) 
event).isRename()) {
                 indexMap.clear();
@@ -135,12 +135,6 @@ public class MetastoreEventFactory implements EventFactory 
{
                         .collect(Collectors.toList());
             indexList.add(i);
             indexMap.put(groupKey, indexList);
-
-            // if the event is a rename event, just clear indexMap
-            // to make sure the table references of these events in indexMap 
will not change
-            if (event instanceof AlterTableEvent && ((AlterTableEvent) 
event).isRename()) {
-                indexMap.clear();
-            }
         }
 
         List<MetastoreEvent> filteredEvents = 
eventsCopy.stream().filter(Objects::nonNull)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
index f8bb457ea3..8a66dd1d6f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
@@ -20,14 +20,17 @@ package org.apache.doris.datasource.hive.event;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
+import java.util.Set;
+
 /**
  * Base class for all the partition events
  */
 public abstract class MetastorePartitionEvent extends MetastoreTableEvent {
 
     // for test
-    protected MetastorePartitionEvent(long eventId, String catalogName, String 
dbName, String tblName) {
-        super(eventId, catalogName, dbName, tblName);
+    protected MetastorePartitionEvent(long eventId, String catalogName, String 
dbName,
+                                      String tblName, MetastoreEventType 
eventType) {
+        super(eventId, catalogName, dbName, tblName, eventType);
     }
 
     protected MetastorePartitionEvent(NotificationEvent event, String 
catalogName) {
@@ -37,4 +40,15 @@ public abstract class MetastorePartitionEvent extends 
MetastoreTableEvent {
     protected boolean willCreateOrDropTable() {
         return false;
     }
+
+    protected boolean willChangeTableName() {
+        return false;
+    }
+
+    /**
+     * Returns if the process of this event will rename this partition.
+     */
+    protected abstract boolean willChangePartitionName();
+
+    public abstract Set<String> getAllPartitionNames();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
index c797c1c08d..4e9713a275 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
@@ -31,8 +31,9 @@ import java.util.Objects;
 public abstract class MetastoreTableEvent extends MetastoreEvent {
 
     // for test
-    protected MetastoreTableEvent(long eventId, String catalogName, String 
dbName, String tblName) {
-        super(eventId, catalogName, dbName, tblName);
+    /**
+     * Builds a table event directly from its identifying fields instead of from a
+     * {@link NotificationEvent}; every argument is forwarded unchanged to the parent constructor.
+     */
+    protected MetastoreTableEvent(long eventId, String catalogName, String dbName,
+                                  String tblName, MetastoreEventType eventType) {
+        super(eventId, catalogName, dbName, tblName, eventType);
     }
 
     protected MetastoreTableEvent(NotificationEvent event, String catalogName) 
{
@@ -67,6 +68,11 @@ public abstract class MetastoreTableEvent extends 
MetastoreEvent {
      */
     protected abstract boolean willCreateOrDropTable();
 
+    /**
+     * Returns whether processing this event will rename this table.
+     */
+    protected abstract boolean willChangeTableName();
+
     public TableKey getTableKey() {
         return new TableKey(catalogName, dbName, tblName);
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
index ba18c84bd7..c9e566dc9d 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
@@ -23,171 +23,450 @@ import 
org.apache.doris.datasource.hive.event.AlterPartitionEvent;
 import org.apache.doris.datasource.hive.event.AlterTableEvent;
 import org.apache.doris.datasource.hive.event.CreateDatabaseEvent;
 import org.apache.doris.datasource.hive.event.CreateTableEvent;
+import org.apache.doris.datasource.hive.event.DropDatabaseEvent;
+import org.apache.doris.datasource.hive.event.DropPartitionEvent;
 import org.apache.doris.datasource.hive.event.DropTableEvent;
 import org.apache.doris.datasource.hive.event.InsertEvent;
 import org.apache.doris.datasource.hive.event.MetastoreEvent;
 import org.apache.doris.datasource.hive.event.MetastoreEventFactory;
 
-import org.apache.hadoop.util.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
 
 
 public class MetastoreEventFactoryTest {
+
     private static final MetastoreEventFactory factory = new 
MetastoreEventFactory();
+    // seeded from the wall clock, so every run draws a different sequence of events
+    private static final Random random = new Random(System.currentTimeMillis());
+    // catalog name shared by every generated event and by the mock catalogs
+    private static final String testCtl = "test_ctl";
+
+    // One producer per event type. Each producer takes the event id and builds an event that targets
+    // a randomly chosen database / table / partition(s), see randomDb() / randomTbl() / randomPartition().
+    private static final Function<Long, CreateDatabaseEvent> createDatabaseEventProducer = eventId
+                -> new CreateDatabaseEvent(eventId, testCtl, randomDb());
+
+    // NOTE(review): the trailing boolean presumably marks the event as a rename (processEvent() checks
+    // isRename()) - confirm against the AlterDatabaseEvent test constructor
+    private static final Function<Long, AlterDatabaseEvent> alterDatabaseEventProducer = eventId
+                -> new AlterDatabaseEvent(eventId, testCtl, randomDb(), randomBool(0.0001D));
+
+    private static final Function<Long, DropDatabaseEvent> dropDatabaseEventProducer = eventId
+                -> new DropDatabaseEvent(eventId, testCtl, randomDb());
+
+    private static final Function<Long, CreateTableEvent> createTableEventProducer = eventId
+                -> new CreateTableEvent(eventId, testCtl, randomDb(), randomTbl());
+
+    // NOTE(review): the two trailing booleans presumably are the rename flag and the view flag, in that
+    // order (processEvent() checks isRename() / isView()) - confirm against the AlterTableEvent constructor
+    private static final Function<Long, AlterTableEvent> alterTableEventProducer = eventId
+                -> new AlterTableEvent(eventId, testCtl, randomDb(), randomTbl(),
+                randomBool(0.1D), randomBool(0.1D));
+
+    private static final Function<Long, InsertEvent> insertEventProducer = eventId
+                -> new InsertEvent(eventId, testCtl, randomDb(), randomTbl());
+
+    private static final Function<Long, DropTableEvent> dropTableEventProducer = eventId
+                -> new DropTableEvent(eventId, testCtl, randomDb(), randomTbl());
+
+    private static final Function<Long, AddPartitionEvent> addPartitionEventProducer = eventId
+                -> new AddPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartitions());
+
+    // NOTE(review): the trailing boolean presumably marks the event as a partition rename
+    // (processEvent() checks isRename()) - confirm against the AlterPartitionEvent test constructor
+    private static final Function<Long, AlterPartitionEvent> alterPartitionEventProducer = eventId
+                -> new AlterPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartition(),
+                randomBool(0.1D));
+
+    private static final Function<Long, DropPartitionEvent> dropPartitionEventProducer = eventId
+                -> new DropPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartitions());
+
+    // EventProducer picks an entry of this list by index, so the order here must match the order
+    // of the proportions handed to the EventProducer constructor.
+    private static final List<Function<Long, ? extends MetastoreEvent>> eventProducers = Arrays.asList(
+                createDatabaseEventProducer, alterDatabaseEventProducer, dropDatabaseEventProducer,
+                createTableEventProducer, alterTableEventProducer, insertEventProducer, dropTableEventProducer,
+                addPartitionEventProducer, alterPartitionEventProducer, dropPartitionEventProducer);
+
+    // picks one of the 10 database names db_0 .. db_9
+    private static String randomDb() {
+        int dbIndex = random.nextInt(10);
+        return "db_" + dbIndex;
+    }
+
+    // picks one of the 100 table names tbl_0 .. tbl_99
+    private static String randomTbl() {
+        int tblIndex = random.nextInt(100);
+        return "tbl_" + tblIndex;
+    }
+
+    // picks one of the 1000 partition names partition_0 .. partition_999
+    private static String randomPartition() {
+        int partitionIndex = random.nextInt(1000);
+        return "partition_" + partitionIndex;
+    }
+
+    // draws between 1 and 100 random partition names; duplicates collapse in the set,
+    // so the returned list holds distinct names and may be shorter than the number of draws
+    private static List<String> randomPartitions() {
+        Set<String> distinctNames = Sets.newHashSet();
+        for (int remaining = random.nextInt(100) + 1; remaining > 0; remaining--) {
+            distinctNames.add(randomPartition());
+        }
+        return Lists.newArrayList(distinctNames);
+    }
+
+    /**
+     * Returns {@code true} with the given probability.
+     *
+     * <p>{@code Random.nextDouble()} is uniform in {@code [0.0, 1.0)}, so a strict {@code <} gives
+     * exactly the requested probability: {@code 0.0} never yields {@code true} and {@code 1.0}
+     * always does. The previous integer comparison used {@code <=} and was off by one, so even
+     * a probability of {@code 0.0} could return {@code true}.
+     *
+     * @param possibility probability of returning {@code true}, within {@code [0.0, 1.0]}
+     * @throws IllegalArgumentException if {@code possibility} is outside {@code [0.0, 1.0]}
+     */
+    private static boolean randomBool(double possibility) {
+        Preconditions.checkArgument(possibility >= 0.0D && possibility <= 1.0D);
+        return random.nextDouble() < possibility;
+    }
+
+    // define MockCatalog/MockDatabase/MockTable/MockPartition to simulate the
+    // real catalog/database/table/partition.
+    // equals()/hashCode() cover the whole tree below an object, and copy() duplicates the whole tree,
+    // so that a copy can be modified without touching the object it was copied from.
+    private static class MockCatalog {
+        private final String ctlName;
+        // database name -> database
+        private final Map<String, MockDatabase> databases = Maps.newHashMap();
+
+        private MockCatalog(String ctlName) {
+            this.ctlName = ctlName;
+        }
+
+        // databases ordered by name, so that comparisons do not depend on the map's iteration order
+        private Object[] sortedDatabases() {
+            return databases.values().stream().sorted(Comparator.comparing(d -> d.dbName)).toArray();
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * Objects.hash(ctlName) + Arrays.hashCode(sortedDatabases());
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof MockCatalog)) {
+                return false;
+            }
+            MockCatalog that = (MockCatalog) other;
+            return Objects.equals(this.ctlName, that.ctlName)
+                        && Arrays.equals(sortedDatabases(), that.sortedDatabases());
+        }
+
+        // deep copy: the returned catalog shares no database/table/partition object with this one
+        public MockCatalog copy() {
+            MockCatalog mockCatalog = new MockCatalog(this.ctlName);
+            for (Map.Entry<String, MockDatabase> entry : this.databases.entrySet()) {
+                mockCatalog.databases.put(entry.getKey(), entry.getValue().copy());
+            }
+            return mockCatalog;
+        }
+    }
+
+    private static class MockDatabase {
+        private final String dbName;
+        // table name -> table
+        private final Map<String, MockTable> tables = Maps.newHashMap();
+
+        private MockDatabase(String dbName) {
+            this.dbName = dbName;
+        }
+
+        // tables ordered by name, so that comparisons do not depend on the map's iteration order
+        private Object[] sortedTables() {
+            return tables.values().stream().sorted(Comparator.comparing(t -> t.tblName)).toArray();
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * Objects.hash(dbName) + Arrays.hashCode(sortedTables());
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof MockDatabase)) {
+                return false;
+            }
+            MockDatabase that = (MockDatabase) other;
+            return Objects.equals(this.dbName, that.dbName)
+                        && Arrays.equals(sortedTables(), that.sortedTables());
+        }
+
+        // deep copy: the returned database shares no table/partition object with this one
+        public MockDatabase copy() {
+            MockDatabase mockDatabase = new MockDatabase(this.dbName);
+            for (Map.Entry<String, MockTable> entry : this.tables.entrySet()) {
+                mockDatabase.tables.put(entry.getKey(), entry.getValue().copy());
+            }
+            return mockDatabase;
+        }
+    }
+
+    private static class MockTable {
+        private final String tblName;
+        // use this field to mark if the table has been refreshed
+        private boolean refreshed;
+        // partition name -> partition
+        private final Map<String, MockPartition> partitions = Maps.newHashMap();
+
+        private MockTable(String tblName) {
+            this.tblName = tblName;
+        }
+
+        public void refresh() {
+            this.refreshed = true;
+        }
+
+        // partitions ordered by name, so that comparisons do not depend on the map's iteration order
+        private Object[] sortedPartitions() {
+            return partitions.values().stream().sorted(Comparator.comparing(p -> p.partitionName)).toArray();
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * Objects.hash(tblName, refreshed) + Arrays.hashCode(sortedPartitions());
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof MockTable)) {
+                return false;
+            }
+            MockTable that = (MockTable) other;
+            return Objects.equals(this.tblName, that.tblName)
+                        && this.refreshed == that.refreshed
+                        && Arrays.equals(sortedPartitions(), that.sortedPartitions());
+        }
+
+        // deep copy: keeps the refreshed mark and duplicates every partition
+        public MockTable copy() {
+            MockTable copyTbl = new MockTable(this.tblName);
+            copyTbl.refreshed = this.refreshed;
+            for (Map.Entry<String, MockPartition> entry : this.partitions.entrySet()) {
+                copyTbl.partitions.put(entry.getKey(), entry.getValue().copy());
+            }
+            return copyTbl;
+        }
+    }
+
+    private static class MockPartition {
+        private final String partitionName;
+        // use this field to mark if the partition has been refreshed
+        private boolean refreshed;
+
+        private MockPartition(String partitionName) {
+            this.partitionName = partitionName;
+            this.refreshed = false;
+        }
+
+        public void refresh() {
+            this.refreshed = true;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(refreshed, partitionName);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return other instanceof MockPartition
+                        && refreshed == ((MockPartition) other).refreshed
+                        && Objects.equals(this.partitionName, ((MockPartition) other).partitionName);
+        }
+
+        // copy that keeps the refreshed mark
+        public MockPartition copy() {
+            MockPartition copyPartition = new MockPartition(this.partitionName);
+            copyPartition.refreshed = this.refreshed;
+            return copyPartition;
+        }
+    }
+
+    // returns the table the event targets, or null when its database or the table itself does not exist
+    private static MockTable findTable(MockCatalog ctl, MetastoreEvent event) {
+        MockDatabase owner = ctl.databases.get(event.getDbName());
+        return owner == null ? null : owner.tables.get(event.getTblName());
+    }
+
+    // simulate the processes when handling hms events:
+    // applies a single event to the mock catalog; an event that targets
+    // a missing database / table / partition is silently ignored
+    private void processEvent(MockCatalog ctl, MetastoreEvent event) {
+        switch (event.getEventType()) {
+
+            case CREATE_DATABASE: {
+                // a fresh, empty database replaces any existing database with the same name
+                MockDatabase created = new MockDatabase(event.getDbName());
+                ctl.databases.put(created.dbName, created);
+                break;
+            }
+
+            case DROP_DATABASE: {
+                ctl.databases.remove(event.getDbName());
+                break;
+            }
+
+            case ALTER_DATABASE: {
+                AlterDatabaseEvent alterDatabaseEvent = (AlterDatabaseEvent) event;
+                if (alterDatabaseEvent.isRename()) {
+                    // a rename drops the old database and registers an empty one under the new name
+                    ctl.databases.remove(event.getDbName());
+                    MockDatabase renamed = new MockDatabase(alterDatabaseEvent.getDbNameAfter());
+                    ctl.databases.put(renamed.dbName, renamed);
+                    break;
+                }
+                MockDatabase altered = ctl.databases.get(event.getDbName());
+                if (altered != null) {
+                    altered.tables.clear();
+                }
+                break;
+            }
+
+            case CREATE_TABLE: {
+                MockDatabase db = ctl.databases.get(event.getDbName());
+                if (db != null) {
+                    db.tables.put(event.getTblName(), new MockTable(event.getTblName()));
+                }
+                break;
+            }
+
+            case DROP_TABLE: {
+                MockDatabase db = ctl.databases.get(event.getDbName());
+                if (db != null) {
+                    db.tables.remove(event.getTblName());
+                }
+                break;
+            }
+
+            case ALTER_TABLE:
+            case INSERT: {
+                MockDatabase db = ctl.databases.get(event.getDbName());
+                if (db == null) {
+                    break;
+                }
+                boolean recreateTable = event instanceof AlterTableEvent
+                            && (((AlterTableEvent) event).isRename() || ((AlterTableEvent) event).isView());
+                if (recreateTable) {
+                    // rename / view change: drop the old table and register an empty one
+                    db.tables.remove(event.getTblName());
+                    MockTable replacement = new MockTable(((AlterTableEvent) event).getTblNameAfter());
+                    db.tables.put(replacement.tblName, replacement);
+                    break;
+                }
+                // any other alter, and every insert, refreshes the table and forgets its partitions
+                MockTable target = db.tables.get(event.getTblName());
+                if (target != null) {
+                    target.partitions.clear();
+                    target.refresh();
+                }
+                break;
+            }
+
+            case ADD_PARTITION: {
+                MockTable target = findTable(ctl, event);
+                if (target != null) {
+                    for (String added : ((AddPartitionEvent) event).getAllPartitionNames()) {
+                        target.partitions.put(added, new MockPartition(added));
+                    }
+                }
+                break;
+            }
+
+            case ALTER_PARTITION: {
+                MockTable target = findTable(ctl, event);
+                if (target == null) {
+                    break;
+                }
+                AlterPartitionEvent alterPartitionEvent = (AlterPartitionEvent) event;
+                if (alterPartitionEvent.isRename()) {
+                    // a rename drops the old partition(s) and registers a fresh one under the new name
+                    for (String oldName : alterPartitionEvent.getAllPartitionNames()) {
+                        target.partitions.remove(oldName);
+                    }
+                    MockPartition renamed = new MockPartition(alterPartitionEvent.getPartitionNameAfter());
+                    target.partitions.put(renamed.partitionName, renamed);
+                    break;
+                }
+                for (String name : alterPartitionEvent.getAllPartitionNames()) {
+                    MockPartition existing = target.partitions.get(name);
+                    if (existing != null) {
+                        existing.refresh();
+                    }
+                }
+                break;
+            }
+
+            case DROP_PARTITION: {
+                MockTable target = findTable(ctl, event);
+                if (target != null) {
+                    for (String dropped : ((DropPartitionEvent) event).getAllPartitionNames()) {
+                        target.partitions.remove(dropped);
+                    }
+                }
+                break;
+            }
+
+            default:
+                Assertions.fail("Unknown event type : " + event.getEventType());
+        }
+    }
+
+    static class EventProducer {
+        // every type of event has a proportion, listed in the same order as `eventProducers`
+        // for instance, if the `CreateDatabaseEvent`'s proportion is 1
+        // and the `AlterDatabaseEvent`'s proportion is 10
+        // the event count of `AlterDatabaseEvent` is always about 10 times that of the `CreateDatabaseEvent`
+        // an event type whose proportion is 0 is never produced
+        private final List<Integer> proportions;
+        private final int sumOfProportions;
+
+        /**
+         * @param proportions one non-negative weight per entry of {@code eventProducers}, in the same order;
+         *                    at least one weight must be positive
+         * @throws IllegalArgumentException if the list is empty, has the wrong size, contains a negative
+         *                                  weight or contains only zeros
+         */
+        EventProducer(List<Integer> proportions) {
+            Preconditions.checkArgument(CollectionUtils.isNotEmpty(proportions)
+                        && proportions.size() == eventProducers.size());
+            Preconditions.checkArgument(proportions.stream().allMatch(proportion -> proportion >= 0),
+                        "proportions must not be negative");
+            this.proportions = ImmutableList.copyOf(proportions);
+            this.sumOfProportions = proportions.stream().mapToInt(Integer::intValue).sum();
+            // random.nextInt(bound) rejects a non-positive bound
+            Preconditions.checkArgument(sumOfProportions > 0, "at least one proportion must be positive");
+        }
+
+        /**
+         * Produces one event of a randomly chosen type, each type being chosen with a probability
+         * of its proportion divided by the sum of all proportions.
+         */
+        public MetastoreEvent produceOneEvent(long eventId) {
+            return eventProducers.get(calIndex(random.nextInt(sumOfProportions))).apply(eventId);
+        }
+
+        /**
+         * Maps {@code val}, drawn from {@code [0, sumOfProportions)}, to the index whose cumulative
+         * range contains it. Index {@code i} owns exactly {@code proportions.get(i)} values, so an
+         * index with proportion 0 owns none and is never returned.
+         */
+        private int calIndex(int val) {
+            Preconditions.checkArgument(val >= 0 && val < sumOfProportions);
+            int upperBound = 0;
+            for (int index = 0; index < proportions.size(); index++) {
+                upperBound += proportions.get(index);
+                // strict comparison: `upperBound` itself already belongs to the next index
+                if (val < upperBound) {
+                    return index;
+                }
+            }
+            // unreachable: val < sumOfProportions guarantees a match in the loop above
+            throw new IllegalStateException("no index found for value " + val);
+        }
+    }
 
     @Test
     public void testCreateBatchEvents() {
-        AlterPartitionEvent e1 = new AlterPartitionEvent(1L, "test_ctl", 
"test_db", "t1", "p1", "p1");
-        AlterPartitionEvent e2 = new AlterPartitionEvent(2L, "test_ctl", 
"test_db", "t1", "p1", "p1");
-        AddPartitionEvent e3 = new AddPartitionEvent(3L, "test_ctl", 
"test_db", "t1", Arrays.asList("p1"));
-        AlterTableEvent e4 = new AlterTableEvent(4L, "test_ctl", "test_db", 
"t1", false, false);
-        AlterTableEvent e5 = new AlterTableEvent(5L, "test_ctl", "test_db", 
"t1", true, false);
-        AlterTableEvent e6 = new AlterTableEvent(6L, "test_ctl", "test_db", 
"t1", false, true);
-        DropTableEvent e7 = new DropTableEvent(7L, "test_ctl", "test_db", 
"t1");
-        InsertEvent e8 = new InsertEvent(8L, "test_ctl", "test_db", "t1");
-        CreateDatabaseEvent e9 = new CreateDatabaseEvent(9L, "test_ctl", 
"test_db2");
-        AlterPartitionEvent e10 = new AlterPartitionEvent(10L, "test_ctl", 
"test_db", "t2", "p1", "p1");
-        AlterTableEvent e11 = new AlterTableEvent(11L, "test_ctl", "test_db", 
"t1", false, false);
-        CreateTableEvent e12 = new CreateTableEvent(12L, "test_ctl", 
"test_db", "t1");
-        AlterDatabaseEvent e13 = new AlterDatabaseEvent(13L, "test_ctl", 
"test_db", true);
-        AlterDatabaseEvent e14 = new AlterDatabaseEvent(14L, "test_ctl", 
"test_db", false);
-
-        List<MetastoreEvent> mergedEvents;
-        List<MetastoreEvent> testEvents = Lists.newLinkedList();
-
-        testEvents.add(e1);
-        testEvents.add(e2);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 1);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 2L);
-
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e3);
-        testEvents.add(e9);
-        testEvents.add(e10);
-        testEvents.add(e4);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 3);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 9L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
-
-        // because e5 is a rename event, it will not be merged
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e5);
-        testEvents.add(e4);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 3);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
-        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
-
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e6);
-        testEvents.add(e4);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 3);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 6L);
-        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
-
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e4);
-        testEvents.add(e11);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 2);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 11L);
-
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e4);
-        testEvents.add(e8);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 2);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 8L);
-
-        // because e5 is a rename event, it will not be merged
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e5);
-        testEvents.add(e8);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 3);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
-        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 8L);
-
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e12);
-        testEvents.add(e4);
-        testEvents.add(e7);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 2);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 7L);
-
-        // because e5 is a rename event, it will not be merged
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e5);
-        testEvents.add(e7);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 3);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
-        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L);
-
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e4);
-        testEvents.add(e13);
-        testEvents.add(e7);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 4);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 4L);
-        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 13L);
-        Assertions.assertTrue(mergedEvents.get(3).getEventId() == 7L);
-
-        testEvents.clear();
-        testEvents.add(e1);
-        testEvents.add(e2);
-        testEvents.add(e10);
-        testEvents.add(e4);
-        testEvents.add(e14);
-        testEvents.add(e7);
-        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
-        Assertions.assertTrue(mergedEvents.size() == 3);
-        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
-        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 14L);
-        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L);
+        // for catalog initialization, so just produce CreateXXXEvent / AddXXXEvent
+        // (the order of the proportions follows the order of `eventProducers`)
+        List<Integer> initProportions = Lists.newArrayList(
+                1, // CreateDatabaseEvent
+                0, // AlterDatabaseEvent
+                0, // DropDatabaseEvent
+                10, // CreateTableEvent
+                0, // AlterTableEvent
+                0, // InsertEvent
+                0, // DropTableEvent
+                100, // AddPartitionEvent
+                0, // AlterPartitionEvent
+                0 // DropPartitionEvent
+        );
+
+        List<Integer> proportions = Lists.newArrayList(
+                5, // CreateDatabaseEvent
+                1, // AlterDatabaseEvent
+                5, // DropDatabaseEvent
+                100, // CreateTableEvent
+                20000, // AlterTableEvent
+                2000, // InsertEvent
+                5000, // DropTableEvent
+                10000, // AddPartitionEvent
+                50000, // AlterPartitionEvent
+                20000 // DropPartitionEvent
+        );
+        EventProducer initProducer = new EventProducer(initProportions);
+        EventProducer producer = new EventProducer(proportions);
+
+        for (int i = 0; i < 200; i++) {
+            // create a test catalog and do initialization
+            MockCatalog testCatalog = new MockCatalog(testCtl);
+            List<MetastoreEvent> initEvents = Lists.newArrayListWithCapacity(1000);
+            for (int j = 0; j < 1000; j++) {
+                initEvents.add(initProducer.produceOneEvent(j));
+            }
+            for (MetastoreEvent event : initEvents) {
+                processEvent(testCatalog, event);
+            }
+
+            // copy the test catalog to the validate catalog
+            MockCatalog validateCatalog = testCatalog.copy();
+
+            List<MetastoreEvent> events = Lists.newArrayListWithCapacity(1000);
+            for (int j = 0; j < 1000; j++) {
+                events.add(producer.produceOneEvent(j));
+            }
+            List<MetastoreEvent> mergedEvents = factory.createBatchEvents(testCtl, events);
+
+            // the validate catalog replays every event and therefore is the reference
+            for (MetastoreEvent event : events) {
+                processEvent(validateCatalog, event);
+            }
+
+            // the test catalog only replays the merged events
+            for (MetastoreEvent event : mergedEvents) {
+                processEvent(testCatalog, event);
+            }
+
+            // the test catalog should be equal to the validate catalog
+            // otherwise we must have some bugs in `factory.createBatchEvents()`
+            Assertions.assertEquals(validateCatalog, testCatalog,
+                    "catalog built from the merged events differs from the reference catalog in round " + i);
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to