jsancio commented on code in PR #19545:
URL: https://github.com/apache/kafka/pull/19545#discussion_r2058561246
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##########
@@ -66,48 +66,48 @@ public static BootstrapMetadata fromVersions(
setFeatureLevel(level), (short) 0));
}
}
- return new BootstrapMetadata(records, metadataVersion, source);
+ return new BootstrapMetadata(records, metadataVersion.featureLevel(),
source);
}
public static BootstrapMetadata fromVersion(MetadataVersion
metadataVersion, String source) {
List<ApiMessageAndVersion> records = List.of(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), (short) 0));
- return new BootstrapMetadata(records, metadataVersion, source);
+ return new BootstrapMetadata(records, metadataVersion.featureLevel(),
source);
}
public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion>
records, String source) {
- MetadataVersion metadataVersion = null;
+ Optional<Short> metadataVersionLevel = Optional.empty();
for (ApiMessageAndVersion record : records) {
- Optional<MetadataVersion> version =
recordToMetadataVersion(record.message());
- if (version.isPresent()) {
- metadataVersion = version.get();
+ Optional<Short> level =
recordToMetadataVersionLevel(record.message());
+ if (level.isPresent()) {
+ metadataVersionLevel = level;
}
}
- if (metadataVersion == null) {
+ if (!metadataVersionLevel.isPresent()) {
throw new RuntimeException("No FeatureLevelRecord for " +
MetadataVersion.FEATURE_NAME +
" was found in the bootstrap metadata from " + source);
}
- return new BootstrapMetadata(records, metadataVersion, source);
+ return new BootstrapMetadata(records, metadataVersionLevel.get(),
source);
}
- public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage
record) {
+ public static Optional<Short> recordToMetadataVersionLevel(ApiMessage
record) {
if (record instanceof FeatureLevelRecord featureLevel) {
if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) {
- return
Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel()));
+ return Optional.of(featureLevel.featureLevel());
Review Comment:
This works because, in the cluster metadata partition, Kafka 4.0 only
deprecated and removed MV less than 7. If in the future Kafka drops supports
for other metadata records or fields then we will need to revisit this issue,
right?
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##########
@@ -117,7 +117,7 @@ public List<ApiMessageAndVersion> records() {
}
public MetadataVersion metadataVersion() {
- return metadataVersion;
+ return MetadataVersion.fromFeatureLevel(metadataVersionLevel);
Review Comment:
Right. Kafka assumes that if there is data loss it is because the entire
disk failed. Kafka doesn't handle error in the filesystem were some data was
lost after an fsync.
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##########
@@ -66,48 +66,48 @@ public static BootstrapMetadata fromVersions(
setFeatureLevel(level), (short) 0));
}
}
- return new BootstrapMetadata(records, metadataVersion, source);
+ return new BootstrapMetadata(records, metadataVersion.featureLevel(),
source);
}
public static BootstrapMetadata fromVersion(MetadataVersion
metadataVersion, String source) {
List<ApiMessageAndVersion> records = List.of(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), (short) 0));
- return new BootstrapMetadata(records, metadataVersion, source);
+ return new BootstrapMetadata(records, metadataVersion.featureLevel(),
source);
}
public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion>
records, String source) {
- MetadataVersion metadataVersion = null;
+ Optional<Short> metadataVersionLevel = Optional.empty();
for (ApiMessageAndVersion record : records) {
- Optional<MetadataVersion> version =
recordToMetadataVersion(record.message());
- if (version.isPresent()) {
- metadataVersion = version.get();
+ Optional<Short> level =
recordToMetadataVersionLevel(record.message());
+ if (level.isPresent()) {
+ metadataVersionLevel = level;
}
}
- if (metadataVersion == null) {
+ if (!metadataVersionLevel.isPresent()) {
Review Comment:
Yes. Newer version of Java added `isEmpty`. We can use that in Kafka since
we dropped support for Java 8.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]