kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608541319
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -144,6 +150,36 @@ void runOnce() {
.map(Optional::get)
.map(rm -> rm.maximumTimeToWait(currentTimeMs))
.reduce(Long.MAX_VALUE, Math::min);
+
+ reapExpiredApplicationEvents(currentTimeMs);
+ }
+
+ /**
+ * Process the events—if any—that were produced by the application thread.
+ */
+ private void processApplicationEvents() {
+ LinkedList<ApplicationEvent> events = new LinkedList<>();
+ applicationEventQueue.drainTo(events);
+
+ for (ApplicationEvent event : events) {
+ try {
+ if (event instanceof CompletableApplicationEvent)
+
applicationEventReaper.add((CompletableApplicationEvent<?>) event);
+
+ applicationEventProcessor.process(event);
+ } catch (Throwable t) {
+ log.warn("Error processing event {}", t.getMessage(), t);
Review Comment:
Resolving this thread as there have been no further comments for some time.
Please un-resolve if there is further discussion needed.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##########
@@ -32,13 +29,9 @@ public abstract class CompletableApplicationEvent<T> extends
ApplicationEvent im
private final CompletableFuture<T> future;
private final long deadlineMs;
- protected CompletableApplicationEvent(final Type type, final Timer timer) {
- super(type);
- this.future = new CompletableFuture<>();
- Objects.requireNonNull(timer);
- this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
- }
-
+ /**
+ * <em>Note</em>: the {@code deadlineMs} is the future time of expiration,
<em>not</em> a timeout.
+ */
Review Comment:
Resolving this thread as there have been no further comments for some time.
Please un-resolve if there is further discussion needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]