mjsax commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1500151525
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -232,13 +232,22 @@ public MembershipManager membershipManager() {
* are sent, so blocking for longer than the heartbeat interval might mean
the application thread is not
* responsive to changes.
*
+ * Similarly, we may have to unblock the application thread to send a
`PollApplicationEvent` to make sure
+ * our poll timer will not expire while we are polling.
+ *
* <p>In the event that heartbeats are currently being skipped, this still
returns the next heartbeat
* delay rather than {@code Long.MAX_VALUE} so that the application thread
remains responsive.
*/
@Override
public long maximumTimeToWait(long currentTimeMs) {
- boolean heartbeatNow = membershipManager.shouldHeartbeatNow() &&
!heartbeatRequestState.requestInFlight();
- return heartbeatNow ? 0L :
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+ pollTimer.update(currentTimeMs);
+ if (
+ pollTimer.isExpired() ||
+ (membershipManager.shouldHeartbeatNow() &&
!heartbeatRequestState.requestInFlight())
+ ) {
+ return 0L;
+ }
+ return Math.min(pollTimer.remainingMs() / 2,
heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
Review Comment:
For my own education: why `/ 2` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]