lianetm commented on code in PR #19917:
URL: https://github.com/apache/kafka/pull/19917#discussion_r2141084110
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -470,7 +475,19 @@ protected Map<Node, FetchSessionHandler.FetchRequestData>
prepareFetchRequests()
}
}
- return
fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().build()));
+ return convert(fetchable);
+ }
+
+ private Map<Node, FetchSessionHandler.FetchRequestData> convert(Map<Node,
FetchSessionHandler.Builder> fetchable) {
+ Map<Node, FetchSessionHandler.FetchRequestData> map = new
HashMap<>(fetchable.size());
+
+ for (Map.Entry<Node, FetchSessionHandler.Builder> entry :
fetchable.entrySet()) {
+ Node node = entry.getKey();
+ FetchSessionHandler.FetchRequestData fetchRequestData =
entry.getValue().build();
+ map.put(node, fetchRequestData);
Review Comment:
```suggestion
map.put(entry.getKey(), entry.getValue().build());
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1135,7 +1134,9 @@ CompletableFuture<Void>
revokePartitions(Set<TopicPartition> partitionsToRevoke)
// Ensure the set of partitions to revoke are still assigned
Set<TopicPartition> revokedPartitions = new
HashSet<>(partitionsToRevoke);
revokedPartitions.retainAll(subscriptions.assignedPartitions());
- log.info("Revoking previously assigned partitions {}",
revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
+
+ if (log.isInfoEnabled())
+ log.info("Revoking previously assigned partitions {}",
Utils.topicPartitionString(revokedPartitions));
Review Comment:
Here also I expect we can simplify and just print the set? Only difference
is that it will include the brackets, which is ok I expect.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -166,9 +161,48 @@ public boolean contains(CompletableEvent<?> event) {
}
public List<CompletableEvent<?>> uncompletedEvents() {
- return tracked.stream()
- .filter(e -> !e.future().isDone())
- .collect(Collectors.toList());
+ List<CompletableEvent<?>> events = new ArrayList<>();
+
+ for (CompletableEvent<?> event : tracked) {
Review Comment:
Should we add a comment saying that we're intentionally using for loop over
the Streams API for perf, as this runs on every iteration of the background
thread? (I'm afraid folks will be tempted to change this simple loops back to
streams api)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -77,11 +81,14 @@ public Exception invokePartitionsAssigned(final
SortedSet<TopicPartition> assign
}
public Exception invokePartitionsRevoked(final SortedSet<TopicPartition>
revokedPartitions) {
- log.info("Revoke previously assigned partitions {}",
revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
+ if (log.isInfoEnabled())
+ log.info("Revoke previously assigned partitions {}",
Utils.topicPartitionString(revokedPartitions));
+
Set<TopicPartition> revokePausedPartitions =
subscriptions.pausedPartitions();
revokePausedPartitions.retainAll(revokedPartitions);
- if (!revokePausedPartitions.isEmpty())
- log.info("The pause flag in partitions [{}] will be removed
due to revocation.",
revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
+
+ if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled())
+ log.info("The pause flag in partitions [{}] will be removed due to
revocation.", Utils.topicPartitionString(revokePausedPartitions));
Review Comment:
similar to above, if we are adding [] to the string of partitions, couldn't
we just print the set?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -310,7 +311,9 @@ private void process(final AssignmentChangeEvent event) {
manager.updateTimerAndMaybeCommit(event.currentTimeMs());
}
- log.info("Assigned to partition(s): {}",
event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
+ if (log.isInfoEnabled())
+ log.info("Assigned to partition(s): {}",
Utils.topicPartitionString(event.partitions()));
Review Comment:
ok with the change, even though this one is not on any hot-path I expect,
right? (just API calls to consumer.assign that do a single trip to the
background, no requests).
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -166,9 +161,48 @@ public boolean contains(CompletableEvent<?> event) {
}
public List<CompletableEvent<?>> uncompletedEvents() {
- return tracked.stream()
- .filter(e -> !e.future().isDone())
- .collect(Collectors.toList());
+ List<CompletableEvent<?>> events = new ArrayList<>();
+
+ for (CompletableEvent<?> event : tracked) {
+ if (!event.future().isDone())
+ events.add(event);
+ }
+
+ return events;
+ }
+
+ /**
+ * For all the {@link CompletableEvent}s in the collection, if they're not
already complete, invoke
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
+ *
+ * @param events Collection of objects, assumed to be subclasses of {@link
ApplicationEvent} or
+ * {@link BackgroundEvent}, but will only perform completion
for any
+ * unfinished {@link CompletableEvent}s
+ *
+ * @return Number of events closed
+ */
+ private long completeEventsExceptionallyOnClose(Collection<?> events) {
+ long count = 0;
+
+ for (Object o : events) {
+ if (!(o instanceof CompletableEvent))
+ continue;
+
+ CompletableEvent<?> event = (CompletableEvent<?>) o;
+
+ if (event.future().isDone())
+ continue;
+
+ TimeoutException error = new TimeoutException(String.format("%s
could not be completed before the consumer closed",
event.getClass().getSimpleName()));
+
+ if (event.future().completeExceptionally(error)) {
+ log.debug("Event {} completed exceptionally since the consumer
is closing", event);
+ count++;
Review Comment:
incrementing here is not exactly the same that it used to happen before,
right?
Before the PR, the count inc would happen after the `peek(expireEvent)`, so
it would count events that were expired but not completedExceptionally (the
ones going through line 202, where now we don't increment the count)
This count doesn't seem used anyways, but better to be accurate and
consistent with what it's done in the `reap` func (there the count includes all
expired events, no matter if we completed them exceptionally or not)
https://github.com/apache/kafka/blob/2b2552da1fbcdb687c99a03a81034c8f78e86871/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java#L115
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##########
@@ -414,8 +413,8 @@ public CompletableFuture<Void>
signalPartitionsRevoked(Set<TopicPartition> parti
private void logPausedPartitionsBeingRevoked(Set<TopicPartition>
partitionsToRevoke) {
Set<TopicPartition> revokePausedPartitions =
subscriptions.pausedPartitions();
revokePausedPartitions.retainAll(partitionsToRevoke);
- if (!revokePausedPartitions.isEmpty()) {
- log.info("The pause flag in partitions [{}] will be removed due to
revocation.",
revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
+ if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled()) {
+ log.info("The pause flag in partitions [{}] will be removed due to
revocation.", Utils.topicPartitionString(revokePausedPartitions));
Review Comment:
Given that er end up printing the string in between [], can't we simply
print the set directly?
```suggestion
log.info("The pause flag in partitions {} will be removed due to
revocation.", revokePausedPartitions);
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -103,11 +114,14 @@ public Exception invokePartitionsRevoked(final
SortedSet<TopicPartition> revoked
}
public Exception invokePartitionsLost(final SortedSet<TopicPartition>
lostPartitions) {
- log.info("Lost previously assigned partitions {}",
lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
+ if (log.isInfoEnabled())
+ log.info("Lost previously assigned partitions {}",
Utils.topicPartitionString(lostPartitions));
+
Set<TopicPartition> lostPausedPartitions =
subscriptions.pausedPartitions();
lostPausedPartitions.retainAll(lostPartitions);
- if (!lostPausedPartitions.isEmpty())
- log.info("The pause flag in partitions [{}] will be removed due to
partition lost.",
lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
")));
+
+ if (!lostPausedPartitions.isEmpty() && log.isInfoEnabled())
+ log.info("The pause flag in partitions [{}] will be removed due to
partition lost.", Utils.topicPartitionString(lostPartitions));
Review Comment:
ditto
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -362,12 +368,18 @@ void cleanup() {
* If there is a metadata error, complete all uncompleted events that
require subscription metadata.
*/
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
- List<? extends CompletableApplicationEvent<?>>
subscriptionMetadataEvent = events.stream()
- .filter(e -> e instanceof CompletableApplicationEvent<?>)
- .map(e -> (CompletableApplicationEvent<?>) e)
-
.filter(CompletableApplicationEvent::requireSubscriptionMetadata)
- .collect(Collectors.toList());
-
+ List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new
ArrayList<>();
+
+ for (CompletableEvent<?> ce : events) {
+ if (!(ce instanceof CompletableApplicationEvent))
+ continue;
+
+ CompletableApplicationEvent<?> cae =
(CompletableApplicationEvent<?>) ce;
+
+ if (cae.requireSubscriptionMetadata())
+ subscriptionMetadataEvent.add(cae);
Review Comment:
maybe simplify ?
```suggestion
if (ce instanceof CompletableApplicationEvent &&
((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata()) {
subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]