This is an automated email from the ASF dual-hosted git repository.
jmclean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 85a8de09a9 [#10658] Improvement(core) : preserve original exception
when EventBus listener fails (#10665)
85a8de09a9 is described below
commit 85a8de09a9e44f39fa7b8b2f2cbcf10f895b2dce
Author: Babu Mahesh <[email protected]>
AuthorDate: Wed Apr 8 11:38:43 2026 +0530
[#10658] Improvement(core) : preserve original exception when EventBus
listener fails (#10665)
## What changes were proposed in this pull request?
Enhanced EventBus.dispatchEvent() to automatically detect and handle
FailureEvent instances with safe exception swallowing. The method now
uses type-based dispatch logic:
1. PreEvents: May throw ForbiddenException for authorization control
2. FailureEvents: Automatically swallows ALL listener exceptions to
preserve the original error
3. Regular Events: Propagates listener exceptions normally
## Why are the changes needed?
When `dispatcher.removeGroup()` (or any other access control operation)
throws an exception, the code enters a catch block and dispatches a
`FailureEvent`. If `eventBus.dispatchEvent(FailureEvent)` also throws
because a registered listener fails while handling that failure event,
the listener exception replaces the original exception. This hides the
real root cause from callers and makes production failures harder to
diagnose.
With this change, exceptions thrown by listeners while handling a
`FailureEvent` are caught and logged, so they no longer mask the
original exception that triggered the failure.
Fixes: #10658
## How was this patch tested?
- Added `testRemoveGroupFailureEventDoesNotMaskOriginalException()` in
`TestGroupEvent` to verify that, when a listener throws while handling
the failure event of a group operation, the original exception is still
propagated (not masked by the listener exception).
- Added tests to verify exception-handling semantics across event types:
- `FailureEvent`: Listener exceptions (e.g., `RuntimeException`) are
caught, logged, and swallowed to avoid masking the original failure.
- `PreEvent`: Listener exceptions (e.g., `ForbiddenException`) are
propagated, preserving authorization and control-flow behavior.
- Regular (success) `Event`: Listener exceptions are propagated normally
and not swallowed.
---
.../org/apache/gravitino/listener/EventBus.java | 52 ++++++++++++-
.../apache/gravitino/listener/TestEventBus.java | 91 ++++++++++++++++++++++
.../listener/api/event/TestGroupEvent.java | 59 ++++++++++++++
3 files changed, 199 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/listener/EventBus.java
b/core/src/main/java/org/apache/gravitino/listener/EventBus.java
index 807b797560..eb5b49cdfc 100644
--- a/core/src/main/java/org/apache/gravitino/listener/EventBus.java
+++ b/core/src/main/java/org/apache/gravitino/listener/EventBus.java
@@ -28,8 +28,11 @@ import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.listener.api.EventListenerPlugin;
import org.apache.gravitino.listener.api.event.BaseEvent;
import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.FailureEvent;
import org.apache.gravitino.listener.api.event.PreEvent;
import org.apache.gravitino.listener.api.event.SupportsChangingPreEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The {@code EventBus} class serves as a mechanism to dispatch events to
registered listeners. It
@@ -37,6 +40,8 @@ import
org.apache.gravitino.listener.api.event.SupportsChangingPreEvent;
* within its internal management.
*/
public class EventBus {
+ private static final Logger LOG = LoggerFactory.getLogger(EventBus.class);
+
/**
* Holds all instances of {@link EventListenerPlugin}. These instances can
either be {@link
* EventListenerPluginWrapper} which are used for synchronous event process,
or {@link
@@ -69,14 +74,32 @@ public class EventBus {
* Dispatches an event to all registered listeners. Each listener processes
the event based on its
* implementation, which could be either synchronous or asynchronous.
*
- * @param baseEvent The event to be dispatched to all registered listeners.
- * @return an Optional containing the transformed pre-event if it implements
{@link
- * SupportsChangingPreEvent}, otherwise {@link Optional#empty() empty}
+ * <p>This method applies different exception-handling semantics depending
on the event type:
+ *
+ * <ul>
+ * <li>{@link PreEvent}: May throw {@link ForbiddenException} to prevent
the operation (e.g.,
+ * for authorization or validation failures).
+ * <li>{@link FailureEvent}: All listener exceptions are caught, logged,
and swallowed to avoid
+ * masking the original failure.
+ * <li>{@link Event} (success events): Listener exceptions are propagated
to the caller.
+ * </ul>
+ *
+ * @param baseEvent the event to dispatch to all registered listeners; must
not be null
+ * @return an {@link Optional} containing the transformed pre-event if it
implements {@link
+ * SupportsChangingPreEvent}, otherwise {@link Optional#empty()}
+ * @throws ForbiddenException if a synchronous pre-event listener blocks the
operation
+ * @throws RuntimeException if an unknown event type is encountered or a
listener throws an
+ * exception for non-failure events
*/
public Optional<BaseEvent> dispatchEvent(BaseEvent baseEvent) {
Preconditions.checkNotNull(baseEvent, "baseEvent cannot be null");
if (baseEvent instanceof PreEvent) {
return dispatchAndTransformPreEvent((PreEvent) baseEvent);
+ } else if (baseEvent instanceof FailureEvent) {
+ // FailureEvents get special "safe" handling - swallow all exceptions to
prevent
+ // masking the original error that triggered this failure event
+ dispatchFailureEvent((Event) baseEvent);
+ return Optional.empty();
} else if (baseEvent instanceof Event) {
dispatchPostEvent((Event) baseEvent);
return Optional.empty();
@@ -105,6 +128,29 @@ public class EventBus {
eventListeners.forEach(eventListener ->
eventListener.onPostEvent(postEvent));
}
+ /**
+  * Dispatches a failure event to every registered listener without letting listener exceptions
+  * escape.
+  *
+  * <p>This runs while the caller is already handling a failed operation, so an exception thrown
+  * by a listener must not replace the original failure. Each listener is invoked inside its own
+  * try/catch: an {@link Exception} thrown by one listener is logged and swallowed, and the
+  * remaining listeners are still notified. {@link Error}s are not caught here and propagate to
+  * the caller.
+  *
+  * @param failureEvent the failure event to dispatch
+  */
+ private void dispatchFailureEvent(Event failureEvent) {
+   eventListeners.forEach(
+       eventListener -> {
+         try {
+           eventListener.onPostEvent(failureEvent);
+         } catch (Exception e) {
+           // Swallow the listener exception so it cannot mask the original error; catching per
+           // listener keeps one faulty listener from hiding the event from the others.
+           LOG.error(
+               "Failed to dispatch failure event: {}, ignoring exception to preserve original error",
+               failureEvent.getClass().getSimpleName(),
+               e);
+         }
+       });
+ }
+
private Optional<BaseEvent> dispatchAndTransformPreEvent(PreEvent
originalEvent)
throws ForbiddenException {
boolean supportsChangePreEvent = originalEvent instanceof
SupportsChangingPreEvent;
diff --git a/core/src/test/java/org/apache/gravitino/listener/TestEventBus.java
b/core/src/test/java/org/apache/gravitino/listener/TestEventBus.java
index 1b8aa5d935..cd8c5ac111 100644
--- a/core/src/test/java/org/apache/gravitino/listener/TestEventBus.java
+++ b/core/src/test/java/org/apache/gravitino/listener/TestEventBus.java
@@ -18,7 +18,16 @@
*/
package org.apache.gravitino.listener;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.ForbiddenException;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.FailureEvent;
+import org.apache.gravitino.listener.api.event.OperationStatus;
+import org.apache.gravitino.listener.api.event.PreEvent;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -29,4 +38,86 @@ public class TestEventBus {
EventBus eventBus = new EventBus(Collections.emptyList());
Assertions.assertThrows(NullPointerException.class, () ->
eventBus.dispatchEvent(null));
}
+
+ // Test that dispatchEvent() automatically swallows exceptions for FailureEvents.
+ // The only registered listener throws a RuntimeException from its callbacks, so the call
+ // completing normally shows the listener exception did not reach the caller.
+ @Test
+ void testDispatchEventSwallowsExceptionsForFailureEvents() {
+ ThrowingEventListener listener =
+ new ThrowingEventListener(new RuntimeException("Listener bug"));
+ EventBus eventBus = new EventBus(Arrays.asList(listener));
+
+ // NOTE(review): this failure event reports SUCCESS as its operation status. The assertion
+ // below does not read the status, but confirm whether a failure status was intended here.
+ Event failureEvent =
+ new FailureEvent("user", NameIdentifier.of("test"), new
Exception("Original error")) {
+ @Override
+ public OperationStatus operationStatus() {
+ return OperationStatus.SUCCESS;
+ }
+ };
+ // RuntimeException should be swallowed and logged
+ Assertions.assertDoesNotThrow(() -> eventBus.dispatchEvent(failureEvent));
+ }
+
+ // Test that dispatchEvent propagates ForbiddenException for PreEvents.
+ // The only registered listener is configured to throw a ForbiddenException from its
+ // callbacks; the test expects that exception type to surface from dispatchEvent().
+ @Test
+ void testDispatchEventPropagatesForbiddenExceptionForPreEvents() {
+ ThrowingEventListener listener =
+ new ThrowingEventListener(new ForbiddenException("Access denied"));
+ EventBus eventBus = new EventBus(Arrays.asList(listener));
+
+ // Anonymous PreEvent subclass used only to drive the pre-event dispatch branch.
+ PreEvent preEvent =
+ new PreEvent("user", NameIdentifier.of("test")) {
+ @Override
+ public OperationStatus operationStatus() {
+ return OperationStatus.SUCCESS;
+ }
+ };
+ // ForbiddenException should be propagated
+ Assertions.assertThrows(ForbiddenException.class, () ->
eventBus.dispatchEvent(preEvent));
+ }
+
+ // Test that dispatchEvent propagates exceptions for regular (non-failure) events.
+ // The event below is a plain Event (neither a PreEvent nor a FailureEvent), so the
+ // RuntimeException thrown by the listener is expected to reach the caller.
+ @Test
+ void testDispatchEventPropagatesExceptionsForRegularEvents() {
+ ThrowingEventListener listener =
+ new ThrowingEventListener(new RuntimeException("Listener error"));
+ EventBus eventBus = new EventBus(Arrays.asList(listener));
+
+ // Anonymous Event subclass used only to drive the regular post-event dispatch branch.
+ Event postEvent =
+ new Event("user", NameIdentifier.of("test")) {
+ @Override
+ public OperationStatus operationStatus() {
+ return OperationStatus.SUCCESS;
+ }
+ };
+
+ // RuntimeException should be propagated for regular events
+ Assertions.assertThrows(RuntimeException.class, () ->
eventBus.dispatchEvent(postEvent));
+ }
+
+ // Test listener that throws a preconfigured RuntimeException from both onPreEvent and
+ // onPostEvent. The lifecycle methods (init/start/stop) do nothing.
+ static class ThrowingEventListener implements EventListenerPlugin {
+ // The exception instance rethrown on every event callback.
+ private final RuntimeException exceptionToThrow;
+
+ ThrowingEventListener(RuntimeException exceptionToThrow) {
+ this.exceptionToThrow = exceptionToThrow;
+ }
+
+ @Override
+ public void init(Map<String, String> properties) {}
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ // Always fails, regardless of the pre-event received.
+ @Override
+ public void onPreEvent(PreEvent preEvent) {
+ throw exceptionToThrow;
+ }
+
+ // Always fails, regardless of the post-event received.
+ @Override
+ public void onPostEvent(Event postEvent) {
+ throw exceptionToThrow;
+ }
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestGroupEvent.java
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestGroupEvent.java
index e3f0f5545e..b69a0dcae9 100644
---
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestGroupEvent.java
+++
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestGroupEvent.java
@@ -25,7 +25,9 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AccessControlDispatcher;
import org.apache.gravitino.authorization.Group;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchGroupException;
@@ -33,6 +35,7 @@ import
org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.listener.AccessControlEventDispatcher;
import org.apache.gravitino.listener.DummyEventListener;
import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
import org.apache.gravitino.listener.api.info.GroupInfo;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.junit.jupiter.api.Assertions;
@@ -362,6 +365,62 @@ public class TestGroupEvent {
Assertions.assertEquals(groupName, removeGroupFailureEvent.groupName());
}
+ // Verifies that when removeGroup() fails and a listener then throws while handling the
+ // resulting failure event, the caller still receives the original exception instance.
+ @Test
+ void testRemoveGroupFailureEventDoesNotMaskOriginalException() {
+ GravitinoRuntimeException originalException =
+ new GravitinoRuntimeException("Original remove group failure");
+ GravitinoRuntimeException listenerException =
+ new GravitinoRuntimeException("Listener failed while handling failure
event");
+
+ // Mocked underlying dispatcher whose removeGroup() always fails with originalException.
+ AccessControlDispatcher exceptionDispatcher =
mock(AccessControlDispatcher.class);
+ when(exceptionDispatcher.removeGroup(METALAKE,
groupName)).thenThrow(originalException);
+
+ // Create a listener that throws when handling failure events
+ AccessControlEventDispatcher dispatcherWithThrowingListener =
+ createExceptionEventDispatcher(listenerException, exceptionDispatcher);
+
+ // The original exception should be propagated, not the listener's exception
+ GravitinoRuntimeException thrownException =
+ Assertions.assertThrows(
+ GravitinoRuntimeException.class,
+ () -> dispatcherWithThrowingListener.removeGroup(METALAKE,
groupName));
+
+ // Verify it's the original exception, not the listener's exception.
+ // Both exceptions share the same type, so identity (assertSame) is what tells them apart.
+ Assertions.assertSame(originalException, thrownException);
+ }
+
+ // Builds an AccessControlEventDispatcher that wraps exceptionDispatcher and publishes to an
+ // EventBus with a single listener. That listener throws listenerException when it receives a
+ // RemoveGroupFailureEvent and does nothing for every other event or lifecycle callback.
+ private static AccessControlEventDispatcher createExceptionEventDispatcher(
+ GravitinoRuntimeException listenerException, AccessControlDispatcher
exceptionDispatcher) {
+ EventListenerPlugin throwingListener =
+ new EventListenerPlugin() {
+ @Override
+ public void init(Map<String, String> properties) {}
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ // Fails only for the remove-group failure event; other post-events pass through.
+ @Override
+ public void onPostEvent(Event event) {
+ if (event instanceof RemoveGroupFailureEvent) {
+ throw listenerException;
+ }
+ }
+
+ @Override
+ public void onPreEvent(PreEvent preEvent) {}
+ };
+
+ // Create dispatcher with throwing listener
+ EventBus eventBusWithThrowingListener =
+ new EventBus(Collections.singletonList(throwingListener));
+ AccessControlEventDispatcher exceptionEventDispatcher =
+ new AccessControlEventDispatcher(eventBusWithThrowingListener,
exceptionDispatcher);
+ return exceptionEventDispatcher;
+ }
+
@Test
void testGrantRolesToGroupPreEvent() {
dispatcher.grantRolesToGroup(METALAKE, grantedRoles, groupName);