This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 16c079ed23e KAFKA-19525: Refactor TopicBasedRLMM implementation to 
remove unused code (#20204)
16c079ed23e is described below

commit 16c079ed23e40ff66660515a0fb4a832bde5eaf8
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Wed Jul 23 12:19:13 2025 +0530

    KAFKA-19525: Refactor TopicBasedRLMM implementation to remove unused code 
(#20204)
    
    - startConsumerThread is always true so removed the variable.
    - Replaced the repetitive lock handling logic with
    `withReadLockAndEnsureInitialized` to reduce duplication and improve
    readability.
    - Consolidated the logic in `initializeResources` and. simplified method
    arguments to better encapsulate configuration.
    - Extracted common code and reduced the usage of global variables.
    - Named the variables properly.
    
    Tests:
    - Existing UTs since this patch refactored the code.
    
    Reviewers: PoAn Yang <[email protected]>
---
 .../TopicBasedRemoteLogMetadataManager.java        | 361 ++++++++-------------
 .../storage/RemoteLogMetadataManagerTestUtils.java |   9 +-
 .../storage/RemoteLogSegmentLifecycleTest.java     |   1 -
 ...ogMetadataManagerMultipleSubscriptionsTest.java |   1 -
 ...icBasedRemoteLogMetadataManagerRestartTest.java |   1 -
 .../TopicBasedRemoteLogMetadataManagerTest.java    |   1 -
 .../storage/RemoteLogMetadataManagerTest.java      |   1 -
 7 files changed, 142 insertions(+), 233 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 927cc47efde..91011b1d9c2 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -56,28 +56,25 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
- * This is the {@link RemoteLogMetadataManager} implementation with storage as 
an internal topic with name {@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
+ * This is the {@link RemoteLogMetadataManager} implementation with storage as 
an internal topic with name
+ * {@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
  * This is used to publish and fetch {@link RemoteLogMetadata} for the 
registered user topic partitions with
  * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an 
instance of this class, and it subscribes
  * to metadata updates for the registered user topic partitions.
  */
 public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
     private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+    private final Time time = Time.SYSTEM;
 
-    private volatile boolean configured = false;
-
-    // It indicates whether the close process of this instance is started or 
not via #close() method.
-    // Using AtomicBoolean instead of volatile as it may encounter 
http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
-    // if the field is read but not updated in a spin loop like in 
#initializeResources() method.
+    private final AtomicBoolean configured = new AtomicBoolean(false);
     private final AtomicBoolean closing = new AtomicBoolean(false);
     private final AtomicBoolean initialized = new AtomicBoolean(false);
-    private final Time time = Time.SYSTEM;
-    private final boolean startConsumerThread;
 
     private Thread initializationThread;
     private volatile ProducerManager producerManager;
@@ -85,112 +82,77 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
 
     // This allows to gracefully close this instance using {@link #close()} 
method while there are some pending or new
     // requests calling different methods which use the resources like 
producer/consumer managers.
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-    private RemotePartitionMetadataStore remotePartitionMetadataStore;
-    private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
-    private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final RemotePartitionMetadataStore remotePartitionMetadataStore;
     private final Set<TopicIdPartition> pendingAssignPartitions = 
Collections.synchronizedSet(new HashSet<>());
-    private volatile boolean initializationFailed;
-    private final Supplier<RemotePartitionMetadataStore> 
remoteLogMetadataManagerSupplier;
-    private final Function<Integer, RemoteLogMetadataTopicPartitioner> 
remoteLogMetadataTopicPartitionerFunction;
+    private volatile boolean initializationFailed = false;
+    private final Function<Integer, RemoteLogMetadataTopicPartitioner> 
partitionerFunction;
 
-    /**
-     * The default constructor delegates to the internal one, starting the 
consumer thread and
-     * supplying an instance of RemoteLogMetadataTopicPartitioner and 
RemotePartitionMetadataStore by default.
-     */
     public TopicBasedRemoteLogMetadataManager() {
-        this(true, RemoteLogMetadataTopicPartitioner::new, 
RemotePartitionMetadataStore::new);
+        this(RemoteLogMetadataTopicPartitioner::new, 
RemotePartitionMetadataStore::new);
     }
 
-    /**
-     * Used in tests to dynamically configure the instance.
-     */
-    TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, 
Function<Integer, RemoteLogMetadataTopicPartitioner> 
remoteLogMetadataTopicPartitionerFunction, 
Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
-        this.startConsumerThread = startConsumerThread;
-        this.remoteLogMetadataManagerSupplier = 
remoteLogMetadataManagerSupplier;
-        this.remoteLogMetadataTopicPartitionerFunction = 
remoteLogMetadataTopicPartitionerFunction;
+    TopicBasedRemoteLogMetadataManager(Function<Integer, 
RemoteLogMetadataTopicPartitioner> partitionerFunction,
+                                       Supplier<RemotePartitionMetadataStore> 
metadataStoreSupplier) {
+        this.partitionerFunction = partitionerFunction;
+        this.remotePartitionMetadataStore = metadataStoreSupplier.get();
     }
 
+    /**
+     * Adds metadata for a remote log segment to the metadata store.
+     * The provided metadata must have the state {@code COPY_SEGMENT_STARTED}.
+     *
+     * @param remoteLogSegmentMetadata the metadata of the remote log segment 
to be added; must not be null
+     * @return a {@link CompletableFuture} that completes once the metadata 
has been published to the topic
+     * @throws RemoteStorageException   if an error occurs while storing the 
metadata
+     * @throws IllegalArgumentException if the state of the provided metadata 
is not {@code COPY_SEGMENT_STARTED}
+     */
     @Override
     public CompletableFuture<Void> 
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
             throws RemoteStorageException {
         Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
-
-        // This allows gracefully rejecting the requests while closing of this 
instance is in progress, which triggers
-        // closing the producer/consumer manager instances.
-        lock.readLock().lock();
-        try {
-            ensureInitializedAndNotClosed();
-
-            // This method is allowed only to add remote log segment with the 
initial state(which is RemoteLogSegmentState.COPY_SEGMENT_STARTED)
-            // but not to update the existing remote log segment metadata.
+        return withReadLockAndEnsureInitialized(() -> {
             if (remoteLogSegmentMetadata.state() != 
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
                 throw new IllegalArgumentException(
                         "Given remoteLogSegmentMetadata should have state as " 
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED
                                 + " but it contains state as: " + 
remoteLogSegmentMetadata.state());
             }
-
-            // Publish the message to the topic.
-            return 
storeRemoteLogMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(),
-                                          remoteLogSegmentMetadata);
-        } finally {
-            lock.readLock().unlock();
-        }
+            return storeRemoteLogMetadata(remoteLogSegmentMetadata);
+        });
     }
 
     @Override
-    public CompletableFuture<Void> 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
segmentMetadataUpdate)
+    public CompletableFuture<Void> 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)
             throws RemoteStorageException {
-        Objects.requireNonNull(segmentMetadataUpdate, "segmentMetadataUpdate 
can not be null");
-
-        lock.readLock().lock();
-        try {
-            ensureInitializedAndNotClosed();
-
-            // Callers should use addRemoteLogSegmentMetadata to add 
RemoteLogSegmentMetadata with state as
-            // RemoteLogSegmentState.COPY_SEGMENT_STARTED.
-            if (segmentMetadataUpdate.state() == 
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
-                throw new IllegalArgumentException("Given 
remoteLogSegmentMetadata should not have the state as: "
-                                                           + 
RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+        Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be 
null");
+        return withReadLockAndEnsureInitialized(() -> {
+            if (metadataUpdate.state() == 
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+                throw new IllegalArgumentException("Given 
remoteLogSegmentMetadataUpdate should not have the state as: "
+                        + RemoteLogSegmentState.COPY_SEGMENT_STARTED);
             }
-
-            // Publish the message to the topic.
-            return 
storeRemoteLogMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
-        } finally {
-            lock.readLock().unlock();
-        }
+            return storeRemoteLogMetadata(metadataUpdate);
+        });
     }
 
     @Override
-    public CompletableFuture<Void> 
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
+    public CompletableFuture<Void> 
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata deleteMetadata)
             throws RemoteStorageException {
-        Objects.requireNonNull(remotePartitionDeleteMetadata, 
"remotePartitionDeleteMetadata can not be null");
-
-        lock.readLock().lock();
-        try {
-            ensureInitializedAndNotClosed();
-
-            return 
storeRemoteLogMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
-        } finally {
-            lock.readLock().unlock();
-        }
+        Objects.requireNonNull(deleteMetadata, "deleteMetadata can not be 
null");
+        return withReadLockAndEnsureInitialized(
+                () -> storeRemoteLogMetadata(deleteMetadata));
     }
 
     /**
      * Returns {@link CompletableFuture} which will complete only after 
publishing of the given {@code remoteLogMetadata} into
      * the remote log metadata topic and the internal consumer is caught up 
until the produced record's offset.
      *
-     * @param topicIdPartition partition of the given remoteLogMetadata.
-     * @param remoteLogMetadata RemoteLogMetadata to be stored.
+     *  @param remoteLogMetadata RemoteLogMetadata to be stored.
      * @return a future with acknowledge and potentially waiting also for 
consumer to catch up.
      * This ensures cache is synchronized with backing topic.
      * @throws RemoteStorageException if there are any storage errors occur.
      */
-    private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition 
topicIdPartition,
-                                                           RemoteLogMetadata 
remoteLogMetadata)
-            throws RemoteStorageException {
-        log.debug("Storing the partition: {} metadata: {}", topicIdPartition, 
remoteLogMetadata);
+    private CompletableFuture<Void> storeRemoteLogMetadata(RemoteLogMetadata 
remoteLogMetadata) throws RemoteStorageException {
+        log.debug("Storing the partition: {} metadata: {}", 
remoteLogMetadata.topicIdPartition(), remoteLogMetadata);
         try {
             // Publish the message to the metadata topic.
             CompletableFuture<RecordMetadata> produceFuture = 
producerManager.publishMessage(remoteLogMetadata);
@@ -214,62 +176,33 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
     @Override
     public Optional<RemoteLogSegmentMetadata> 
remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                        int 
epochForOffset,
-                                                                       long 
offset)
-            throws RemoteStorageException {
-        lock.readLock().lock();
-        try {
-            ensureInitializedAndNotClosed();
-
-            return 
remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, 
epochForOffset);
-        } finally {
-            lock.readLock().unlock();
-        }
+                                                                       long 
offset) throws RemoteStorageException {
+        return withReadLockAndEnsureInitialized(
+                () -> 
remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, 
epochForOffset));
     }
 
     @Override
     public Optional<Long> highestOffsetForEpoch(TopicIdPartition 
topicIdPartition,
                                                 int leaderEpoch)
             throws RemoteStorageException {
-        lock.readLock().lock();
-        try {
-
-            ensureInitializedAndNotClosed();
-
-            return 
remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch);
-        } finally {
-            lock.readLock().unlock();
-        }
-
+        return withReadLockAndEnsureInitialized(
+                () -> 
remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch));
     }
 
     @Override
     public Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition)
             throws RemoteStorageException {
         Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be 
null");
-
-        lock.readLock().lock();
-        try {
-            ensureInitializedAndNotClosed();
-
-            return 
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition);
-        } finally {
-            lock.readLock().unlock();
-        }
+        return withReadLockAndEnsureInitialized(
+                () -> 
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition));
     }
 
     @Override
     public Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
             throws RemoteStorageException {
         Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be 
null");
-
-        lock.readLock().lock();
-        try {
-            ensureInitializedAndNotClosed();
-
-            return 
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, 
leaderEpoch);
-        } finally {
-            lock.readLock().unlock();
-        }
+        return withReadLockAndEnsureInitialized(
+                () -> 
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, 
leaderEpoch));
     }
 
     @Override
