AndrewJSchofield commented on code in PR #17283:
URL: https://github.com/apache/kafka/pull/17283#discussion_r1778270660
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -46,59 +47,52 @@ public class ShareFetchUtils {
     // Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data
     // from share partitions.
-    static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse(
+    static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse(
             ShareFetchData shareFetchData,
             Map<TopicIdPartition, FetchPartitionData> responseData,
             Map<SharePartitionKey, SharePartition> partitionCacheMap,
             ReplicaManager replicaManager
     ) {
-        Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new HashMap<>();
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = new HashMap<>();
         responseData.forEach((topicIdPartition, fetchPartitionData) -> {
             SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey(
                 shareFetchData.groupId(), topicIdPartition));
-            futures.put(topicIdPartition, sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData)
-                .handle((acquiredRecords, throwable) -> {
-                    log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
-                        topicIdPartition, shareFetchData, acquiredRecords);
-                    ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
-                        .setPartitionIndex(topicIdPartition.partition());
+            ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setPartitionIndex(topicIdPartition.partition())
+                .setAcknowledgeErrorCode(Errors.NONE.code());
-                    if (throwable != null) {
-                        partitionData.setErrorCode(Errors.forException(throwable).code());
-                        return partitionData;
-                    }
+            if (fetchPartitionData.error.code() != Errors.NONE.code()) {
+                partitionData
+                    .setRecords(null)
+                    .setErrorCode(fetchPartitionData.error.code())
+                    .setAcquiredRecords(Collections.emptyList());
-                    if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) {
-                        // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset.
-                        // So, we would update the start and end offset of the share partition and still return an empty
-                        // response and let the client retry the fetch. This way we do not lose out on the data that
-                        // would be returned for other share partitions in the fetch request.
-                        sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager));
-                        partitionData.setPartitionIndex(topicIdPartition.partition())
-                            .setRecords(null)
-                            .setErrorCode(Errors.NONE.code())
-                            .setAcquiredRecords(Collections.emptyList())
-                            .setAcknowledgeErrorCode(Errors.NONE.code());
-                        return partitionData;
-                    }
-
-                    // Maybe, in the future, check if no records are acquired, and we want to retry
-                    // replica manager fetch. Depends on the share partition manager implementation,
-                    // if we want parallel requests for the same share partition or not.
-                    partitionData.setPartitionIndex(topicIdPartition.partition())
-                        .setRecords(fetchPartitionData.records)
-                        .setErrorCode(fetchPartitionData.error.code())
-                        .setAcquiredRecords(acquiredRecords)
-                        .setAcknowledgeErrorCode(Errors.NONE.code());
-                    return partitionData;
-                }));
-        });
-        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(v -> {
-            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResult = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> processedResult.put(topicIdPartition, future.join()));
-            return processedResult;
+                // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset.
+                // So, we would update the start and end offset of the share partition and still return an empty
+                // response and let the client retry the fetch. This way we do not lose out on the data that
+                // would be returned for other share partitions in the fetch request.
+                if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) {
+                    sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager));
+                    // We set the error code to NONE, as we have updated the start offset of the share partition
+                    // and the client can retry the fetch.
+                    partitionData.setErrorCode(Errors.NONE.code());
+                }
+            } else {
+                List<AcquiredRecords> acquiredRecords = sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
+                log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
+                    topicIdPartition, shareFetchData, acquiredRecords);
+                // Maybe, in the future, check if no records are acquired, and we want to retry
Review Comment:
If no records were acquired, we should probably not be returning a bunch of records of which none were acquired.
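
One possible shape for that guard, sketched against the new else-branch (the empty-check and the decision to return null records are my assumption of the fix, not code from this PR):

```java
// Hypothetical sketch, not the PR's code: only attach the fetched batch when
// acquire() actually handed out records for this share partition.
List<AcquiredRecords> acquiredRecords = sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
if (acquiredRecords.isEmpty()) {
    // Nothing was acquired, so return an empty response for this partition
    // rather than records the consumer is not entitled to.
    partitionData
        .setRecords(null)
        .setAcquiredRecords(Collections.emptyList());
} else {
    partitionData
        .setRecords(fetchPartitionData.records)
        .setAcquiredRecords(acquiredRecords);
}
```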
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -119,18 +120,11 @@ public void onComplete() {
             });
             log.trace("Data successfully retrieved by replica manager: {}", responseData);
-            ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, replicaManager)
-                .whenComplete((result, throwable) -> {
-                    if (throwable != null) {
-                        log.error("Error processing fetch response for share partitions", throwable);
-                        shareFetchData.future().completeExceptionally(throwable);
-                    } else {
-                        shareFetchData.future().complete(result);
-                    }
-                    // Releasing the lock to move ahead with the next request in queue.
-                    releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
-                });
-
+            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
Review Comment:
I think the semantics of the exception handling are different now. If processFetchResponse throws, you'll skip the release of the locks.
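
For example, a try/finally would preserve the old guarantee (a sketch reusing the identifiers from the diff; the exact structure of the surrounding onComplete() is assumed):

```java
// Sketch: release the partition locks even if processFetchResponse throws,
// matching the semantics of the removed whenComplete() callback.
try {
    Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
        ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, replicaManager);
    shareFetchData.future().complete(result);
} catch (Exception e) {
    log.error("Error processing fetch response for share partitions", e);
    shareFetchData.future().completeExceptionally(e);
} finally {
    // Releasing the lock to move ahead with the next request in queue.
    releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
}
```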
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]