junrao commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1731530754
##########
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##########
@@ -82,8 +82,13 @@ else if (requireTimestamp)
             return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
     }
+    public static Builder forReplica(short allowedVersion, int replicaId, boolean requireEarliestLocalTimestamp) {
+        short minVersion = requireEarliestLocalTimestamp ? (short) 8 : (short) 0;
Review Comment:
I understand the intention of this logic, but it doesn't seem to do what you
want. The problem is the following. The client determines the version for a
request using the following logic in NetworkClient.
```
        try {
            NodeApiVersions versionInfo = apiVersions.get(nodeId);
            short version;
            // Note: if versionInfo is null, we have no server version information. This would be
            // the case when sending the initial ApiVersionRequest which fetches the version
            // information itself. It is also the case when discoverBrokerVersions is set to false.
            if (versionInfo == null) {
                version = builder.latestAllowedVersion();
                if (discoverBrokerVersions && log.isTraceEnabled())
                    log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                            "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
            } else {
                version = versionInfo.latestUsableVersion(clientRequest.apiKey(),
                        builder.oldestAllowedVersion(), builder.latestAllowedVersion());
            }
```
As you can see, `builder.oldestAllowedVersion()` is only consulted when `versionInfo` is non-null. However, BrokerBlockingSender creates its NetworkClient with discoverBrokerVersions=false, which means `apiVersions` is never populated and `versionInfo` is always null, so the minVersion gate above has no effect on the chosen version.
To me, if we can gate tier storage with MV, gating ListOffsets here is less important.
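To illustrate the problem above, here is a hypothetical standalone sketch (not Kafka code) of the version-selection fallback: the method names and the map-based stand-in for `NodeApiVersions` are mine, but the control flow mirrors the NetworkClient excerpt. When no version info exists for the node, the builder's latest allowed version is used and the oldest allowed version is never consulted.
```java
import java.util.HashMap;
import java.util.Map;

public class VersionFallbackSketch {
    // Simplified stand-in for NodeApiVersions: maps apiKey -> broker's max supported version.
    public static short latestUsable(Map<String, Short> nodeVersions, String apiKey,
                                     short oldestAllowed, short latestAllowed) {
        short brokerMax = nodeVersions.get(apiKey);
        short usable = (short) Math.min(brokerMax, latestAllowed);
        if (usable < oldestAllowed)
            throw new IllegalStateException("no usable version");
        return usable;
    }

    public static short chooseVersion(Map<String, Short> versionInfo, String apiKey,
                                      short oldestAllowed, short latestAllowed) {
        if (versionInfo == null)
            return latestAllowed;   // oldestAllowed is ignored on this path
        return latestUsable(versionInfo, apiKey, oldestAllowed, latestAllowed);
    }

    public static void main(String[] args) {
        // With discoverBrokerVersions=false, versionInfo stays null, so an
        // oldest-allowed gate (e.g. minVersion=8 from forReplica) has no effect:
        System.out.println(chooseVersion(null, "LIST_OFFSETS", (short) 8, (short) 9)); // 9

        // With version info present, the gate is enforced:
        Map<String, Short> known = new HashMap<>();
        known.put("LIST_OFFSETS", (short) 7);
        try {
            chooseVersion(known, "LIST_OFFSETS", (short) 8, (short) 9);
        } catch (IllegalStateException e) {
            System.out.println("unusable: broker too old");
        }
    }
}
```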
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1103,35 +1103,41 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
           new ListOffsetsPartitionResponse()
             .setPartitionIndex(partition.partitionIndex)
-            .setErrorCode(Errors.NONE.code)
-            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
-        } catch {
-          // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cases since these error messages
-          // are typically transient and there is no value in logging the entire stack trace for the same
-          case e @ (_ : UnknownTopicOrPartitionException |
-                    _ : NotLeaderOrFollowerException |
-                    _ : KafkaStorageException) =>
-            debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-              correlationId, clientId, topicPartition, e.getMessage))
-            new ListOffsetsPartitionResponse()
-              .setPartitionIndex(partition.partitionIndex)
-              .setErrorCode(Errors.forException(e).code)
-          case e: Throwable =>
-            error("Error while responding to offset request", e)
+            .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+        } else {
+          val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
+
+          try {
+            val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
Review Comment:
Regarding gating tier storage on MV 3.6, we could potentially pass a remoteLogManagerSupplier, instead of a direct remoteLogManager, to ReplicaManager. The supplier would instantiate remoteLogManager on first use, based on the remote storage config and the MV setting at that time. MV initialization happens before the socket server is opened, so by the time the supplier is called we can be sure that MV has been initialized. This still doesn't support enabling remote storage dynamically, but is probably good enough for 3.9.
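A minimal sketch of the supplier idea, assuming the names here (the `memoize` helper and the config/MV check inside the lambda) are illustrative rather than actual Kafka APIs: the delegate runs at most once, on first `get()`, which in this proposal would happen only after MV initialization.
```java
import java.util.function.Supplier;

public class LazySupplierSketch {
    // Memoizing wrapper: the delegate runs at most once, on first get().
    public static <T> Supplier<T> memoize(Supplier<T> delegate) {
        return new Supplier<T>() {
            private T value;
            private boolean initialized;
            @Override public synchronized T get() {
                if (!initialized) {
                    value = delegate.get();
                    initialized = true;
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        int[] creations = {0};
        Supplier<String> rlmSupplier = memoize(() -> {
            creations[0]++;   // here we would check the remote storage config and MV
            return "remoteLogManager";
        });
        // The socket server opens after MV init, so the first get() observes a
        // fully initialized MV; later calls reuse the same instance.
        rlmSupplier.get();
        rlmSupplier.get();
        System.out.println(creations[0]); // 1: instantiated exactly once
    }
}
```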
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]