kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1582055490
##########
storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java:
##########
@@ -74,7 +74,7 @@ public Properties topicConfig() {
public void
maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq<TopicPartition>
topicPartitions) {
JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition
-> {
List<BrokerLocalStorage> localStorages =
JavaConverters.bufferAsJavaList(brokers()).stream()
- .map(b -> new BrokerLocalStorage(b.config().brokerId(),
b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
+ .map(b -> new BrokerLocalStorage(b.config().brokerId(),
JavaConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))
Review Comment:
ditto
##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##########
@@ -31,31 +31,36 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
public final class BrokerLocalStorage {
private final Integer brokerId;
- private final File brokerStorageDirectory;
+ private final Set<File> brokerStorageDirectorys;
Review Comment:
nit: `brokerStorageDirectorys` -> `brokerStorageDirectories`
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3287,11 +3287,9 @@ class ReplicaManagerTest {
val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
if (enableRemoteStorage) {
Review Comment:
nit: do we need this `if` check?
##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##########
@@ -141,7 +146,11 @@ private boolean
isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition
if (offsetToSearch.equals(firstLogFileBaseOffset)) {
return true;
}
- File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(),
topicPartition.toString());
+ File partitionDir = brokerStorageDirectorys.stream()
+ .filter(dir -> dirContainsTopicPartition(topicPartition, dir))
+ .findFirst()
+ .orElseThrow(() -> new
IllegalArgumentException(String.format("[BrokerId=%d] Directory for the
topic-partition %s " +
+ "was not found", brokerId, topicPartition)));
Review Comment:
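Previously, `partitionDir` pointed to the topic-partition directory inside the log directory; with this change it is assigned the log directory (`logDir`) itself. Resolve the partition directory from the matching log directory: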
```suggestion
File logDir = brokerStorageDirectorys.stream()
        .filter(dir -> dirContainsTopicPartition(topicPartition, dir))
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " +
                "was not found", brokerId, topicPartition)));
File partitionDir = new File(logDir.getAbsolutePath(), topicPartition.toString());
```
##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##########
@@ -31,31 +31,36 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
public final class BrokerLocalStorage {
private final Integer brokerId;
- private final File brokerStorageDirectory;
+ private final Set<File> brokerStorageDirectorys;
private final Integer storageWaitTimeoutSec;
private final int storagePollPeriodSec = 1;
private final Time time = Time.SYSTEM;
public BrokerLocalStorage(Integer brokerId,
- String storageDirname,
+ Set<String> storageDirnames,
Integer storageWaitTimeoutSec) {
this.brokerId = brokerId;
- this.brokerStorageDirectory = new File(storageDirname);
+ this.brokerStorageDirectorys =
storageDirnames.stream().map(File::new).collect(Collectors.toSet());
this.storageWaitTimeoutSec = storageWaitTimeoutSec;
}
public Integer getBrokerId() {
return brokerId;
}
+ public Set<File> getBrokerStorageDirectory() {
Review Comment:
rename `getBrokerStorageDirectory` -> `getBrokerStorageDirectories`
##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##########
@@ -313,6 +314,14 @@ public TieredStorageTestBuilder reassignReplica(String
topic,
return this;
}
+ public TieredStorageTestBuilder alterLogDir(String topic,
+ Integer partition,
Review Comment:
nit: parameter alignment
##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##########
@@ -154,7 +154,7 @@ public static List<LocalTieredStorage>
remoteStorageManagers(Seq<KafkaBroker> br
@SuppressWarnings("deprecation")
public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker>
brokers) {
return JavaConverters.seqAsJavaList(brokers).stream()
- .map(b -> new BrokerLocalStorage(b.config().brokerId(),
b.config().logDirs().head(),
+ .map(b -> new BrokerLocalStorage(b.config().brokerId(),
JavaConverters.asJava(b.config().logDirs().toSet()),
Review Comment:
The build will fail to compile with Scala 2.12 when `JavaConverters.asJava`
is used:
```
./gradlew clean :storage:build -PscalaVersion=2.12
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org