chia7712 commented on code in PR #20389:
URL: https://github.com/apache/kafka/pull/20389#discussion_r2301926271
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -784,7 +783,7 @@ public List<Object> validValues(String name, Map<String,
Object> parsedConfig) {
for (PluginDesc<T> plugin : plugins()) {
result.add(plugin.pluginClass());
}
- return Collections.unmodifiableList(result);
+ return List.copyOf(result);
Review Comment:
```java
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
return plugins().stream().map(p -> (Object)
p.pluginClass()).toList();
}
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##########
@@ -253,7 +252,7 @@ public MetricGroupId(String groupName, Map<String, String>
tags) {
Objects.requireNonNull(groupName);
Objects.requireNonNull(tags);
this.groupName = groupName;
- this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
+ this.tags = Map.copyOf(new LinkedHashMap<>(tags));
Review Comment:
IIRC, `Map.copyOf` does not guarantee insertion order, so we should keep
using `Collections.unmodifiableMap` instead. By the way, it would be useful to
add comment to prevent unintended change in the future.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java:
##########
@@ -58,7 +57,7 @@ protected Collection<Class<?>> regularResources() {
@Override
protected Collection<Class<?>> adminResources() {
- return Collections.singletonList(
+ return List.of(
Review Comment:
```java
return List.of(LoggingResource.class);
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -137,7 +136,7 @@ public Future<Void> executeFailed(ProcessingContext<T>
context, Stage stage, Cla
// Visible for testing
synchronized Future<Void> report(ProcessingContext<T> context) {
if (reporters.size() == 1) {
- return new
WorkerErrantRecordReporter.ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(context)));
+ return new
WorkerErrantRecordReporter.ErrantRecordFuture(List.of(reporters.iterator().next().report(context)));
Review Comment:
```java
return new
WorkerErrantRecordReporter.ErrantRecordFuture(List.of(reporters.get(0).report(context)));
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java:
##########
@@ -134,11 +134,11 @@ private Map<String, ByteBuffer>
fillAssignmentsAndSerialize(Collection<String> m
for (String member : members) {
Collection<String> connectors = connectorAssignments.get(member);
Review Comment:
```java
Collection<String> connectors =
connectorAssignments.getOrDefault(member, List.of());
Collection<ConnectorTaskId> tasks =
taskAssignments.getOrDefault(member, List.of());
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -335,12 +333,12 @@ public static List<Path> pluginUrls(Path topPath) throws
IOException {
if (containsClassFiles) {
if (archives.isEmpty()) {
- return Collections.singletonList(topPath);
+ return List.of(topPath);
}
log.warn("Plugin path contains both java archives and class files.
Returning only the"
+ " archives");
}
- return Arrays.asList(archives.toArray(new Path[0]));
+ return List.of(archives.toArray(new Path[0]));
Review Comment:
```java
return List.copyOf(archives);
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##########
@@ -395,7 +394,7 @@ public ConnectMetricsRegistry(Set<String> tags) {
connectorStatusMetrics.put(connectorUnassignedTaskCount,
TaskStatus.State.UNASSIGNED);
connectorStatusMetrics.put(connectorDestroyedTaskCount,
TaskStatus.State.DESTROYED);
connectorStatusMetrics.put(connectorRestartingTaskCount,
TaskStatus.State.RESTARTING);
- connectorStatusMetrics =
Collections.unmodifiableMap(connectorStatusMetrics);
+ connectorStatusMetrics = Map.copyOf(connectorStatusMetrics);
Review Comment:
`connectorStatusMetrics` could be initialized with `Map.of`. Also, it could
be a "final" variable
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java:
##########
@@ -131,11 +131,11 @@ public void resume(TopicPartition... partitions) {
throw new IllegalWorkerStateException("SinkTaskContext may not be
used to resume consumption until the task is initialized");
}
try {
- pausedPartitions.removeAll(Arrays.asList(partitions));
+ List.of(partitions).forEach(pausedPartitions::remove);
Review Comment:
Could you please create a `List<TopicPartition>` at the begining and reuse
it in the log message? Otherwise, the log message will jst call `toString` on
the array object
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]