This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16c079ed23e KAFKA-19525: Refactor TopicBasedRLMM implementation to
remove unused code (#20204)
16c079ed23e is described below
commit 16c079ed23e40ff66660515a0fb4a832bde5eaf8
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Wed Jul 23 12:19:13 2025 +0530
KAFKA-19525: Refactor TopicBasedRLMM implementation to remove unused code
(#20204)
- startConsumerThread is always true so removed the variable.
- Replaced the repetitive lock handling logic with
`withReadLockAndEnsureInitialized` to reduce duplication and improve
readability.
- Consolidated the logic in `initializeResources` and. simplified method
arguments to better encapsulate configuration.
- Extracted common code and reduced the usage of global variables.
- Named the variables properly.
Tests:
- Existing UTs since this patch refactored the code.
Reviewers: PoAn Yang <[email protected]>
---
.../TopicBasedRemoteLogMetadataManager.java | 361 ++++++++-------------
.../storage/RemoteLogMetadataManagerTestUtils.java | 9 +-
.../storage/RemoteLogSegmentLifecycleTest.java | 1 -
...ogMetadataManagerMultipleSubscriptionsTest.java | 1 -
...icBasedRemoteLogMetadataManagerRestartTest.java | 1 -
.../TopicBasedRemoteLogMetadataManagerTest.java | 1 -
.../storage/RemoteLogMetadataManagerTest.java | 1 -
7 files changed, 142 insertions(+), 233 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 927cc47efde..91011b1d9c2 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -56,28 +56,25 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
/**
- * This is the {@link RemoteLogMetadataManager} implementation with storage as
an internal topic with name {@link
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
+ * This is the {@link RemoteLogMetadataManager} implementation with storage as
an internal topic with name
+ * {@link
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
* This is used to publish and fetch {@link RemoteLogMetadata} for the
registered user topic partitions with
* {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an
instance of this class, and it subscribes
* to metadata updates for the registered user topic partitions.
*/
public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataManager {
private static final Logger log =
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+ private final Time time = Time.SYSTEM;
- private volatile boolean configured = false;
-
- // It indicates whether the close process of this instance is started or
not via #close() method.
- // Using AtomicBoolean instead of volatile as it may encounter
http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
- // if the field is read but not updated in a spin loop like in
#initializeResources() method.
+ private final AtomicBoolean configured = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean initialized = new AtomicBoolean(false);
- private final Time time = Time.SYSTEM;
- private final boolean startConsumerThread;
private Thread initializationThread;
private volatile ProducerManager producerManager;
@@ -85,112 +82,77 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
// This allows to gracefully close this instance using {@link #close()}
method while there are some pending or new
// requests calling different methods which use the resources like
producer/consumer managers.
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
- private RemotePartitionMetadataStore remotePartitionMetadataStore;
- private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
- private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final RemotePartitionMetadataStore remotePartitionMetadataStore;
private final Set<TopicIdPartition> pendingAssignPartitions =
Collections.synchronizedSet(new HashSet<>());
- private volatile boolean initializationFailed;
- private final Supplier<RemotePartitionMetadataStore>
remoteLogMetadataManagerSupplier;
- private final Function<Integer, RemoteLogMetadataTopicPartitioner>
remoteLogMetadataTopicPartitionerFunction;
+ private volatile boolean initializationFailed = false;
+ private final Function<Integer, RemoteLogMetadataTopicPartitioner>
partitionerFunction;
- /**
- * The default constructor delegates to the internal one, starting the
consumer thread and
- * supplying an instance of RemoteLogMetadataTopicPartitioner and
RemotePartitionMetadataStore by default.
- */
public TopicBasedRemoteLogMetadataManager() {
- this(true, RemoteLogMetadataTopicPartitioner::new,
RemotePartitionMetadataStore::new);
+ this(RemoteLogMetadataTopicPartitioner::new,
RemotePartitionMetadataStore::new);
}
- /**
- * Used in tests to dynamically configure the instance.
- */
- TopicBasedRemoteLogMetadataManager(boolean startConsumerThread,
Function<Integer, RemoteLogMetadataTopicPartitioner>
remoteLogMetadataTopicPartitionerFunction,
Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
- this.startConsumerThread = startConsumerThread;
- this.remoteLogMetadataManagerSupplier =
remoteLogMetadataManagerSupplier;
- this.remoteLogMetadataTopicPartitionerFunction =
remoteLogMetadataTopicPartitionerFunction;
+ TopicBasedRemoteLogMetadataManager(Function<Integer,
RemoteLogMetadataTopicPartitioner> partitionerFunction,
+ Supplier<RemotePartitionMetadataStore>
metadataStoreSupplier) {
+ this.partitionerFunction = partitionerFunction;
+ this.remotePartitionMetadataStore = metadataStoreSupplier.get();
}
+ /**
+ * Adds metadata for a remote log segment to the metadata store.
+ * The provided metadata must have the state {@code COPY_SEGMENT_STARTED}.
+ *
+ * @param remoteLogSegmentMetadata the metadata of the remote log segment
to be added; must not be null
+ * @return a {@link CompletableFuture} that completes once the metadata
has been published to the topic
+ * @throws RemoteStorageException if an error occurs while storing the
metadata
+ * @throws IllegalArgumentException if the state of the provided metadata
is not {@code COPY_SEGMENT_STARTED}
+ */
@Override
public CompletableFuture<Void>
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
throws RemoteStorageException {
Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
-
- // This allows gracefully rejecting the requests while closing of this
instance is in progress, which triggers
- // closing the producer/consumer manager instances.
- lock.readLock().lock();
- try {
- ensureInitializedAndNotClosed();
-
- // This method is allowed only to add remote log segment with the
initial state(which is RemoteLogSegmentState.COPY_SEGMENT_STARTED)
- // but not to update the existing remote log segment metadata.
+ return withReadLockAndEnsureInitialized(() -> {
if (remoteLogSegmentMetadata.state() !=
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
throw new IllegalArgumentException(
"Given remoteLogSegmentMetadata should have state as "
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED
+ " but it contains state as: " +
remoteLogSegmentMetadata.state());
}
-
- // Publish the message to the topic.
- return
storeRemoteLogMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(),
- remoteLogSegmentMetadata);
- } finally {
- lock.readLock().unlock();
- }
+ return storeRemoteLogMetadata(remoteLogSegmentMetadata);
+ });
}
@Override
- public CompletableFuture<Void>
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
segmentMetadataUpdate)
+ public CompletableFuture<Void>
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)
throws RemoteStorageException {
- Objects.requireNonNull(segmentMetadataUpdate, "segmentMetadataUpdate
can not be null");
-
- lock.readLock().lock();
- try {
- ensureInitializedAndNotClosed();
-
- // Callers should use addRemoteLogSegmentMetadata to add
RemoteLogSegmentMetadata with state as
- // RemoteLogSegmentState.COPY_SEGMENT_STARTED.
- if (segmentMetadataUpdate.state() ==
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
- throw new IllegalArgumentException("Given
remoteLogSegmentMetadata should not have the state as: "
- +
RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+ Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be
null");
+ return withReadLockAndEnsureInitialized(() -> {
+ if (metadataUpdate.state() ==
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+ throw new IllegalArgumentException("Given
remoteLogSegmentMetadataUpdate should not have the state as: "
+ + RemoteLogSegmentState.COPY_SEGMENT_STARTED);
}
-
- // Publish the message to the topic.
- return
storeRemoteLogMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
segmentMetadataUpdate);
- } finally {
- lock.readLock().unlock();
- }
+ return storeRemoteLogMetadata(metadataUpdate);
+ });
}
@Override
- public CompletableFuture<Void>
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata
remotePartitionDeleteMetadata)
+ public CompletableFuture<Void>
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata deleteMetadata)
throws RemoteStorageException {
- Objects.requireNonNull(remotePartitionDeleteMetadata,
"remotePartitionDeleteMetadata can not be null");
-
- lock.readLock().lock();
- try {
- ensureInitializedAndNotClosed();
-
- return
storeRemoteLogMetadata(remotePartitionDeleteMetadata.topicIdPartition(),
remotePartitionDeleteMetadata);
- } finally {
- lock.readLock().unlock();
- }
+ Objects.requireNonNull(deleteMetadata, "deleteMetadata can not be
null");
+ return withReadLockAndEnsureInitialized(
+ () -> storeRemoteLogMetadata(deleteMetadata));
}
/**
* Returns {@link CompletableFuture} which will complete only after
publishing of the given {@code remoteLogMetadata} into
* the remote log metadata topic and the internal consumer is caught up
until the produced record's offset.
*
- * @param topicIdPartition partition of the given remoteLogMetadata.
- * @param remoteLogMetadata RemoteLogMetadata to be stored.
+ * @param remoteLogMetadata RemoteLogMetadata to be stored.
* @return a future with acknowledge and potentially waiting also for
consumer to catch up.
* This ensures cache is synchronized with backing topic.
* @throws RemoteStorageException if there are any storage errors occur.
*/
- private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition
topicIdPartition,
- RemoteLogMetadata
remoteLogMetadata)
- throws RemoteStorageException {
- log.debug("Storing the partition: {} metadata: {}", topicIdPartition,
remoteLogMetadata);
+ private CompletableFuture<Void> storeRemoteLogMetadata(RemoteLogMetadata
remoteLogMetadata) throws RemoteStorageException {
+ log.debug("Storing the partition: {} metadata: {}",
remoteLogMetadata.topicIdPartition(), remoteLogMetadata);
try {
// Publish the message to the metadata topic.
CompletableFuture<RecordMetadata> produceFuture =
producerManager.publishMessage(remoteLogMetadata);
@@ -214,62 +176,33 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
@Override
public Optional<RemoteLogSegmentMetadata>
remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
int
epochForOffset,
- long
offset)
- throws RemoteStorageException {
- lock.readLock().lock();
- try {
- ensureInitializedAndNotClosed();
-
- return
remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset,
epochForOffset);
- } finally {
- lock.readLock().unlock();
- }
+ long
offset) throws RemoteStorageException {
+ return withReadLockAndEnsureInitialized(
+ () ->
remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset,
epochForOffset));
}
@Override
public Optional<Long> highestOffsetForEpoch(TopicIdPartition
topicIdPartition,
int leaderEpoch)
throws RemoteStorageException {
- lock.readLock().lock();
- try {
-
- ensureInitializedAndNotClosed();
-
- return
remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch);
- } finally {
- lock.readLock().unlock();
- }
-
+ return withReadLockAndEnsureInitialized(
+ () ->
remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch));
}
@Override
public Iterator<RemoteLogSegmentMetadata>
listRemoteLogSegments(TopicIdPartition topicIdPartition)
throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be
null");
-
- lock.readLock().lock();
- try {
- ensureInitializedAndNotClosed();
-
- return
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition);
- } finally {
- lock.readLock().unlock();
- }
+ return withReadLockAndEnsureInitialized(
+ () ->
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition));
}
@Override
public Iterator<RemoteLogSegmentMetadata>
listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be
null");
-
- lock.readLock().lock();
- try {
- ensureInitializedAndNotClosed();
-
- return
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition,
leaderEpoch);
- } finally {
- lock.readLock().unlock();
- }
+ return withReadLockAndEnsureInitialized(
+ () ->
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition,
leaderEpoch));
}
@Override
@@ -277,17 +210,14 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
Set<TopicIdPartition>
followerPartitions) {
Objects.requireNonNull(leaderPartitions, "leaderPartitions can not be
null");
Objects.requireNonNull(followerPartitions, "followerPartitions can not
be null");
-
log.info("Received leadership notifications with leader partitions {}
and follower partitions {}",
leaderPartitions, followerPartitions);
-
lock.readLock().lock();
try {
if (closing.get()) {
throw new IllegalStateException("This instance is in closing
state");
}
-
- HashSet<TopicIdPartition> allPartitions = new
HashSet<>(leaderPartitions);
+ Set<TopicIdPartition> allPartitions = new
HashSet<>(leaderPartitions);
allPartitions.addAll(followerPartitions);
if (!initialized.get()) {
// If it is not yet initialized, then keep them as pending
partitions and assign them
@@ -305,7 +235,6 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
for (TopicIdPartition partition : allPartitions) {
remotePartitionMetadataStore.maybeLoadPartition(partition);
}
-
consumerManager.addAssignmentsForPartitions(allPartitions);
}
@@ -316,7 +245,6 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
if (closing.get()) {
throw new IllegalStateException("This instance is in closing
state");
}
-
if (!initialized.get()) {
// If it is not yet initialized, then remove them from the
pending partitions if any.
if (!pendingAssignPartitions.isEmpty()) {
@@ -339,7 +267,8 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
// we reached a consensus that sequential iteration over files on the
local file system is performant enough.
// Should this stop being the case, the remote log size could be
calculated by incrementing/decrementing
// counters during API calls for a more performant implementation.
- Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator =
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition,
leaderEpoch);
+ Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator =
+
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition,
leaderEpoch);
while (remoteLogSegmentMetadataIterator.hasNext()) {
RemoteLogSegmentMetadata remoteLogSegmentMetadata =
remoteLogSegmentMetadataIterator.next();
remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
@@ -348,40 +277,30 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
}
@Override
- public Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long
offset) throws RemoteStorageException {
- lock.readLock().lock();
- try {
- ensureInitializedAndNotClosed();
- return
remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch,
offset);
- } finally {
- lock.readLock().unlock();
- }
+ public Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
+ int
epoch,
+ long
offset) throws RemoteStorageException {
+ return withReadLockAndEnsureInitialized(
+ () ->
remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch,
offset));
}
@Override
public void configure(Map<String, ?> configs) {
Objects.requireNonNull(configs, "configs can not be null.");
-
lock.writeLock().lock();
try {
- if (configured) {
+ if (configured.compareAndSet(false, true)) {
+ TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(configs);
+ // Scheduling the initialization producer/consumer managers in
a separate thread. Required resources may
+ // not yet be available now. This thread makes sure that it is
retried at regular intervals until it is
+ // successful.
+ initializationThread = KafkaThread.nonDaemon(
+ "RLMMInitializationThread", () ->
initializeResources(rlmmConfig));
+ initializationThread.start();
+ log.info("Successfully configured topic-based RLMM with
config: {}", rlmmConfig);
+ } else {
log.info("Skipping configure as it is already configured.");
- return;
}
-
- log.info("Started configuring topic-based RLMM with configs: {}",
configs);
-
- rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
- rlmTopicPartitioner =
remoteLogMetadataTopicPartitionerFunction.apply(rlmmConfig.metadataTopicPartitionsCount());
- remotePartitionMetadataStore =
remoteLogMetadataManagerSupplier.get();
- configured = true;
- log.info("Successfully configured topic-based RLMM with config:
{}", rlmmConfig);
-
- // Scheduling the initialization producer/consumer managers in a
separate thread. Required resources may
- // not yet be available now. This thread makes sure that it is
retried at regular intervals until it is
- // successful.
- initializationThread =
KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);
- initializationThread.start();
} finally {
lock.writeLock().unlock();
}
@@ -392,93 +311,74 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
return remotePartitionMetadataStore.isInitialized(topicIdPartition);
}
- private void initializeResources() {
+ private void handleRetry(long retryIntervalMs) {
+ log.info("Sleep for {} ms before retrying.", retryIntervalMs);
+ Utils.sleep(retryIntervalMs);
+ }
+
+ private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig
rlmmConfig) {
log.info("Initializing topic-based RLMM resources");
- final NewTopic remoteLogMetadataTopicRequest =
createRemoteLogMetadataTopicRequest();
- boolean topicCreated = false;
+ int metadataTopicPartitionCount =
rlmmConfig.metadataTopicPartitionsCount();
+ long retryIntervalMs = rlmmConfig.initializationRetryIntervalMs();
+ long retryMaxTimeoutMs = rlmmConfig.initializationRetryMaxTimeoutMs();
+ RemoteLogMetadataTopicPartitioner partitioner =
partitionerFunction.apply(metadataTopicPartitionCount);
+ NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig);
+ boolean isTopicCreated = false;
long startTimeMs = time.milliseconds();
- Admin adminClient = null;
- try {
- adminClient = Admin.create(rlmmConfig.commonProperties());
- // Stop if it is already initialized or closing.
- while (!(initialized.get() || closing.get())) {
-
- // If it is timed out then raise an error to exit.
- if (time.milliseconds() - startTimeMs >
rlmmConfig.initializationRetryMaxTimeoutMs()) {
- log.error("Timed out in initializing the resources,
retried to initialize the resource for {} ms.",
- rlmmConfig.initializationRetryMaxTimeoutMs());
+ try (Admin admin = Admin.create(rlmmConfig.commonProperties())) {
+ while (!(initialized.get() || closing.get() ||
initializationFailed)) {
+ if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) {
+ log.error("Timed out to initialize the resources within {}
ms.", retryMaxTimeoutMs);
initializationFailed = true;
- return;
- }
-
- if (!topicCreated) {
- topicCreated = createTopic(adminClient,
remoteLogMetadataTopicRequest);
+ break;
}
-
- if (!topicCreated) {
- // Sleep for INITIALIZATION_RETRY_INTERVAL_MS before
trying to create the topic again.
- log.info("Sleep for {} ms before it is retried again.",
rlmmConfig.initializationRetryIntervalMs());
- Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
+ isTopicCreated = isTopicCreated || createTopic(admin,
newTopic);
+ if (!isTopicCreated) {
+ handleRetry(retryIntervalMs);
continue;
- } else {
- // If topic is already created, validate the existing
topic partitions.
- try {
- String topicName =
remoteLogMetadataTopicRequest.name();
- // If the existing topic partition size is not same as
configured, mark initialization as failed and exit.
- if (!isPartitionsCountSameAsConfigured(adminClient,
topicName)) {
- initializationFailed = true;
- }
- } catch (Exception e) {
- log.info("Sleep for {} ms before it is retried
again.", rlmmConfig.initializationRetryIntervalMs());
-
Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
- continue;
+ }
+ try {
+ if (!isPartitionsCountSameAsConfigured(admin,
newTopic.name(), metadataTopicPartitionCount)) {
+ initializationFailed = true;
+ break;
}
+ } catch (Exception e) {
+ handleRetry(retryIntervalMs);
+ continue;
}
-
// Create producer and consumer managers.
lock.writeLock().lock();
try {
- producerManager = new ProducerManager(rlmmConfig,
rlmTopicPartitioner);
- consumerManager = new ConsumerManager(rlmmConfig,
remotePartitionMetadataStore, rlmTopicPartitioner, time);
- if (startConsumerThread) {
- consumerManager.startConsumerThread();
- } else {
- log.info("RLMM Consumer task thread is not configured
to be started.");
- }
-
+ producerManager = new ProducerManager(rlmmConfig,
partitioner);
+ consumerManager = new ConsumerManager(rlmmConfig,
remotePartitionMetadataStore, partitioner, time);
+ consumerManager.startConsumerThread();
if (!pendingAssignPartitions.isEmpty()) {
assignPartitions(pendingAssignPartitions);
pendingAssignPartitions.clear();
}
-
initialized.set(true);
log.info("Initialized topic-based RLMM resources
successfully");
} catch (Exception e) {
log.error("Encountered error while initializing
producer/consumer", e);
initializationFailed = true;
- return;
} finally {
lock.writeLock().unlock();
}
}
- } catch (Exception e) {
+ } catch (KafkaException e) {
log.error("Encountered error while initializing topic-based RLMM
resources", e);
initializationFailed = true;
- } finally {
- Utils.closeQuietly(adminClient, "AdminClient");
}
}
- boolean doesTopicExist(Admin adminClient, String topic) throws
ExecutionException, InterruptedException {
+ boolean doesTopicExist(Admin admin, String topic) throws
ExecutionException, InterruptedException {
try {
- TopicDescription description =
adminClient.describeTopics(Set.of(topic))
+ TopicDescription description = admin.describeTopics(Set.of(topic))
.topicNameValues()
.get(topic)
.get();
- if (description != null) {
- log.info("Topic {} exists. TopicId: {}, numPartitions: {}, ",
topic,
- description.topicId(),
description.partitions().size());
- }
+ log.info("Topic {} exists. TopicId: {}, numPartitions: {}", topic,
description.topicId(),
+ description.partitions().size());
return true;
} catch (ExecutionException | InterruptedException ex) {
if (ex.getCause() instanceof UnknownTopicOrPartitionException) {
@@ -489,24 +389,25 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
}
}
- private boolean isPartitionsCountSameAsConfigured(Admin adminClient,
- String topicName) throws
InterruptedException, ExecutionException {
+ private boolean isPartitionsCountSameAsConfigured(Admin admin,
+ String topicName,
+ int
metadataTopicPartitionCount) throws InterruptedException, ExecutionException {
log.debug("Getting topic details to check for partition count and
replication factor.");
- TopicDescription topicDescription =
adminClient.describeTopics(Set.of(topicName))
-
.topicNameValues().get(topicName).get();
- int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount();
+ TopicDescription topicDescription = admin
+ .describeTopics(Set.of(topicName))
+ .topicNameValues()
+ .get(topicName)
+ .get();
int topicPartitionsSize = topicDescription.partitions().size();
-
- if (topicPartitionsSize != expectedPartitions) {
- log.error("Existing topic partition count [{}] is not same as the
expected partition count [{}]",
- topicPartitionsSize, expectedPartitions);
+ if (topicPartitionsSize != metadataTopicPartitionCount) {
+ log.error("Existing topic partition count {} is not same as the
expected partition count {}",
+ topicPartitionsSize, metadataTopicPartitionCount);
return false;
}
-
return true;
}
- private NewTopic createRemoteLogMetadataTopicRequest() {
+ private NewTopic
newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) {
Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG,
Long.toString(rlmmConfig.metadataTopicRetentionMs()));
topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE);
@@ -520,13 +421,13 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
* @param newTopic topic to be created.
* @return Returns true if the topic already exists, or it is created
successfully.
*/
- private boolean createTopic(Admin adminClient, NewTopic newTopic) {
+ private boolean createTopic(Admin admin, NewTopic newTopic) {
boolean doesTopicExist = false;
String topic = newTopic.name();
try {
- doesTopicExist = doesTopicExist(adminClient, topic);
+ doesTopicExist = doesTopicExist(admin, topic);
if (!doesTopicExist) {
- CreateTopicsResult result =
adminClient.createTopics(Set.of(newTopic));
+ CreateTopicsResult result =
admin.createTopics(Set.of(newTopic));
result.all().get();
List<String> overriddenConfigs = result.config(topic).get()
.entries()
@@ -543,7 +444,7 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
// This exception can still occur as multiple brokers may call
create topics and one of them may become
// successful and other would throw TopicExistsException
if (e.getCause() instanceof TopicExistsException) {
- log.info("Topic [{}] already exists", topic);
+ log.info("Topic: {} already exists", topic);
doesTopicExist = true;
} else {
log.error("Encountered error while querying or creating {}
topic.", topic, e);
@@ -583,11 +484,31 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
log.error("Initialization thread was interrupted while
waiting to join on close.", e);
}
}
-
Utils.closeQuietly(producerManager, "ProducerTask");
Utils.closeQuietly(consumerManager, "RLMMConsumerManager");
Utils.closeQuietly(remotePartitionMetadataStore,
"RemotePartitionMetadataStore");
log.info("Closed topic-based RLMM resources");
}
}
+
+ private <T> T withReadLockAndEnsureInitialized(ThrowingSupplier<T,
RemoteStorageException> action) throws RemoteStorageException {
+ lock.readLock().lock();
+ try {
+ ensureInitializedAndNotClosed();
+ return action.get();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @FunctionalInterface
+ public interface ThrowingSupplier<T, E extends Exception> {
+ /**
+ * Supplies a result, potentially throwing an exception.
+ *
+ * @return the supplied result.
+ * @throws E an exception that may be thrown during execution.
+ */
+ T get() throws E;
+ }
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
index 9c79fad397d..149ebb177d9 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
@@ -44,7 +44,6 @@ public class RemoteLogMetadataManagerTestUtils {
public static class Builder {
private String bootstrapServers;
- private boolean startConsumerThread;
private Map<String, Object> overrideRemoteLogMetadataManagerProps =
Map.of();
private Supplier<RemotePartitionMetadataStore>
remotePartitionMetadataStore = RemotePartitionMetadataStore::new;
private Function<Integer, RemoteLogMetadataTopicPartitioner>
remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new;
@@ -57,11 +56,6 @@ public class RemoteLogMetadataManagerTestUtils {
return this;
}
- public Builder startConsumerThread(boolean startConsumerThread) {
- this.startConsumerThread = startConsumerThread;
- return this;
- }
-
public Builder
remotePartitionMetadataStore(Supplier<RemotePartitionMetadataStore>
remotePartitionMetadataStore) {
this.remotePartitionMetadataStore = remotePartitionMetadataStore;
return this;
@@ -81,8 +75,7 @@ public class RemoteLogMetadataManagerTestUtils {
Objects.requireNonNull(bootstrapServers);
String logDir =
TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
TopicBasedRemoteLogMetadataManager
topicBasedRemoteLogMetadataManager =
- new TopicBasedRemoteLogMetadataManager(startConsumerThread,
- remoteLogMetadataTopicPartitioner,
remotePartitionMetadataStore);
+ new
TopicBasedRemoteLogMetadataManager(remoteLogMetadataTopicPartitioner,
remotePartitionMetadataStore);
// Initialize TopicBasedRemoteLogMetadataManager.
Map<String, Object> configs = new HashMap<>();
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index 1e91625e2bd..6dfd948dd09 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -69,7 +69,6 @@ public class RemoteLogSegmentLifecycleTest {
private RemoteLogMetadataManager
createTopicBasedRemoteLogMetadataManager() {
return RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers())
- .startConsumerThread(true)
.remotePartitionMetadataStore(() ->
spyRemotePartitionMetadataStore)
.build();
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index c867b68635f..62befa0f6e3 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -98,7 +98,6 @@ public class
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager =
RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers())
- .startConsumerThread(true)
.remoteLogMetadataTopicPartitioner(numMetadataTopicPartitions
-> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) {
@Override
public int metadataPartition(TopicIdPartition
topicIdPartition) {
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 908c9ef014c..a407882e42a 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -48,7 +48,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
private TopicBasedRemoteLogMetadataManager
createTopicBasedRemoteLogMetadataManager() {
return RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers())
- .startConsumerThread(true)
.remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new)
.overrideRemoteLogMetadataManagerProps(Map.of(LOG_DIR, logDir))
.build();
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 4e40b7c7018..9fe8572cb4f 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -72,7 +72,6 @@ public class TopicBasedRemoteLogMetadataManagerTest {
if (remoteLogMetadataManager == null)
remoteLogMetadataManager =
RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers())
- .startConsumerThread(true)
.remotePartitionMetadataStore(() ->
spyRemotePartitionMetadataEventHandler)
.build();
return remoteLogMetadataManager;
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index ce471a24999..8875d8d6a93 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -57,7 +57,6 @@ public class RemoteLogMetadataManagerTest {
private TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
return RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers())
- .startConsumerThread(true)
.remotePartitionMetadataStore(RemotePartitionMetadataStore::new)
.build();
}