AndrewJSchofield commented on code in PR #19192:
URL: https://github.com/apache/kafka/pull/19192#discussion_r2009142578
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -954,10 +1046,7 @@ public void testShareFetchWithSubscriptionChange() {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
- Acknowledgements acknowledgements = Acknowledgements.empty();
- acknowledgements.add(0L, AcknowledgeType.ACCEPT);
- acknowledgements.add(1L, AcknowledgeType.RELEASE);
- acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Review Comment:
This was ACCEPT, RELEASE, ACCEPT before this patch. Is this change
intentional?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1004,10 +1090,7 @@ public void
testShareFetchWithSubscriptionChangeMultipleNodes() {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
- Acknowledgements acknowledgements = Acknowledgements.empty();
- acknowledgements.add(0L, AcknowledgeType.ACCEPT);
- acknowledgements.add(1L, AcknowledgeType.RELEASE);
- acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ Acknowledgements acknowledgements = getAcknowledgements(0,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Review Comment:
This was ACCEPT, RELEASE, ACCEPT before this patch. Is this change
intentional?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1223,21 +1343,15 @@ public void
testAcknowledgementCommitCallbackMultiplePartitionCommitAsync() {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
- Acknowledgements acknowledgements = Acknowledgements.empty();
- acknowledgements.add(0L, AcknowledgeType.ACCEPT);
- acknowledgements.add(1L, AcknowledgeType.ACCEPT);
- acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Review Comment:
This was ACCEPT, ACCEPT, ACCEPT before this patch.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -457,9 +453,7 @@ public void testAcknowledgeOnClose() {
shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
// Remaining acknowledgements sent with close().
- Acknowledgements acknowledgements2 = Acknowledgements.empty();
- acknowledgements2.add(2L, AcknowledgeType.ACCEPT);
- acknowledgements2.add(3L, AcknowledgeType.REJECT);
+ Acknowledgements acknowledgements2 = getAcknowledgements(2,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Review Comment:
This was formerly ACCEPT, REJECT. Is the additional acknowledgement
intentional?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -652,19 +633,15 @@ public void testBatchingAcknowledgeRequestStates() {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
- Acknowledgements acknowledgements = Acknowledgements.empty();
- acknowledgements.add(1L, AcknowledgeType.ACCEPT);
- acknowledgements.add(2L, AcknowledgeType.ACCEPT);
- acknowledgements.add(3L, AcknowledgeType.REJECT);
+ Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- shareConsumeRequestManager.commitAsync(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
+ shareConsumeRequestManager.commitAsync(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)),
+ calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
- Acknowledgements acknowledgements2 = Acknowledgements.empty();
- acknowledgements.add(4L, AcknowledgeType.ACCEPT);
- acknowledgements.add(5L, AcknowledgeType.ACCEPT);
- acknowledgements.add(6L, AcknowledgeType.ACCEPT);
+ Acknowledgements acknowledgements2 = getAcknowledgements(4,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Review Comment:
This was formerly ACCEPT, ACCEPT, ACCEPT. Is this change intentional?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1223,21 +1343,15 @@ public void
testAcknowledgementCommitCallbackMultiplePartitionCommitAsync() {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
- Acknowledgements acknowledgements = Acknowledgements.empty();
- acknowledgements.add(0L, AcknowledgeType.ACCEPT);
- acknowledgements.add(1L, AcknowledgeType.ACCEPT);
- acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- Acknowledgements acknowledgements2 = Acknowledgements.empty();
- acknowledgements2.add(0L, AcknowledgeType.ACCEPT);
- acknowledgements2.add(1L, AcknowledgeType.ACCEPT);
- acknowledgements2.add(2L, AcknowledgeType.ACCEPT);
+ Acknowledgements acknowledgements2 = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Review Comment:
This was ACCEPT, ACCEPT, ACCEPT before this patch.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1242,6 +1271,10 @@ void processingComplete() {
processPendingInFlightAcknowledgements(new
InvalidRecordStateException(INVALID_RESPONSE));
resultHandler.completeIfEmpty();
isProcessed = true;
+ maybeResetTimerAndRequestState();
+ if (requestType == AcknowledgeRequestType.CLOSE) {
Review Comment:
It seems to me that setting the close request to null should really not
be in this class. The management of the set of acknowledge request states
belongs in the request manager, not here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]