lucasbru commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1500478736
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -232,13 +232,22 @@ public MembershipManager membershipManager() {
* are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
* responsive to changes.
*
+ * Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure
+ * our poll timer will not expire while we are polling.
+ *
* <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
* delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
*/
@Override
public long maximumTimeToWait(long currentTimeMs) {
- boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
- return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+ pollTimer.update(currentTimeMs);
+ if (
+ pollTimer.isExpired() ||
+ (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())
+ ) {
+ return 0L;
+ }
+ return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
Review Comment:
The `/ 2` is somewhat arbitrary. We need to make sure that the application
thread doesn't block for so long that the poll timer expires. It should
unblock some time before the timer expires, send an event telling the
background thread that it is still polling, and leave the background thread
enough time to process that event and reset the poll timer.
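
To make the trade-off concrete, here is a minimal standalone sketch of the
same calculation. `PollWaitSketch` and its parameters are hypothetical
stand-ins for `pollTimer`, `membershipManager` and `heartbeatRequestState`;
this is not the actual `HeartbeatRequestManager` code, only an illustration
under those assumptions.

```java
// Hypothetical, simplified model of the wait-time calculation discussed above.
// The class name and parameters are illustrative and not part of the Kafka API.
public class PollWaitSketch {

    /**
     * @param pollRemainingMs time left before the poll timer expires
     * @param heartbeatNow    true if a heartbeat should be sent right away
     *                        (and no heartbeat request is in flight)
     * @param nextHeartbeatMs delay until the next scheduled heartbeat
     * @return how long the application thread may block, in milliseconds
     */
    static long maximumTimeToWait(long pollRemainingMs, boolean heartbeatNow, long nextHeartbeatMs) {
        // Do not block at all if the poll timer has expired or a heartbeat is due.
        if (pollRemainingMs <= 0 || heartbeatNow) {
            return 0L;
        }
        // Wake up at the halfway point of the remaining poll time at the latest,
        // so the background thread has the other half to reset the timer.
        return Math.min(pollRemainingMs / 2, nextHeartbeatMs);
    }

    public static void main(String[] args) {
        // Plenty of poll time left: the heartbeat delay is the tighter bound.
        System.out.println(maximumTimeToWait(300_000L, false, 3_000L)); // 3000
        // Poll timer close to expiring: half of the remaining time wins.
        System.out.println(maximumTimeToWait(4_000L, false, 3_000L));   // 2000
        // Poll timer already expired: do not block.
        System.out.println(maximumTimeToWait(0L, false, 3_000L));       // 0
    }
}
```

Any divisor greater than 1 would leave some headroom; halving simply splits
the remaining time evenly between blocking and processing the poll event.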