FrankYang0529 commented on code in PR #19669:
URL: https://github.com/apache/kafka/pull/19669#discussion_r2083217276
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2253,44 +2262,49 @@ public void testPollAuthenticationFailure(GroupProtocol
groupProtocol) throws In
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testOffsetsForTimesAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.offsetsForTimes(singletonMap(tp0, 0L)));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.offsetsForTimes(singletonMap(tp0, 0L)));
+ }
}
// TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
// The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCommitSyncAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(tp0, new OffsetAndMetadata(10L));
- assertThrows(AuthenticationException.class, () ->
consumer.commitSync(offsets));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp0, new OffsetAndMetadata(10L));
+ assertThrows(AuthenticationException.class, () ->
consumer.commitSync(offsets));
+ }
}
// TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
// The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCommittedAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.committed(Collections.singleton(tp0)).get(tp0));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.committed(Collections.singleton(tp0)).get(tp0));
+ }
}
@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testMeasureCommitSyncDurationOnFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer
- = consumerWithPendingError(groupProtocol, new
MockTime(Duration.ofSeconds(1).toMillis()));
+ try (final KafkaConsumer<String, String> consumer
+ = consumerWithPendingError(groupProtocol, new
MockTime(Duration.ofSeconds(1).toMillis()))) {
- try {
- consumer.commitSync(Collections.singletonMap(tp0, new
OffsetAndMetadata(10L)));
- } catch (final RuntimeException e) {
- }
+ try {
+ consumer.commitSync(Collections.singletonMap(tp0, new
OffsetAndMetadata(10L)));
Review Comment:
```suggestion
consumer.commitSync(Map.of(tp0, new OffsetAndMetadata(10L)));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1959,17 +1963,19 @@ public void testCloseShouldBeIdempotent(GroupProtocol
groupProtocol) {
consumer = newConsumer(groupProtocol, time, client, subscription,
metadata, assignor, false, groupInstanceId);
- consumer.close(Duration.ZERO);
- consumer.close(Duration.ZERO);
+ consumer.close(CloseOptions.timeout(Duration.ZERO));
+ consumer.close(CloseOptions.timeout(Duration.ZERO));
// verify that the call is idempotent by checking that the network
client is only closed once.
verify(client).close();
}
+ @SuppressWarnings("resource")
Review Comment:
It looks like this `@SuppressWarnings("resource")` annotation is redundant.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -429,13 +430,13 @@ public void
testSecondPollWithDeserializationErrorThrowsRecordDeserializationExc
assertEquals(invalidRecordNumber - 1, records.records(tp0).size());
long lastOffset = records.records(tp0).get(records.records(tp0).size()
- 1).offset();
assertEquals(invalidRecordNumber - 2, lastOffset);
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new
OffsetAndMetadata(lastOffset + 1, Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(lastOffset + 1, Optional.empty(),
""), records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(lastOffset + 1),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1016,8 +1017,8 @@ public void
testFetchProgressWithMissingPartitionPosition(GroupProtocol groupPro
ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
assertEquals(singleton(tp0), records.partitions());
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new
OffsetAndMetadata(records.records(tp0).get(records.count() - 1).offset() + 1,
Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new
OffsetAndMetadata(records.records(tp0).get(records.count() - 1).offset() + 1,
Optional.empty(), ""), records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new
OffsetAndMetadata(records.records(tp0).get(records.count() - 1).offset() + 1),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -404,11 +405,11 @@ public void testPollReturnsRecords(GroupProtocol
groupProtocol) {
ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ZERO);
- assertEquals(records.count(), 5);
- assertEquals(records.partitions(), Collections.singleton(tp0));
- assertEquals(records.records(tp0).size(), 5);
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new OffsetAndMetadata(5,
Optional.empty(), ""));
+ assertEquals(5, records.count());
+ assertEquals(Collections.singleton(tp0), records.partitions());
Review Comment:
```suggestion
assertEquals(singleton(tp0), records.partitions());
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1382,8 +1386,8 @@ public void
testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) throws
@SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ZERO);
assertEquals(5, records.count());
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new OffsetAndMetadata(5,
Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(5, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(5),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2445,9 +2461,9 @@ public void
testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws
assertEquals(11, records.count());
assertEquals(1L, consumer.position(tp0));
assertEquals(10L, consumer.position(t2p0));
- assertEquals(records.nextOffsets().size(), 2);
- assertEquals(records.nextOffsets().get(tp0), new OffsetAndMetadata(1L,
Optional.empty(), ""));
- assertEquals(records.nextOffsets().get(t2p0), new
OffsetAndMetadata(10L, Optional.empty(), ""));
+ assertEquals(2, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(1L, Optional.empty(), ""),
records.nextOffsets().get(tp0));
+ assertEquals(new OffsetAndMetadata(10L, Optional.empty(), ""),
records.nextOffsets().get(t2p0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(1L),
records.nextOffsets().get(tp0));
assertEquals(new OffsetAndMetadata(10L),
records.nextOffsets().get(t2p0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2539,13 +2555,13 @@ public void
testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws
assertEquals(5L, consumer.position(tp0));
assertEquals(100L, consumer.position(t3p0));
- assertEquals(recs2.get().nextOffsets().size(), 2);
- assertEquals(recs2.get().nextOffsets().get(tp0), new
OffsetAndMetadata(5L, Optional.empty(), ""));
- assertEquals(recs2.get().nextOffsets().get(t3p0), new
OffsetAndMetadata(100L, Optional.empty(), ""));
+ assertEquals(2, recs2.get().nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(5L, Optional.empty(), ""),
recs2.get().nextOffsets().get(tp0));
+ assertEquals(new OffsetAndMetadata(100L, Optional.empty(), ""),
recs2.get().nextOffsets().get(t3p0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(5L),
recs2.get().nextOffsets().get(tp0));
assertEquals(new OffsetAndMetadata(100L),
recs2.get().nextOffsets().get(t3p0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2498,8 +2514,8 @@ public void
testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws
assertEquals(Collections.singleton(tp0), consumer.assignment());
assertEquals(1, records.count());
assertEquals(3L, consumer.position(tp0));
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new OffsetAndMetadata(3L,
Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(3L, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(3L),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -404,11 +405,11 @@ public void testPollReturnsRecords(GroupProtocol
groupProtocol) {
ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ZERO);
- assertEquals(records.count(), 5);
- assertEquals(records.partitions(), Collections.singleton(tp0));
- assertEquals(records.records(tp0).size(), 5);
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new OffsetAndMetadata(5,
Optional.empty(), ""));
+ assertEquals(5, records.count());
+ assertEquals(Collections.singleton(tp0), records.partitions());
+ assertEquals(5, records.records(tp0).size(), 5);
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(5, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(5),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -913,16 +914,16 @@ public void
verifyNoCoordinatorLookupForManualAssignmentWithSeek(GroupProtocol g
consumer.assign(singleton(tp0));
consumer.seekToBeginning(singleton(tp0));
- // there shouldn't be any need to lookup the coordinator or fetch
committed offsets.
- // we just lookup the starting position and send the record fetch.
+ // there shouldn't be any need to look up the coordinator or fetch
committed offsets.
+ // we just look up the starting position and send the record fetch.
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L)));
client.prepareResponse(fetchResponse(tp0, 50L, 5));
ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new
OffsetAndMetadata(55L, Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(55L, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(55L),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1543,9 +1547,9 @@ public void
testSubscriptionChangesWithAutoCommitEnabled(GroupProtocol groupProt
assertEquals(101, records.count());
assertEquals(2L, consumer.position(tp0));
assertEquals(100L, consumer.position(t3p0));
- assertEquals(records.nextOffsets().size(), 2);
- assertEquals(records.nextOffsets().get(tp0), new OffsetAndMetadata(2,
Optional.empty(), ""));
- assertEquals(records.nextOffsets().get(t3p0), new
OffsetAndMetadata(100, Optional.empty(), ""));
+ assertEquals(2, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(2, Optional.empty(), ""),
records.nextOffsets().get(tp0));
+ assertEquals(new OffsetAndMetadata(100, Optional.empty(), ""),
records.nextOffsets().get(t3p0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(2),
records.nextOffsets().get(tp0));
assertEquals(new OffsetAndMetadata(100),
records.nextOffsets().get(t3p0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2253,44 +2262,49 @@ public void testPollAuthenticationFailure(GroupProtocol
groupProtocol) throws In
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testOffsetsForTimesAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.offsetsForTimes(singletonMap(tp0, 0L)));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.offsetsForTimes(singletonMap(tp0, 0L)));
+ }
}
// TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
// The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCommitSyncAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(tp0, new OffsetAndMetadata(10L));
- assertThrows(AuthenticationException.class, () ->
consumer.commitSync(offsets));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp0, new OffsetAndMetadata(10L));
+ assertThrows(AuthenticationException.class, () ->
consumer.commitSync(offsets));
+ }
}
// TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
// The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCommittedAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.committed(Collections.singleton(tp0)).get(tp0));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.committed(Collections.singleton(tp0)).get(tp0));
Review Comment:
```suggestion
assertThrows(AuthenticationException.class, () ->
consumer.committed(singleton(tp0)).get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -3513,6 +3533,7 @@ public void testEmptyGroupId(GroupProtocol groupProtocol)
{
assertEquals("The configured group.id should not be an empty string or
whitespace.", e.getCause().getMessage());
}
+ @SuppressWarnings("resource")
Review Comment:
It looks like this `@SuppressWarnings("resource")` annotation is redundant.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2253,44 +2262,49 @@ public void testPollAuthenticationFailure(GroupProtocol
groupProtocol) throws In
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testOffsetsForTimesAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.offsetsForTimes(singletonMap(tp0, 0L)));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.offsetsForTimes(singletonMap(tp0, 0L)));
+ }
}
// TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
// The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCommitSyncAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(tp0, new OffsetAndMetadata(10L));
- assertThrows(AuthenticationException.class, () ->
consumer.commitSync(offsets));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp0, new OffsetAndMetadata(10L));
Review Comment:
```suggestion
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
tp0, new OffsetAndMetadata(10L)
);
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -3505,6 +3524,7 @@ public void
testClientInstanceIdNoTelemetryReporterRegistered(GroupProtocol grou
assertEquals("Telemetry is not enabled. Set config
`enable.metrics.push` to `true`.", exception.getMessage());
}
+ @SuppressWarnings("resource")
Review Comment:
It looks like this `@SuppressWarnings("resource")` annotation is redundant.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1723,19 +1727,19 @@ public void
testManualAssignmentChangeWithAutoCommitEnabled(GroupProtocol groupP
assertEquals(0,
consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
// verify that assignment immediately changes
- assertEquals(consumer.assignment(), singleton(tp0));
+ assertEquals(singleton(tp0), consumer.assignment());
- // there shouldn't be any need to lookup the coordinator or fetch
committed offsets.
- // we just lookup the starting position and send the record fetch.
+ // there shouldn't be any need to look up the coordinator or fetch
committed offsets.
+ // we just look up the starting position and send the record fetch.
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L)));
client.prepareResponse(fetchResponse(tp0, 10L, 1));
ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ofMillis(100));
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new
OffsetAndMetadata(11L, Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(11L, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(11L),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -954,8 +955,8 @@ public void
verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit(GroupPr
ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ofMillis(0));
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new
OffsetAndMetadata(55L, Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(55L, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(55L),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1782,29 +1786,29 @@ public void
testManualAssignmentChangeWithAutoCommitDisabled(GroupProtocol group
assertEquals(0,
consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
// verify that assignment immediately changes
- assertEquals(consumer.assignment(), singleton(tp0));
+ assertEquals(singleton(tp0), consumer.assignment());
- // there shouldn't be any need to lookup the coordinator or fetch
committed offsets.
- // we just lookup the starting position and send the record fetch.
+ // there shouldn't be any need to look up the coordinator or fetch
committed offsets.
+ // we just look up the starting position and send the record fetch.
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L)));
client.prepareResponse(fetchResponse(tp0, 10L, 1));
@SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ofMillis(1));
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new
OffsetAndMetadata(11L, Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(11L, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(11L),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2209,26 +2215,29 @@ public void
testPartitionsForNonExistingTopic(GroupProtocol groupProtocol) {
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testPartitionsForAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.partitionsFor("some other topic"));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.partitionsFor("some other topic"));
+ }
}
// TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
// The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testBeginningOffsetsAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.beginningOffsets(Collections.singleton(tp0)));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.beginningOffsets(Collections.singleton(tp0)));
+ }
}
// TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
// The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testEndOffsetsAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.endOffsets(Collections.singleton(tp0)));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.endOffsets(Collections.singleton(tp0)));
Review Comment:
```suggestion
assertThrows(AuthenticationException.class, () ->
consumer.endOffsets(singleton(tp0)));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2480,8 +2496,8 @@ public void
testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws
assertEquals(Collections.singleton(tp0), consumer.assignment());
assertEquals(1, records.count());
assertEquals(2L, consumer.position(tp0));
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new OffsetAndMetadata(2L,
Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(2L, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(2L),
records.nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2521,8 +2537,8 @@ public void
testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws
assertEquals(Set.of(tp0, t3p0), consumer.assignment());
assertEquals(4L, consumer.position(tp0));
assertEquals(0L, consumer.position(t3p0));
- assertEquals(recs1.get().nextOffsets().size(), 1);
- assertEquals(recs1.get().nextOffsets().get(tp0), new
OffsetAndMetadata(4L, Optional.empty(), ""));
+ assertEquals(1, recs1.get().nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(4L, Optional.empty(), ""),
recs1.get().nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(4L),
recs1.get().nextOffsets().get(tp0));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2209,26 +2215,29 @@ public void
testPartitionsForNonExistingTopic(GroupProtocol groupProtocol) {
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testPartitionsForAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.partitionsFor("some other topic"));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.partitionsFor("some other topic"));
+ }
}
// TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
// The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testBeginningOffsetsAuthenticationFailure(GroupProtocol
groupProtocol) {
- final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol);
- assertThrows(AuthenticationException.class, () ->
consumer.beginningOffsets(Collections.singleton(tp0)));
+ try (final KafkaConsumer<String, String> consumer =
consumerWithPendingAuthenticationError(groupProtocol)) {
+ assertThrows(AuthenticationException.class, () ->
consumer.beginningOffsets(Collections.singleton(tp0)));
Review Comment:
```suggestion
assertThrows(AuthenticationException.class, () ->
consumer.beginningOffsets(singleton(tp0)));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -2674,8 +2690,8 @@ public void testCurrentLag(GroupProtocol groupProtocol)
throws InterruptedExcept
final ConsumerRecords<String, String> records =
(ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
- assertEquals(records.nextOffsets().size(), 1);
- assertEquals(records.nextOffsets().get(tp0), new
OffsetAndMetadata(55L, Optional.empty(), ""));
+ assertEquals(1, records.nextOffsets().size());
+ assertEquals(new OffsetAndMetadata(55L, Optional.empty(), ""),
records.nextOffsets().get(tp0));
Review Comment:
```suggestion
assertEquals(new OffsetAndMetadata(55L),
records.nextOffsets().get(tp0));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]