junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2049738720
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +592,308 @@ Lock lock() {
return lock;
}
+ // Visible for testing.
+ RemoteFetch remoteFetch() {
+ return remoteFetchOpt.orElse(null);
+ }
+
// Visible for testing.
Meter expiredRequestMeter() {
return expiredRequestMeter;
}
+
+ private Optional<TopicPartitionRemoteFetchInfo>
maybePrepareRemoteStorageFetchInfo(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) {
+ Optional<TopicPartitionRemoteFetchInfo>
topicPartitionRemoteFetchInfoOpt = Optional.empty();
+ for (Map.Entry<TopicIdPartition, LogReadResult> entry :
replicaManagerReadResponse.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ LogReadResult logReadResult = entry.getValue();
+ if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+ // TODO: There is a limitation in remote storage fetch for
consumer groups that we can only perform remote fetch for
+ // a single topic partition in a fetch request. Since, the
logic of fetch is largely based on how consumer groups work,
+ // we are following the same logic. However, this problem
should be addressed as part of KAFKA-19133 which should help us perform
+ // fetch for multiple remote fetch topic partition in a
single share fetch request
+ topicPartitionRemoteFetchInfoOpt = Optional.of(new
TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
+ partitionsAcquired.put(topicIdPartition,
topicPartitionData.get(topicIdPartition));
+ break;
+ }
+ }
+ return topicPartitionRemoteFetchInfoOpt;
+ }
+
+ private boolean maybeProcessRemoteFetch(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+ ) throws Exception {
+ Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new
LinkedHashSet<>();
+ topicPartitionData.keySet().forEach(topicIdPartition -> {
+ // topic partitions for which fetch would not be happening in this
share fetch request.
+ if
(!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+ nonRemoteFetchTopicPartitions.add(topicIdPartition);
+ }
+ });
+ // Release fetch lock for the topic partitions that were acquired but
were not a part of remote fetch and add
+ // them to the delayed actions queue.
+
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
+ Optional<Exception> exceptionOpt =
processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
+ if (exceptionOpt.isPresent()) {
+ throw exceptionOpt.get();
+ }
+ // Check if remote fetch can be completed.
+ return maybeCompletePendingRemoteFetch();
+ }
+
+ /**
+ * Returns an option containing an exception if a task for
RemoteStorageFetchInfo could not be scheduled successfully else returns empty
optional.
+ * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic
partition information.
+ */
+ private Optional<Exception> processRemoteFetchOrException(
+ TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+ ) {
+ TopicIdPartition remoteFetchTopicIdPartition =
topicPartitionRemoteFetchInfo.topicIdPartition();
+ RemoteStorageFetchInfo remoteStorageFetchInfo =
topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+ Future<Void> remoteFetchTask;
+ CompletableFuture<RemoteLogReadResult> remoteFetchResult = new
CompletableFuture<>();
+ try {
+ remoteFetchTask =
replicaManager.remoteLogManager().get().asyncRead(
+ remoteStorageFetchInfo,
+ result -> {
+ remoteFetchResult.complete(result);
+ replicaManager.completeDelayedShareFetchRequest(new
DelayedShareFetchGroupKey(shareFetch.groupId(),
remoteFetchTopicIdPartition.topicId(),
remoteFetchTopicIdPartition.partition()));
+ }
+ );
+ } catch (RejectedExecutionException e) {
+ // Return the error if any in scheduling the remote fetch task.
+ log.warn("Unable to fetch data from remote storage", e);
+ remoteStorageFetchException = Optional.of(e);
+ return Optional.of(e);
Review Comment:
Could we just throw the exception and remove the return value?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +592,308 @@ Lock lock() {
return lock;
}
+ // Visible for testing.
+ RemoteFetch remoteFetch() {
+ return remoteFetchOpt.orElse(null);
+ }
+
// Visible for testing.
Meter expiredRequestMeter() {
return expiredRequestMeter;
}
+
+ private Optional<TopicPartitionRemoteFetchInfo>
maybePrepareRemoteStorageFetchInfo(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) {
+ Optional<TopicPartitionRemoteFetchInfo>
topicPartitionRemoteFetchInfoOpt = Optional.empty();
+ for (Map.Entry<TopicIdPartition, LogReadResult> entry :
replicaManagerReadResponse.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ LogReadResult logReadResult = entry.getValue();
+ if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+ // TODO: There is a limitation in remote storage fetch for
consumer groups that we can only perform remote fetch for
+ // a single topic partition in a fetch request. Since, the
logic of fetch is largely based on how consumer groups work,
+ // we are following the same logic. However, this problem
should be addressed as part of KAFKA-19133 which should help us perform
+ // fetch for multiple remote fetch topic partition in a
single share fetch request
+ topicPartitionRemoteFetchInfoOpt = Optional.of(new
TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
+ partitionsAcquired.put(topicIdPartition,
topicPartitionData.get(topicIdPartition));
+ break;
+ }
+ }
+ return topicPartitionRemoteFetchInfoOpt;
+ }
+
+ private boolean maybeProcessRemoteFetch(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+ ) throws Exception {
+ Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new
LinkedHashSet<>();
+ topicPartitionData.keySet().forEach(topicIdPartition -> {
+ // topic partitions for which fetch would not be happening in this
share fetch request.
+ if
(!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+ nonRemoteFetchTopicPartitions.add(topicIdPartition);
+ }
+ });
+ // Release fetch lock for the topic partitions that were acquired but
were not a part of remote fetch and add
+ // them to the delayed actions queue.
+
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
+ Optional<Exception> exceptionOpt =
processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
+ if (exceptionOpt.isPresent()) {
+ throw exceptionOpt.get();
+ }
+ // Check if remote fetch can be completed.
+ return maybeCompletePendingRemoteFetch();
+ }
+
+ /**
+ * Returns an option containing an exception if a task for
RemoteStorageFetchInfo could not be scheduled successfully else returns empty
optional.
+ * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic
partition information.
+ */
+ private Optional<Exception> processRemoteFetchOrException(
+ TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+ ) {
+ TopicIdPartition remoteFetchTopicIdPartition =
topicPartitionRemoteFetchInfo.topicIdPartition();
+ RemoteStorageFetchInfo remoteStorageFetchInfo =
topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+ Future<Void> remoteFetchTask;
+ CompletableFuture<RemoteLogReadResult> remoteFetchResult = new
CompletableFuture<>();
+ try {
+ remoteFetchTask =
replicaManager.remoteLogManager().get().asyncRead(
+ remoteStorageFetchInfo,
+ result -> {
+ remoteFetchResult.complete(result);
+ replicaManager.completeDelayedShareFetchRequest(new
DelayedShareFetchGroupKey(shareFetch.groupId(),
remoteFetchTopicIdPartition.topicId(),
remoteFetchTopicIdPartition.partition()));
+ }
+ );
+ } catch (RejectedExecutionException e) {
+ // Return the error if any in scheduling the remote fetch task.
+ log.warn("Unable to fetch data from remote storage", e);
+ remoteStorageFetchException = Optional.of(e);
+ return Optional.of(e);
+ } catch (Exception e) {
+ remoteStorageFetchException = Optional.of(e);
Review Comment:
Should we just keep this and remove `catch (RejectedExecutionException e)`?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,22 +323,39 @@ public boolean tryComplete() {
return false;
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
- releasePartitionLocks(topicPartitionData.keySet());
- partitionsAcquired.clear();
- partitionsAlreadyFetched.clear();
- return forceComplete();
+ // In case we have a remote fetch exception, we have already
released locks for partitions which have potential
+ // local log read. We do not release locks for partitions which
have a remote storage read because we need to
+ // complete the share fetch request in onComplete and if we
release the locks early here, some other DelayedShareFetch
+ // request might get the locks for those partitions without this
one getting complete.
+ if (remoteStorageFetchException.isEmpty()) {
+ releasePartitionLocks(topicPartitionData.keySet());
+ partitionsAcquired.clear();
+ localPartitionsAlreadyFetched.clear();
+ return forceComplete();
+ } else {
+ boolean completedByMe = forceComplete();
+ // If invocation of forceComplete is not successful, then that
means the request is already completed
+ // hence release the acquired locks. This can occur in case of
remote storage fetch if there is a thread that
Review Comment:
> then that means the request is already completed hence release the
> acquired locks.

Suggest rewording to: "then that means the request is already completed and hence the acquired
locks are already released." Ditto above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]