@@ -277,17 +210,14 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
                                              Set<TopicIdPartition> 
followerPartitions) {
         Objects.requireNonNull(leaderPartitions, "leaderPartitions can not be 
null");
         Objects.requireNonNull(followerPartitions, "followerPartitions can not 
be null");
-
         log.info("Received leadership notifications with leader partitions {} 
and follower partitions {}",
                  leaderPartitions, followerPartitions);
-
         lock.readLock().lock();
         try {
             if (closing.get()) {
                 throw new IllegalStateException("This instance is in closing 
state");
             }
-
-            HashSet<TopicIdPartition> allPartitions = new 
HashSet<>(leaderPartitions);
+            Set<TopicIdPartition> allPartitions = new 
HashSet<>(leaderPartitions);
             allPartitions.addAll(followerPartitions);
             if (!initialized.get()) {
                 // If it is not yet initialized, then keep them as pending 
partitions and assign them
@@ -305,7 +235,6 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         for (TopicIdPartition partition : allPartitions) {
             remotePartitionMetadataStore.maybeLoadPartition(partition);
         }
-
         consumerManager.addAssignmentsForPartitions(allPartitions);
     }
 
@@ -316,7 +245,6 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
             if (closing.get()) {
                 throw new IllegalStateException("This instance is in closing 
state");
             }
-
             if (!initialized.get()) {
                 // If it is not yet initialized, then remove them from the 
pending partitions if any.
                 if (!pendingAssignPartitions.isEmpty()) {
@@ -339,7 +267,8 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         // we reached a consensus that sequential iteration over files on the 
local file system is performant enough.
         // Should this stop being the case, the remote log size could be 
calculated by incrementing/decrementing
         // counters during API calls for a more performant implementation.
-        Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator = 
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, 
leaderEpoch);
+        Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator =
+                
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, 
leaderEpoch);
         while (remoteLogSegmentMetadataIterator.hasNext()) {
             RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
remoteLogSegmentMetadataIterator.next();
             remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
@@ -348,40 +277,30 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
     }
 
     @Override
-    public Optional<RemoteLogSegmentMetadata> 
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long 
offset) throws RemoteStorageException {
-        lock.readLock().lock();
-        try {
-            ensureInitializedAndNotClosed();
-            return 
remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, 
offset);
-        } finally {
-            lock.readLock().unlock();
-        }
+    public Optional<RemoteLogSegmentMetadata> 
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
+                                                                      int 
epoch,
+                                                                      long 
offset) throws RemoteStorageException {
+        return withReadLockAndEnsureInitialized(
+                () -> 
remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, 
offset));
     }
 
     @Override
     public void configure(Map<String, ?> configs) {
         Objects.requireNonNull(configs, "configs can not be null.");
-
         lock.writeLock().lock();
         try {
-            if (configured) {
+            if (configured.compareAndSet(false, true)) {
+                TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(configs);
+                // Scheduling the initialization producer/consumer managers in 
a separate thread. Required resources may
+                // not yet be available now. This thread makes sure that it is 
retried at regular intervals until it is
+                // successful.
+                initializationThread = KafkaThread.nonDaemon(
+                        "RLMMInitializationThread", () -> 
initializeResources(rlmmConfig));
+                initializationThread.start();
+                log.info("Successfully configured topic-based RLMM with 
config: {}", rlmmConfig);
+            } else {
                 log.info("Skipping configure as it is already configured.");
-                return;
             }
-
-            log.info("Started configuring topic-based RLMM with configs: {}", 
configs);
-
-            rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
-            rlmTopicPartitioner = 
remoteLogMetadataTopicPartitionerFunction.apply(rlmmConfig.metadataTopicPartitionsCount());
-            remotePartitionMetadataStore = 
remoteLogMetadataManagerSupplier.get();
-            configured = true;
-            log.info("Successfully configured topic-based RLMM with config: 
{}", rlmmConfig);
-
-            // Scheduling the initialization producer/consumer managers in a 
separate thread. Required resources may
-            // not yet be available now. This thread makes sure that it is 
retried at regular intervals until it is
-            // successful.
-            initializationThread = 
KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);
-            initializationThread.start();
         } finally {
             lock.writeLock().unlock();
         }
@@ -392,93 +311,74 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
         return remotePartitionMetadataStore.isInitialized(topicIdPartition);
     }
 
-    private void initializeResources() {
+    private void handleRetry(long retryIntervalMs) {
+        log.info("Sleep for {} ms before retrying.", retryIntervalMs);
+        Utils.sleep(retryIntervalMs);
+    }
+
+    private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig 
rlmmConfig) {
         log.info("Initializing topic-based RLMM resources");
-        final NewTopic remoteLogMetadataTopicRequest = 
createRemoteLogMetadataTopicRequest();
-        boolean topicCreated = false;
+        int metadataTopicPartitionCount = 
rlmmConfig.metadataTopicPartitionsCount();
+        long retryIntervalMs = rlmmConfig.initializationRetryIntervalMs();
+        long retryMaxTimeoutMs = rlmmConfig.initializationRetryMaxTimeoutMs();
+        RemoteLogMetadataTopicPartitioner partitioner = 
partitionerFunction.apply(metadataTopicPartitionCount);
+        NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig);
+        boolean isTopicCreated = false;
         long startTimeMs = time.milliseconds();
-        Admin adminClient = null;
-        try {
-            adminClient = Admin.create(rlmmConfig.commonProperties());
-            // Stop if it is already initialized or closing.
-            while (!(initialized.get() || closing.get())) {
-
-                // If it is timed out then raise an error to exit.
-                if (time.milliseconds() - startTimeMs > 
rlmmConfig.initializationRetryMaxTimeoutMs()) {
-                    log.error("Timed out in initializing the resources, 
retried to initialize the resource for {} ms.",
-                            rlmmConfig.initializationRetryMaxTimeoutMs());
+        try (Admin admin = Admin.create(rlmmConfig.commonProperties())) {
+            while (!(initialized.get() || closing.get() || 
initializationFailed)) {
+                if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) {
+                    log.error("Timed out to initialize the resources within {} 
ms.", retryMaxTimeoutMs);
                     initializationFailed = true;
-                    return;
-                }
-
-                if (!topicCreated) {
-                    topicCreated = createTopic(adminClient, 
remoteLogMetadataTopicRequest);
+                    break;
                 }
-
-                if (!topicCreated) {
-                    // Sleep for INITIALIZATION_RETRY_INTERVAL_MS before 
trying to create the topic again.
-                    log.info("Sleep for {} ms before it is retried again.", 
rlmmConfig.initializationRetryIntervalMs());
-                    Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
+                isTopicCreated = isTopicCreated || createTopic(admin, 
newTopic);
+                if (!isTopicCreated) {
+                    handleRetry(retryIntervalMs);
                     continue;
-                } else {
-                    // If topic is already created, validate the existing 
topic partitions.
-                    try {
-                        String topicName = 
remoteLogMetadataTopicRequest.name();
-                        // If the existing topic partition size is not same as 
configured, mark initialization as failed and exit.
-                        if (!isPartitionsCountSameAsConfigured(adminClient, 
topicName)) {
-                            initializationFailed = true;
-                        }
-                    } catch (Exception e) {
-                        log.info("Sleep for {} ms before it is retried 
again.", rlmmConfig.initializationRetryIntervalMs());
-                        
Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
-                        continue;
+                }
+                try {
+                    if (!isPartitionsCountSameAsConfigured(admin, 
newTopic.name(), metadataTopicPartitionCount)) {
+                        initializationFailed = true;
+                        break;
                     }
+                } catch (Exception e) {
+                    handleRetry(retryIntervalMs);
+                    continue;
                 }
-
                 // Create producer and consumer managers.
                 lock.writeLock().lock();
                 try {
-                    producerManager = new ProducerManager(rlmmConfig, 
rlmTopicPartitioner);
-                    consumerManager = new ConsumerManager(rlmmConfig, 
remotePartitionMetadataStore, rlmTopicPartitioner, time);
-                    if (startConsumerThread) {
-                        consumerManager.startConsumerThread();
-                    } else {
-                        log.info("RLMM Consumer task thread is not configured 
to be started.");
-                    }
-
+                    producerManager = new ProducerManager(rlmmConfig, 
partitioner);
+                    consumerManager = new ConsumerManager(rlmmConfig, 
remotePartitionMetadataStore, partitioner, time);
+                    consumerManager.startConsumerThread();
                     if (!pendingAssignPartitions.isEmpty()) {
                         assignPartitions(pendingAssignPartitions);
                         pendingAssignPartitions.clear();
                     }
-
                     initialized.set(true);
                     log.info("Initialized topic-based RLMM resources 
successfully");
                 } catch (Exception e) {
                     log.error("Encountered error while initializing 
producer/consumer", e);
                     initializationFailed = true;
-                    return;
                 } finally {
                     lock.writeLock().unlock();
                 }
             }
-        } catch (Exception e) {
+        } catch (KafkaException e) {
             log.error("Encountered error while initializing topic-based RLMM 
resources", e);
             initializationFailed = true;
-        } finally {
-            Utils.closeQuietly(adminClient, "AdminClient");
         }
     }
 
-    boolean doesTopicExist(Admin adminClient, String topic) throws 
ExecutionException, InterruptedException {
+    boolean doesTopicExist(Admin admin, String topic) throws 
ExecutionException, InterruptedException {
         try {
-            TopicDescription description = 
adminClient.describeTopics(Set.of(topic))
+            TopicDescription description = admin.describeTopics(Set.of(topic))
                     .topicNameValues()
                     .get(topic)
                     .get();
-            if (description != null) {
-                log.info("Topic {} exists. TopicId: {}, numPartitions: {}, ", 
topic,
-                        description.topicId(), 
description.partitions().size());
-            }
+            log.info("Topic {} exists. TopicId: {}, numPartitions: {}", topic, 
description.topicId(),
+                    description.partitions().size());
             return true;
         } catch (ExecutionException | InterruptedException ex) {
             if (ex.getCause() instanceof UnknownTopicOrPartitionException) {
@@ -489,24 +389,25 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
         }
     }
 
-    private boolean isPartitionsCountSameAsConfigured(Admin adminClient,
-                                                      String topicName) throws 
InterruptedException, ExecutionException {
+    private boolean isPartitionsCountSameAsConfigured(Admin admin,
+                                                      String topicName,
+                                                      int 
metadataTopicPartitionCount) throws InterruptedException, ExecutionException {
         log.debug("Getting topic details to check for partition count and 
replication factor.");
-        TopicDescription topicDescription = 
adminClient.describeTopics(Set.of(topicName))
-                                                       
.topicNameValues().get(topicName).get();
-        int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount();
+        TopicDescription topicDescription = admin
+                .describeTopics(Set.of(topicName))
+                .topicNameValues()
+                .get(topicName)
+                .get();
         int topicPartitionsSize = topicDescription.partitions().size();
-
-        if (topicPartitionsSize != expectedPartitions) {
-            log.error("Existing topic partition count [{}] is not same as the 
expected partition count [{}]",
-                      topicPartitionsSize, expectedPartitions);
+        if (topicPartitionsSize != metadataTopicPartitionCount) {
+            log.error("Existing topic partition count {} is not same as the 
expected partition count {}",
+                      topicPartitionsSize, metadataTopicPartitionCount);
             return false;
         }
-
         return true;
     }
 
-    private NewTopic createRemoteLogMetadataTopicRequest() {
+    private NewTopic 
newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) {
         Map<String, String> topicConfigs = new HashMap<>();
         topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, 
Long.toString(rlmmConfig.metadataTopicRetentionMs()));
         topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE);
@@ -520,13 +421,13 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
      * @param newTopic topic to be created.
      * @return Returns true if the topic already exists, or it is created 
successfully.
      */
-    private boolean createTopic(Admin adminClient, NewTopic newTopic) {
+    private boolean createTopic(Admin admin, NewTopic newTopic) {
         boolean doesTopicExist = false;
         String topic = newTopic.name();
         try {
-            doesTopicExist = doesTopicExist(adminClient, topic);
+            doesTopicExist = doesTopicExist(admin, topic);
             if (!doesTopicExist) {
-                CreateTopicsResult result = 
adminClient.createTopics(Set.of(newTopic));
+                CreateTopicsResult result = 
admin.createTopics(Set.of(newTopic));
                 result.all().get();
                 List<String> overriddenConfigs = result.config(topic).get()
                         .entries()
@@ -543,7 +444,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
             // This exception can still occur as multiple brokers may call 
create topics and one of them may become
             // successful and other would throw TopicExistsException
             if (e.getCause() instanceof TopicExistsException) {
-                log.info("Topic [{}] already exists", topic);
+                log.info("Topic: {} already exists", topic);
                 doesTopicExist = true;
             } else {
                 log.error("Encountered error while querying or creating {} 
topic.", topic, e);
@@ -583,11 +484,31 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
                     log.error("Initialization thread was interrupted while 
waiting to join on close.", e);
                 }
             }
-
             Utils.closeQuietly(producerManager, "ProducerTask");
             Utils.closeQuietly(consumerManager, "RLMMConsumerManager");
             Utils.closeQuietly(remotePartitionMetadataStore, 
"RemotePartitionMetadataStore");
             log.info("Closed topic-based RLMM resources");
         }
     }
+
+    private <T> T withReadLockAndEnsureInitialized(ThrowingSupplier<T, 
RemoteStorageException> action) throws RemoteStorageException {
+        lock.readLock().lock();
+        try {
+            ensureInitializedAndNotClosed();
+            return action.get();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @FunctionalInterface
+    public interface ThrowingSupplier<T, E extends Exception> {
+        /**
+         * Supplies a result, potentially throwing an exception.
+         *
+         * @return the supplied result.
+         * @throws E an exception that may be thrown during execution.
+         */
+        T get() throws E;
+    }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
index 9c79fad397d..149ebb177d9 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
@@ -44,7 +44,6 @@ public class RemoteLogMetadataManagerTestUtils {
 
     public static class Builder {
         private String bootstrapServers;
-        private boolean startConsumerThread;
         private Map<String, Object> overrideRemoteLogMetadataManagerProps = 
Map.of();
         private Supplier<RemotePartitionMetadataStore> 
remotePartitionMetadataStore = RemotePartitionMetadataStore::new;
         private Function<Integer, RemoteLogMetadataTopicPartitioner> 
remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new;
@@ -57,11 +56,6 @@ public class RemoteLogMetadataManagerTestUtils {
             return this;
         }
 
-        public Builder startConsumerThread(boolean startConsumerThread) {
-            this.startConsumerThread = startConsumerThread;
-            return this;
-        }
-
         public Builder 
remotePartitionMetadataStore(Supplier<RemotePartitionMetadataStore> 
remotePartitionMetadataStore) {
             this.remotePartitionMetadataStore = remotePartitionMetadataStore;
             return this;
@@ -81,8 +75,7 @@ public class RemoteLogMetadataManagerTestUtils {
             Objects.requireNonNull(bootstrapServers);
             String logDir = 
TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
             TopicBasedRemoteLogMetadataManager 
topicBasedRemoteLogMetadataManager =
-                new TopicBasedRemoteLogMetadataManager(startConsumerThread,
-                    remoteLogMetadataTopicPartitioner, 
remotePartitionMetadataStore);
+                new 
TopicBasedRemoteLogMetadataManager(remoteLogMetadataTopicPartitioner, 
remotePartitionMetadataStore);
 
             // Initialize TopicBasedRemoteLogMetadataManager.
             Map<String, Object> configs = new HashMap<>();
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index 1e91625e2bd..6dfd948dd09 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -69,7 +69,6 @@ public class RemoteLogSegmentLifecycleTest {
     private RemoteLogMetadataManager 
createTopicBasedRemoteLogMetadataManager() {
         return RemoteLogMetadataManagerTestUtils.builder()
                 .bootstrapServers(clusterInstance.bootstrapServers())
-                .startConsumerThread(true)
                 .remotePartitionMetadataStore(() -> 
spyRemotePartitionMetadataStore)
                 .build();
     }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index c867b68635f..62befa0f6e3 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -98,7 +98,6 @@ public class 
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
 
         try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = 
RemoteLogMetadataManagerTestUtils.builder()
                 .bootstrapServers(clusterInstance.bootstrapServers())
-                .startConsumerThread(true)
                 .remoteLogMetadataTopicPartitioner(numMetadataTopicPartitions 
-> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) {
                     @Override
                     public int metadataPartition(TopicIdPartition 
topicIdPartition) {
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 908c9ef014c..a407882e42a 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -48,7 +48,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
     private TopicBasedRemoteLogMetadataManager 
createTopicBasedRemoteLogMetadataManager() {
         return RemoteLogMetadataManagerTestUtils.builder()
                 .bootstrapServers(clusterInstance.bootstrapServers())
-                .startConsumerThread(true)
                 
.remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new)
                 .overrideRemoteLogMetadataManagerProps(Map.of(LOG_DIR, logDir))
                 .build();
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 4e40b7c7018..9fe8572cb4f 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -72,7 +72,6 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         if (remoteLogMetadataManager == null)
             remoteLogMetadataManager = 
RemoteLogMetadataManagerTestUtils.builder()
                 .bootstrapServers(clusterInstance.bootstrapServers())
-                .startConsumerThread(true)
                 .remotePartitionMetadataStore(() -> 
spyRemotePartitionMetadataEventHandler)
                 .build();
         return remoteLogMetadataManager;
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index ce471a24999..8875d8d6a93 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -57,7 +57,6 @@ public class RemoteLogMetadataManagerTest {
     private TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
         return RemoteLogMetadataManagerTestUtils.builder()
                 .bootstrapServers(clusterInstance.bootstrapServers())
-                .startConsumerThread(true)
                 
.remotePartitionMetadataStore(RemotePartitionMetadataStore::new)
                 .build();
     }


Reply via email to