cadonna commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1574508879
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1333,32 +1342,41 @@ public void
testListenerCallbacksInvoke(List<ConsumerRebalanceListenerMethodName
private static Stream<Arguments> listenerCallbacksInvokeSource() {
Optional<RuntimeException> empty = Optional.empty();
Optional<RuntimeException> error = Optional.of(new
RuntimeException("Intentional error"));
+ Optional<RuntimeException> kafkaException = Optional.of(new
KafkaException("Intentional error"));
+ Optional<RuntimeException> wrappedException = Optional.of(new
KafkaException("User rebalance callback throws an error", error.get()));
return Stream.of(
// Tests if we don't have an event, the listener doesn't get
called.
- Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0,
0),
+ Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0,
0, empty),
// Tests if we get an event for a revocation, that we invoke our
listener.
- Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED),
empty, empty, empty, 1, 0, 0),
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED),
empty, empty, empty, 1, 0, 0, empty),
// Tests if we get an event for an assignment, that we invoke our
listener.
- Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED),
empty, empty, empty, 0, 1, 0),
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED),
empty, empty, empty, 0, 1, 0, empty),
// Tests that we invoke our listener even if it encounters an
exception.
- Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty,
empty, empty, 0, 0, 1),
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty,
empty, empty, 0, 0, 1, empty),
// Tests that we invoke our listener even if it encounters an
exception.
- Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED),
error, empty, empty, 1, 0, 0),
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED),
error, empty, empty, 1, 0, 0, wrappedException),
// Tests that we invoke our listener even if it encounters an
exception.
- Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED),
empty, error, empty, 0, 1, 0),
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED),
empty, error, empty, 0, 1, 0, wrappedException),
// Tests that we invoke our listener even if it encounters an
exception.
- Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty,
empty, error, 0, 0, 1),
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty,
empty, error, 0, 0, 1, wrappedException),
+
+ // Tests that we invoke our listener even if it encounters an
exception. Special case to test that a kafka exception is not wrapped.
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty,
empty, kafkaException, 0, 0, 1, kafkaException),
Review Comment:
Don't you also need to repeat this test (a KafkaException is not wrapped) for
the partitions-assigned and partitions-revoked callbacks?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]