This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 105a162f94 [Enhancement](multi-catalog) Merge hms events every round 
to speed up events processing. (#21589)
105a162f94 is described below

commit 105a162f94824c28e770a1013c259412b2dec25b
Author: Xiangyu Wang <dut.xian...@gmail.com>
AuthorDate: Wed Jul 12 23:41:07 2023 +0800

    [Enhancement](multi-catalog) Merge hms events every round to speed up 
events processing. (#21589)
    
    Currently we find that MetastoreEventsProcessor cannot keep up with the 
rate at which events are produced in our cluster, so we need to merge some HMS 
events in every round.
---
 .../datasource/hive/event/AddPartitionEvent.java   |  10 +-
 .../datasource/hive/event/AlterDatabaseEvent.java  |  72 +++++++-
 .../datasource/hive/event/AlterPartitionEvent.java |  23 ++-
 .../datasource/hive/event/AlterTableEvent.java     |  46 ++++-
 .../datasource/hive/event/CreateDatabaseEvent.java |   5 +
 .../datasource/hive/event/CreateTableEvent.java    |  11 ++
 .../datasource/hive/event/DropPartitionEvent.java  |   2 +-
 .../datasource/hive/event/DropTableEvent.java      |  19 ++
 .../doris/datasource/hive/event/InsertEvent.java   |  23 +++
 .../datasource/hive/event/MetastoreEvent.java      |  11 ++
 .../hive/event/MetastoreEventFactory.java          |  86 +++++++--
 ...baseEvent.java => MetastorePartitionEvent.java} |  26 +--
 .../datasource/hive/event/MetastoreTableEvent.java |  50 ++++++
 .../external/hms/MetastoreEventFactoryTest.java    | 193 +++++++++++++++++++++
 14 files changed, 543 insertions(+), 34 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
index 11d74ed9c2..b94333e41b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
@@ -37,10 +37,18 @@ import java.util.stream.Collectors;
 /**
  * MetastoreEvent for ADD_PARTITION event type
  */
-public class AddPartitionEvent extends MetastoreTableEvent {
+public class AddPartitionEvent extends MetastorePartitionEvent {
     private final Table hmsTbl;
     private final List<String> partitionNames;
 
+    // for test
+    public AddPartitionEvent(long eventId, String catalogName, String dbName,
+                             String tblName, List<String> partitionNames) {
+        super(eventId, catalogName, dbName, tblName);
+        this.partitionNames = partitionNames;
+        this.hmsTbl = null;
+    }
+
     private AddPartitionEvent(NotificationEvent event,
             String catalogName) {
         super(event, catalogName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
index 59445a5dc4..d56eb52fad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
@@ -18,21 +18,74 @@
 
 package org.apache.doris.datasource.hive.event;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import 
org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
 
 import java.util.List;
 
 /**
- * MetastoreEvent for Alter_DATABASE event type
+ * MetastoreEvent for ALTER_DATABASE event type
  */
 public class AlterDatabaseEvent extends MetastoreEvent {
 
+    private final Database dbBefore;
+    private final Database dbAfter;
+
+    // true if this alter event was due to a rename operation
+    private final boolean isRename;
+
+    // for test
+    public AlterDatabaseEvent(long eventId, String catalogName, String dbName, 
boolean isRename) {
+        super(eventId, catalogName, dbName, null);
+        this.isRename = isRename;
+        this.dbBefore = null;
+        this.dbAfter = null;
+    }
+
     private AlterDatabaseEvent(NotificationEvent event,
             String catalogName) {
         super(event, catalogName);
         
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE));
+
+        try {
+            JSONAlterDatabaseMessage alterDatabaseMessage =
+                    (JSONAlterDatabaseMessage) 
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
+                                .getAlterDatabaseMessage(event.getMessage());
+            dbBefore = 
Preconditions.checkNotNull(alterDatabaseMessage.getDbObjBefore());
+            dbAfter = 
Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter());
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(
+                    debugString("Unable to parse the alter database message"), 
e);
+        }
+        // this is a rename event if either dbName of before and after object 
changed
+        isRename = !dbBefore.getName().equalsIgnoreCase(dbAfter.getName());
+    }
+
+    private void processRename() throws DdlException {
+        CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support ExternalCatalog Databases");
+        }
+        if (catalog.getDbNullable(dbAfter.getName()) != null) {
+            infoLog("AlterExternalDatabase canceled, because dbAfter has 
exist, "
+                            + "catalogName:[{}],dbName:[{}]",
+                    catalogName, dbAfter.getName());
+            return;
+        }
+        
Env.getCurrentEnv().getCatalogMgr().dropExternalDatabase(dbBefore.getName(), 
catalogName, true);
+        
Env.getCurrentEnv().getCatalogMgr().createExternalDatabase(dbAfter.getName(), 
catalogName, true);
+
     }
 
     protected static List<MetastoreEvent> getEvents(NotificationEvent event,
@@ -40,9 +93,22 @@ public class AlterDatabaseEvent extends MetastoreEvent {
         return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName));
     }
 
+    public boolean isRename() {
+        return isRename;
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
-        // only can change properties,we do nothing
-        infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+        try {
+            if (isRename) {
+                processRename();
+                return;
+            }
+            // only can change properties,we do nothing
+            infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
index 788b79f885..0fc6be375d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
@@ -30,12 +30,13 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
  * MetastoreEvent for ALTER_PARTITION event type
  */
-public class AlterPartitionEvent extends MetastoreTableEvent {
+public class AlterPartitionEvent extends MetastorePartitionEvent {
     private final Table hmsTbl;
     private final org.apache.hadoop.hive.metastore.api.Partition 
partitionAfter;
     private final org.apache.hadoop.hive.metastore.api.Partition 
partitionBefore;
@@ -44,6 +45,18 @@ public class AlterPartitionEvent extends MetastoreTableEvent 
{
     // true if this alter event was due to a rename operation
     private final boolean isRename;
 
+    // for test
+    public AlterPartitionEvent(long eventId, String catalogName, String 
dbName, String tblName,
+                                String partitionNameBefore, String 
partitionNameAfter) {
+        super(eventId, catalogName, dbName, tblName);
+        this.partitionNameBefore = partitionNameBefore;
+        this.partitionNameAfter = partitionNameAfter;
+        this.hmsTbl = null;
+        this.partitionAfter = null;
+        this.partitionBefore = null;
+        isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
+    }
+
     private AlterPartitionEvent(NotificationEvent event,
             String catalogName) {
         super(event, catalogName);
@@ -94,4 +107,12 @@ public class AlterPartitionEvent extends 
MetastoreTableEvent {
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected boolean canBeBatched(MetastoreEvent event) {
+        return isSameTable(event)
+                    && event instanceof AlterPartitionEvent
+                    && Objects.equals(partitionBefore, ((AlterPartitionEvent) 
event).partitionBefore)
+                    && Objects.equals(partitionAfter, ((AlterPartitionEvent) 
event).partitionAfter);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
index cbb1ee8478..bc09d6ef2c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
@@ -41,6 +41,18 @@ public class AlterTableEvent extends MetastoreTableEvent {
     // true if this alter event was due to a rename operation
     private final boolean isRename;
     private final boolean isView;
+    private final boolean willCreateOrDropTable;
+
+    // for test
+    public AlterTableEvent(long eventId, String catalogName, String dbName,
+                           String tblName, boolean isRename, boolean isView) {
+        super(eventId, catalogName, dbName, tblName);
+        this.isRename = isRename;
+        this.isView = isView;
+        this.tableBefore = null;
+        this.tableAfter = null;
+        this.willCreateOrDropTable = isRename || isView;
+    }
 
     private AlterTableEvent(NotificationEvent event, String catalogName) {
         super(event, catalogName);
@@ -61,13 +73,19 @@ public class AlterTableEvent extends MetastoreTableEvent {
         isRename = 
!tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
                 || 
!tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
         isView = tableBefore.isSetViewExpandedText() || 
tableBefore.isSetViewOriginalText();
+        this.willCreateOrDropTable = isRename || isView;
     }
 
     public static List<MetastoreEvent> getEvents(NotificationEvent event,
-            String catalogName) {
+                                                 String catalogName) {
         return Lists.newArrayList(new AlterTableEvent(event, catalogName));
     }
 
+    @Override
+    protected boolean willCreateOrDropTable() {
+        return willCreateOrDropTable;
+    }
+
     private void processRecreateTable() throws DdlException {
         if (!isView) {
             return;
@@ -97,6 +115,14 @@ public class AlterTableEvent extends MetastoreTableEvent {
 
     }
 
+    public boolean isRename() {
+        return isRename;
+    }
+
+    public boolean isView() {
+        return isView;
+    }
+
     /**
      * If the ALTER_TABLE event is due a table rename, this method removes the 
old table
      * and creates a new table with the new name. Else, we just refresh table
@@ -124,4 +150,22 @@ public class AlterTableEvent extends MetastoreTableEvent {
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected boolean canBeBatched(MetastoreEvent that) {
+        if (!isSameTable(that)) {
+            return false;
+        }
+
+        // `that` event must not be a rename table event
+        // so if the process of this event will drop this table,
+        // it can merge all the table's events before
+        if (willCreateOrDropTable) {
+            return true;
+        }
+
+        // that event must be a MetastoreTableEvent event
+        // otherwise `isSameTable` will return false
+        return !((MetastoreTableEvent) that).willCreateOrDropTable();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
index e115f80f51..eb8da00cfe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
@@ -32,6 +32,11 @@ import java.util.List;
  */
 public class CreateDatabaseEvent extends MetastoreEvent {
 
+    // for test
+    public CreateDatabaseEvent(long eventId, String catalogName, String 
dbName) {
+        super(eventId, catalogName, dbName, null);
+    }
+
     private CreateDatabaseEvent(NotificationEvent event,
             String catalogName) {
         super(event, catalogName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
index 9ac8fd4e68..1ec8cbfde5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
@@ -35,6 +35,12 @@ import java.util.List;
 public class CreateTableEvent extends MetastoreTableEvent {
     private final Table hmsTbl;
 
+    // for test
+    public CreateTableEvent(long eventId, String catalogName, String dbName, 
String tblName) {
+        super(eventId, catalogName, dbName, tblName);
+        this.hmsTbl = null;
+    }
+
     private CreateTableEvent(NotificationEvent event, String catalogName) 
throws MetastoreNotificationException {
         super(event, catalogName);
         
Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
@@ -55,6 +61,11 @@ public class CreateTableEvent extends MetastoreTableEvent {
         return Lists.newArrayList(new CreateTableEvent(event, catalogName));
     }
 
+    @Override
+    protected boolean willCreateOrDropTable() {
+        return true;
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
index a53cf218db..7f8ade0819 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
 /**
  * MetastoreEvent for ADD_PARTITION event type
  */
-public class DropPartitionEvent extends MetastoreTableEvent {
+public class DropPartitionEvent extends MetastorePartitionEvent {
     private final Table hmsTbl;
     private final List<String> partitionNames;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
index c73a59f1c3..7b43a09666 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -34,6 +34,13 @@ import java.util.List;
 public class DropTableEvent extends MetastoreTableEvent {
     private final String tableName;
 
+    // for test
+    public DropTableEvent(long eventId, String catalogName, String dbName,
+                           String tblName) {
+        super(eventId, catalogName, dbName, tblName);
+        this.tableName = tblName;
+    }
+
     private DropTableEvent(NotificationEvent event,
             String catalogName) {
         super(event, catalogName);
@@ -55,6 +62,11 @@ public class DropTableEvent extends MetastoreTableEvent {
         return Lists.newArrayList(new DropTableEvent(event, catalogName));
     }
 
+    @Override
+    protected boolean willCreateOrDropTable() {
+        return true;
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
@@ -65,4 +77,11 @@ public class DropTableEvent extends MetastoreTableEvent {
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected boolean canBeBatched(MetastoreEvent that) {
+        // `that` event must not be a rename table event
+        // so merge all events which belong to this table before is ok
+        return isSameTable(that);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
index 27438a4dcb..3b5650ade4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
@@ -35,6 +35,13 @@ import java.util.List;
 public class InsertEvent extends MetastoreTableEvent {
     private final Table hmsTbl;
 
+    // for test
+    public InsertEvent(long eventId, String catalogName, String dbName,
+                       String tblName) {
+        super(eventId, catalogName, dbName, tblName);
+        this.hmsTbl = null;
+    }
+
     private InsertEvent(NotificationEvent event, String catalogName) {
         super(event, catalogName);
         
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT));
@@ -54,6 +61,11 @@ public class InsertEvent extends MetastoreTableEvent {
         return Lists.newArrayList(new InsertEvent(event, catalogName));
     }
 
+    @Override
+    protected boolean willCreateOrDropTable() {
+        return false;
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
@@ -72,4 +84,15 @@ public class InsertEvent extends MetastoreTableEvent {
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected boolean canBeBatched(MetastoreEvent that) {
+        if (!isSameTable(that)) {
+            return false;
+        }
+
+        // that event must be a MetastoreTableEvent event
+        // otherwise `isSameTable` will return false
+        return !((MetastoreTableEvent) that).willCreateOrDropTable();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
index 9693bb0c4c..08aff93dda 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -57,6 +57,17 @@ public abstract class MetastoreEvent {
 
     protected final String catalogName;
 
+    // for test
+    protected MetastoreEvent(long eventId, String catalogName, String dbName, 
String tblName) {
+        this.eventId = eventId;
+        this.catalogName = catalogName;
+        this.dbName = dbName;
+        this.tblName = tblName;
+        this.eventType = null;
+        this.metastoreNotificationEvent = null;
+        this.event = null;
+    }
+
     protected MetastoreEvent(NotificationEvent event, String catalogName) {
         this.event = event;
         this.dbName = event.getDbName();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
index 3ab2a7e030..a5bf0d953c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -21,12 +21,17 @@ package org.apache.doris.datasource.hive.event;
 import org.apache.doris.datasource.HMSExternalCatalog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Factory class to create various MetastoreEvents.
@@ -36,7 +41,7 @@ public class MetastoreEventFactory implements EventFactory {
 
     @Override
     public List<MetastoreEvent> 
transferNotificationEventToMetastoreEvents(NotificationEvent event,
-            String catalogName) {
+                                                                           
String catalogName) {
         Preconditions.checkNotNull(event.getEventType());
         MetastoreEventType metastoreEventType = 
MetastoreEventType.from(event.getEventType());
         switch (metastoreEventType) {
@@ -68,19 +73,80 @@ public class MetastoreEventFactory implements EventFactory {
 
     List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, 
HMSExternalCatalog hmsExternalCatalog) {
         List<MetastoreEvent> metastoreEvents = Lists.newArrayList();
+        String catalogName = hmsExternalCatalog.getName();
         for (NotificationEvent event : events) {
-            
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, 
hmsExternalCatalog.getName()));
+            
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, 
catalogName));
         }
-        return createBatchEvents(metastoreEvents);
+        return createBatchEvents(catalogName, metastoreEvents);
     }
 
     /**
-     * Create batch event tasks according to HivePartitionName to facilitate 
subsequent parallel processing.
-     * For ADD_PARTITION and DROP_PARTITION, we directly override any events 
before that partition.
-     * For a partition, it is meaningless to process any events before the 
drop partition.
-     */
-    List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) {
-        // now do nothing
-        return events;
+     * Merge events to reduce the cost time on event processing, currently 
mainly handles MetastoreTableEvent
+     * because merge MetastoreTableEvent is simple and cost-effective.
+     * For example, consider there are some events as following:
+     *
+     *    event1: alter table db1.t1 add partition p1;
+     *    event2: alter table db1.t1 drop partition p2;
+     *    event3: alter table db1.t2 add partition p3;
+     *    event4: alter table db2.t3 rename to t4;
+     *    event5: drop table db1.t1;
+     *
+     * Only `event3 event4 event5` will be reserved and other events will be 
skipped.
+     * */
+    public List<MetastoreEvent> createBatchEvents(String catalogName, 
List<MetastoreEvent> events) {
+        List<MetastoreEvent> eventsCopy = Lists.newArrayList(events);
+        Map<MetastoreTableEvent.TableKey, List<Integer>> indexMap = 
Maps.newLinkedHashMap();
+        for (int i = 0; i < events.size(); i++) {
+            MetastoreEvent event = events.get(i);
+
+            // if the event is a rename event, just clear indexMap
+            // to make sure the table references of these events in indexMap 
will not change
+            if (event instanceof AlterDatabaseEvent && ((AlterDatabaseEvent) 
event).isRename()) {
+                indexMap.clear();
+                continue;
+            }
+
+            // Only check MetastoreTableEvent
+            if (!(event instanceof MetastoreTableEvent)) {
+                continue;
+            }
+
+            // Divide events into multi groups to reduce check count
+            MetastoreTableEvent.TableKey groupKey = ((MetastoreTableEvent) 
event).getTableKey();
+            if (!indexMap.containsKey(groupKey)) {
+                List<Integer> indexList = Lists.newLinkedList();
+                indexList.add(i);
+                indexMap.put(groupKey, indexList);
+                continue;
+            }
+
+            List<Integer> indexList = indexMap.get(groupKey);
+            for (int j = 0; j < indexList.size(); j++) {
+                int candidateIndex = indexList.get(j);
+                if (candidateIndex == -1) {
+                    continue;
+                }
+                if (event.canBeBatched(events.get(candidateIndex))) {
+                    eventsCopy.set(candidateIndex, null);
+                    indexList.set(j, -1);
+                }
+            }
+            indexList = indexList.stream().filter(index -> index != -1)
+                        .collect(Collectors.toList());
+            indexList.add(i);
+            indexMap.put(groupKey, indexList);
+
+            // if the event is a rename event, just clear indexMap
+            // to make sure the table references of these events in indexMap 
will not change
+            if (event instanceof AlterTableEvent && ((AlterTableEvent) 
event).isRename()) {
+                indexMap.clear();
+            }
+        }
+
+        List<MetastoreEvent> filteredEvents = 
eventsCopy.stream().filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+        LOG.info("Event size on catalog [{}] before merge is [{}], after merge 
is [{}]",
+                    catalogName, events.size(), filteredEvents.size());
+        return ImmutableList.copyOf(filteredEvents);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
similarity index 55%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
index 59445a5dc4..f8bb457ea3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
@@ -18,31 +18,23 @@
 
 package org.apache.doris.datasource.hive.event;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
-import java.util.List;
-
 /**
- * MetastoreEvent for Alter_DATABASE event type
+ * Base class for all the partition events
  */
-public class AlterDatabaseEvent extends MetastoreEvent {
+public abstract class MetastorePartitionEvent extends MetastoreTableEvent {
 
-    private AlterDatabaseEvent(NotificationEvent event,
-            String catalogName) {
-        super(event, catalogName);
-        
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE));
+    // for test
+    protected MetastorePartitionEvent(long eventId, String catalogName, String 
dbName, String tblName) {
+        super(eventId, catalogName, dbName, tblName);
     }
 
-    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
-            String catalogName) {
-        return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName));
+    protected MetastorePartitionEvent(NotificationEvent event, String 
catalogName) {
+        super(event, catalogName);
     }
 
-    @Override
-    protected void process() throws MetastoreNotificationException {
-        // only can change properties,we do nothing
-        infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+    protected boolean willCreateOrDropTable() {
+        return false;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
index 70f56bdbb0..c797c1c08d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
@@ -23,12 +23,17 @@ import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Base class for all the table events
  */
 public abstract class MetastoreTableEvent extends MetastoreEvent {
 
+    // for test
+    protected MetastoreTableEvent(long eventId, String catalogName, String 
dbName, String tblName) {
+        super(eventId, catalogName, dbName, tblName);
+    }
 
     protected MetastoreTableEvent(NotificationEvent event, String catalogName) 
{
         super(event, catalogName);
@@ -47,4 +52,49 @@ public abstract class MetastoreTableEvent extends 
MetastoreEvent {
                     .add("numFiles")
                     .add("comment")
                     .build();
+
+    protected boolean isSameTable(MetastoreEvent that) {
+        if (!(that instanceof MetastoreTableEvent)) {
+            return false;
+        }
+        TableKey thisKey = getTableKey();
+        TableKey thatKey = ((MetastoreTableEvent) that).getTableKey();
+        return Objects.equals(thisKey, thatKey);
+    }
+
+    /**
+     * Returns if the process of this event will create or drop this table.
+     */
+    protected abstract boolean willCreateOrDropTable();
+
+    public TableKey getTableKey() {
+        return new TableKey(catalogName, dbName, tblName);
+    }
+
+    static class TableKey {
+        private final String catalogName;
+        private final String dbName;
+        private final String tblName;
+
+        private TableKey(String catalogName, String dbName, String tblName) {
+            this.catalogName = catalogName;
+            this.dbName = dbName;
+            this.tblName = tblName;
+        }
+
+        @Override
+        public boolean equals(Object that) {
+            if (!(that instanceof TableKey)) {
+                return false;
+            }
+            return Objects.equals(catalogName, ((TableKey) that).catalogName)
+                        && Objects.equals(dbName, ((TableKey) that).dbName)
+                        && Objects.equals(tblName, ((TableKey) that).tblName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(catalogName, dbName, tblName);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
new file mode 100644
index 0000000000..ba18c84bd7
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.hms;
+
+import org.apache.doris.datasource.hive.event.AddPartitionEvent;
+import org.apache.doris.datasource.hive.event.AlterDatabaseEvent;
+import org.apache.doris.datasource.hive.event.AlterPartitionEvent;
+import org.apache.doris.datasource.hive.event.AlterTableEvent;
+import org.apache.doris.datasource.hive.event.CreateDatabaseEvent;
+import org.apache.doris.datasource.hive.event.CreateTableEvent;
+import org.apache.doris.datasource.hive.event.DropTableEvent;
+import org.apache.doris.datasource.hive.event.InsertEvent;
+import org.apache.doris.datasource.hive.event.MetastoreEvent;
+import org.apache.doris.datasource.hive.event.MetastoreEventFactory;
+
+import org.apache.hadoop.util.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+public class MetastoreEventFactoryTest {
+    private static final MetastoreEventFactory factory = new 
MetastoreEventFactory();
+
+    @Test
+    public void testCreateBatchEvents() {
+        AlterPartitionEvent e1 = new AlterPartitionEvent(1L, "test_ctl", 
"test_db", "t1", "p1", "p1");
+        AlterPartitionEvent e2 = new AlterPartitionEvent(2L, "test_ctl", 
"test_db", "t1", "p1", "p1");
+        AddPartitionEvent e3 = new AddPartitionEvent(3L, "test_ctl", 
"test_db", "t1", Arrays.asList("p1"));
+        AlterTableEvent e4 = new AlterTableEvent(4L, "test_ctl", "test_db", 
"t1", false, false);
+        AlterTableEvent e5 = new AlterTableEvent(5L, "test_ctl", "test_db", 
"t1", true, false);
+        AlterTableEvent e6 = new AlterTableEvent(6L, "test_ctl", "test_db", 
"t1", false, true);
+        DropTableEvent e7 = new DropTableEvent(7L, "test_ctl", "test_db", 
"t1");
+        InsertEvent e8 = new InsertEvent(8L, "test_ctl", "test_db", "t1");
+        CreateDatabaseEvent e9 = new CreateDatabaseEvent(9L, "test_ctl", 
"test_db2");
+        AlterPartitionEvent e10 = new AlterPartitionEvent(10L, "test_ctl", 
"test_db", "t2", "p1", "p1");
+        AlterTableEvent e11 = new AlterTableEvent(11L, "test_ctl", "test_db", 
"t1", false, false);
+        CreateTableEvent e12 = new CreateTableEvent(12L, "test_ctl", 
"test_db", "t1");
+        AlterDatabaseEvent e13 = new AlterDatabaseEvent(13L, "test_ctl", 
"test_db", true);
+        AlterDatabaseEvent e14 = new AlterDatabaseEvent(14L, "test_ctl", 
"test_db", false);
+
+        List<MetastoreEvent> mergedEvents;
+        List<MetastoreEvent> testEvents = Lists.newLinkedList();
+
+        testEvents.add(e1);
+        testEvents.add(e2);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 1);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 2L);
+
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e3);
+        testEvents.add(e9);
+        testEvents.add(e10);
+        testEvents.add(e4);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 3);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 9L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
+
+        // because e5 is a rename event, it will not be merged
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e5);
+        testEvents.add(e4);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 3);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
+        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
+
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e6);
+        testEvents.add(e4);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 3);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 6L);
+        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
+
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e4);
+        testEvents.add(e11);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 2);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 11L);
+
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e4);
+        testEvents.add(e8);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 2);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 8L);
+
+        // because e5 is a rename event, it will not be merged
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e5);
+        testEvents.add(e8);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 3);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
+        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 8L);
+
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e12);
+        testEvents.add(e4);
+        testEvents.add(e7);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 2);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 7L);
+
+        // because e5 is a rename event, it will not be merged
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e5);
+        testEvents.add(e7);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 3);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
+        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L);
+
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e4);
+        testEvents.add(e13);
+        testEvents.add(e7);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 4);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 4L);
+        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 13L);
+        Assertions.assertTrue(mergedEvents.get(3).getEventId() == 7L);
+
+        testEvents.clear();
+        testEvents.add(e1);
+        testEvents.add(e2);
+        testEvents.add(e10);
+        testEvents.add(e4);
+        testEvents.add(e14);
+        testEvents.add(e7);
+        mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+        Assertions.assertTrue(mergedEvents.size() == 3);
+        Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+        Assertions.assertTrue(mergedEvents.get(1).getEventId() == 14L);
+        Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to