abhijeetk88 commented on code in PR #16502:
URL: https://github.com/apache/kafka/pull/16502#discussion_r1673509846
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -686,60 +702,98 @@ List<EpochEntry> getLeaderEpochEntries(UnifiedLog log,
long startOffset, long en
}
// VisibleForTesting
- RLMTask rlmTask(TopicIdPartition topicIdPartition) {
- RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+ RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+ RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
if (task != null) {
return task.rlmTask;
}
return null;
}
- class RLMTask extends CancellableRunnable {
+ abstract class RLMTask extends CancellableRunnable {
- private final TopicIdPartition topicIdPartition;
- private final int customMetadataSizeLimit;
+ protected final TopicIdPartition topicIdPartition;
private final Logger logger;
- private volatile int leaderEpoch = -1;
-
- public RLMTask(TopicIdPartition topicIdPartition, int
customMetadataSizeLimit) {
+ public RLMTask(TopicIdPartition topicIdPartition) {
this.topicIdPartition = topicIdPartition;
- this.customMetadataSizeLimit = customMetadataSizeLimit;
- LogContext logContext = new LogContext("[RemoteLogManager=" +
brokerId + " partition=" + topicIdPartition + "] ");
- logger = logContext.logger(RLMTask.class);
+ this.logger =
getLogContext(topicIdPartition).logger(RLMTask.class);
}
- boolean isLeader() {
- return leaderEpoch >= 0;
+ protected LogContext getLogContext(TopicIdPartition topicIdPartition) {
+ return new LogContext("[RemoteLogManager=" + brokerId + "
partition=" + topicIdPartition + "] ");
}
- // The copied and log-start offset is empty initially for a new leader
RLMTask, and needs to be fetched inside
+ public void run() {
+ if (isCancelled())
+ return;
+
+ try {
+ Optional<UnifiedLog> unifiedLogOptional =
fetchLog.apply(topicIdPartition.topicPartition());
+
+ if (!unifiedLogOptional.isPresent()) {
+ return;
+ }
+
+ execute(unifiedLogOptional.get());
+ } catch (InterruptedException ex) {
+ if (!isCancelled()) {
+ logger.warn("Current thread for topic-partition-id {} is
interrupted", topicIdPartition, ex);
+ }
+ } catch (RetriableException ex) {
+ logger.debug("Encountered a retryable error while executing
current task for topic-partition {}", topicIdPartition, ex);
+ } catch (Exception ex) {
+ if (!isCancelled()) {
+ logger.warn("Current task for topic-partition {} received
error but it will be scheduled", topicIdPartition, ex);
+ }
+ }
+ }
+
+ protected abstract void execute(UnifiedLog log) throws
InterruptedException, RemoteStorageException, ExecutionException;
+
+ public String toString() {
+ return this.getClass() + "[" + topicIdPartition + "]";
+ }
+ }
+
+ class RLMCopyTask extends RLMTask {
+ private final int customMetadataSizeLimit;
+ private final int leaderEpoch;
+ private final Logger logger;
+
+ // The copied and log-start offset is empty initially for a new
RLMCopyTask, and needs to be fetched inside
// the task's run() method.
private volatile Optional<OffsetAndEpoch> copiedOffsetOption =
Optional.empty();
- private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader =
false;
+ private volatile boolean isLogStartOffsetUpdated = false;
private volatile Optional<String> logDirectory = Optional.empty();
- public void convertToLeader(int leaderEpochVal) {
+ public RLMCopyTask(TopicIdPartition topicIdPartition, int
customMetadataSizeLimit, int leaderEpochVal) {
+ super(topicIdPartition);
+ this.customMetadataSizeLimit = customMetadataSizeLimit;
if (leaderEpochVal < 0) {
throw new KafkaException("leaderEpoch value for topic
partition " + topicIdPartition + " can not be negative");
}
- if (this.leaderEpoch != leaderEpochVal) {
- leaderEpoch = leaderEpochVal;
- }
- // Reset copied and log-start offset, so that it is set in next
run of RLMTask
- copiedOffsetOption = Optional.empty();
- isLogStartOffsetUpdatedOnBecomingLeader = false;
+ this.leaderEpoch = leaderEpochVal;
+ this.logger =
getLogContext(topicIdPartition).logger(RLMCopyTask.class);
}
- public void convertToFollower() {
- leaderEpoch = -1;
+ @Override
+ protected void execute(UnifiedLog log) throws InterruptedException {
+ // In the first run after completing altering logDir within
broker, we should make sure the state is reset. (KAFKA-16711)
+ if (!log.parentDir().equals(logDirectory.orElse(null))) {
+ copiedOffsetOption = Optional.empty();
+ isLogStartOffsetUpdated = false;
+ logDirectory = Optional.of(log.parentDir());
+ }
+
Review Comment:
The three variables (copiedOffsetOption, isLogStartOffsetUpdated and
logDirectory) don't seem to be used in the follower. They are not part of
RLMFollowerTask either; if they were used there, the code wouldn't compile.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]