kamalcph commented on code in PR #16502:
URL: https://github.com/apache/kafka/pull/16502#discussion_r1673686505
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -241,12 +245,17 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
         indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
-        rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+        rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+                "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+        rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
Review Comment:
We can start using the respective thread-pool size config:
```
rlmConfig.remoteLogManagerExpirationThreadPoolSize()
```
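For concreteness, here is a minimal runnable sketch of sizing the two pools from separate settings, so one knob no longer determines every pool's thread count. The JDK's `ScheduledThreadPoolExecutor` stands in for Kafka's internal `RLMScheduledThreadPool`, and the pool sizes are made-up example values, not real defaults:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: ScheduledThreadPoolExecutor stands in for the internal
// RLMScheduledThreadPool, and the sizes below are hypothetical values
// meant to show the copy and expiration pools being sized independently.
class PoolSizing {
    static ScheduledThreadPoolExecutor newPool(int size, String namePrefix) {
        AtomicInteger seq = new AtomicInteger();
        return new ScheduledThreadPoolExecutor(size, r -> {
            Thread t = new Thread(r, namePrefix + seq.getAndIncrement());
            t.setDaemon(true);
            return t;
        });
    }

    public static void main(String[] args) {
        int copyPoolSize = 4;       // hypothetical copier pool size
        int expirationPoolSize = 2; // hypothetical expiration pool size

        ScheduledThreadPoolExecutor copyPool =
                newPool(copyPoolSize, "kafka-rlm-copy-thread-pool-");
        ScheduledThreadPoolExecutor expirationPool =
                newPool(expirationPoolSize, "kafka-rlm-expiration-thread-pool-");

        System.out.println(copyPool.getCorePoolSize() + " " + expirationPool.getCorePoolSize());
    }
}
```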
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -241,12 +245,17 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
         indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
-        rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+        rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+                "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+        rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+                "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
+        followerThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+                "RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-");
Review Comment:
The number of threads instantiated will be 2x what the user configured. Why do
we need a separate follower thread pool?

We can have three different tasks for clarity: RLMCopyTask, RLMExpirationTask,
and RLMFollowerTask. RLMCopyTask and RLMFollowerTask can reuse the same
thread pool. WDYT?
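The sharing idea can be sketched with the JDK executor (the classes below are illustrative stand-ins for Kafka's internals, with placeholder task bodies): three task types, but only two pools, so the total thread count stays at the two configured sizes rather than three.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of the suggested split: RLMCopyTask and RLMFollowerTask share one
// pool, RLMExpirationTask gets its own. All classes here are illustrative
// stand-ins for the Kafka internals, not the real implementations.
class SharedPoolSketch {
    abstract static class RLMTask implements Runnable { }

    static class RLMCopyTask extends RLMTask {
        public void run() { /* placeholder: copy segments to remote storage */ }
    }

    static class RLMExpirationTask extends RLMTask {
        public void run() { /* placeholder: delete expired remote segments */ }
    }

    static class RLMFollowerTask extends RLMTask {
        public void run() { /* placeholder: follower-side maintenance */ }
    }

    public static void main(String[] args) {
        // Two pools total, each sized by its own (hypothetical) config value.
        ScheduledExecutorService copyAndFollowerPool = new ScheduledThreadPoolExecutor(2);
        ScheduledExecutorService expirationPool = new ScheduledThreadPoolExecutor(2);

        // Copy and follower tasks share one pool; expiration runs separately.
        copyAndFollowerPool.scheduleWithFixedDelay(new RLMCopyTask(), 0, 100, TimeUnit.MILLISECONDS);
        copyAndFollowerPool.scheduleWithFixedDelay(new RLMFollowerTask(), 0, 100, TimeUnit.MILLISECONDS);
        expirationPool.scheduleWithFixedDelay(new RLMExpirationTask(), 0, 100, TimeUnit.MILLISECONDS);

        copyAndFollowerPool.shutdown();
        expirationPool.shutdown();
        System.out.println("pools=2");
    }
}
```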
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -686,60 +702,98 @@ List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, long startOffset, long en
     }
     // VisibleForTesting
-    RLMTask rlmTask(TopicIdPartition topicIdPartition) {
-        RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+    RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+        RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
         if (task != null) {
             return task.rlmTask;
         }
         return null;
     }
-    class RLMTask extends CancellableRunnable {
+    abstract class RLMTask extends CancellableRunnable {
-        private final TopicIdPartition topicIdPartition;
-        private final int customMetadataSizeLimit;
+        protected final TopicIdPartition topicIdPartition;
         private final Logger logger;
-        private volatile int leaderEpoch = -1;
-
-        public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
+        public RLMTask(TopicIdPartition topicIdPartition) {
             this.topicIdPartition = topicIdPartition;
-            this.customMetadataSizeLimit = customMetadataSizeLimit;
-            LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
-            logger = logContext.logger(RLMTask.class);
+            this.logger = getLogContext(topicIdPartition).logger(RLMTask.class);
         }
-        boolean isLeader() {
-            return leaderEpoch >= 0;
+        protected LogContext getLogContext(TopicIdPartition topicIdPartition) {
Review Comment:
We can avoid taking the `topicIdPartition` parameter since it is already a
field of the class.
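As a minimal stand-alone illustration of this suggestion (the class and names below are simplified stand-ins, not the actual Kafka code), the helper can read the field directly instead of taking a parameter:

```java
// Simplified stand-in for the RLMTask pattern: the helper reads the
// topicIdPartition field directly rather than taking it as a parameter.
class TaskContextDemo {
    static class Task {
        private final String topicIdPartition;

        Task(String topicIdPartition) {
            this.topicIdPartition = topicIdPartition;
        }

        // No parameter needed: the value is already a field of the class.
        String logPrefix() {
            return "[RemoteLogManager partition=" + topicIdPartition + "] ";
        }
    }

    public static void main(String[] args) {
        System.out.println(new Task("tp-0").logPrefix());
    }
}
```

One caveat worth keeping in mind: since `getLogContext` is invoked from the constructor in the diff, the field must be assigned before that call (as the diff already does).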
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1761,19 +1796,48 @@ public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteL
             new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
     }
-    void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
-                                            Consumer<RLMTask> convertToLeaderOrFollower) {
-        RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
-            topicIdPartition -> {
-                RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
-                // set this upfront when it is getting initialized instead of doing it after scheduling.
-                convertToLeaderOrFollower.accept(task);
-                LOGGER.info("Created a new task: {} and getting scheduled", task);
-                ScheduledFuture<?> future = rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS);
-                return new RLMTaskWithFuture(task, future);
-            }
-        );
-        convertToLeaderOrFollower.accept(rlmTaskWithFuture.rlmTask);
+    void doHandleLeaderPartition(TopicIdPartition topicPartition, int leaderEpoch) {
+        RLMTaskWithFuture followerRLMTaskWithFuture = followerRLMTasks.remove(topicPartition);
+        if (followerRLMTaskWithFuture != null) {
+            LOGGER.info("Cancelling the follower task: {}", followerRLMTaskWithFuture.rlmTask);
+            followerRLMTaskWithFuture.cancel();
+        }
+
+        leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
+            RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes(), leaderEpoch);
Review Comment:
Can the leaderEpoch be bumped while the same node remains the leader? If so,
the leaderEpoch held inside the RLMCopyTask may become stale. Since
leaderEpoch is currently unused inside RLMCopyTask, we can refactor the code
to remove it later.
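The staleness concern can be shown with a self-contained sketch (illustrative stand-ins, not the actual RLMCopyTask): a task that snapshots the epoch at construction never sees a later bump, while one reading from a shared live source does.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the staleness concern: SnapshotTask freezes leaderEpoch at
// construction and keeps the old value after an epoch bump on the same
// leader; LiveTask reads a shared source and sees the new epoch.
// Both classes are illustrative stand-ins, not the real RLMCopyTask.
class StaleEpochDemo {
    static class SnapshotTask {
        final int leaderEpoch;               // frozen at construction

        SnapshotTask(int leaderEpoch) {
            this.leaderEpoch = leaderEpoch;
        }
    }

    static class LiveTask {
        final AtomicInteger leaderEpoch;     // shared, updated on epoch bumps

        LiveTask(AtomicInteger leaderEpoch) {
            this.leaderEpoch = leaderEpoch;
        }
    }

    public static void main(String[] args) {
        AtomicInteger currentEpoch = new AtomicInteger(5);
        SnapshotTask snapshot = new SnapshotTask(currentEpoch.get());
        LiveTask live = new LiveTask(currentEpoch);

        currentEpoch.set(6);                 // epoch bump, same node stays leader

        System.out.println(snapshot.leaderEpoch + " " + live.leaderEpoch.get());
    }
}
```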
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]