junrao commented on code in PR #16443:
URL: https://github.com/apache/kafka/pull/16443#discussion_r1771950807
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -241,6 +255,16 @@ private ApiError updateFeature(
// Perform additional checks if we're updating metadata.version
return updateMetadataVersion(newVersion,
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
} else {
+ // Validate dependencies for features that are not metadata.version
+ try {
+ if (newVersion != 0) {
+ Features.validateVersion(
+
Features.featureFromName(featureName).fromFeatureLevel(newVersion, true),
Review Comment:
It would be useful to add a comment on why allowUnstableFeatureVersions is
always true.
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -2305,11 +2307,34 @@ public CompletableFuture<UpdateFeaturesResponseData>
updateFeatures(
}).thenApply(result -> {
UpdateFeaturesResponseData responseData = new
UpdateFeaturesResponseData();
responseData.setResults(new
UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
- result.forEach((featureName, error) -> responseData.results().add(
- new UpdateFeaturesResponseData.UpdatableFeatureResult()
- .setFeature(featureName)
- .setErrorCode(error.error().code())
- .setErrorMessage(error.message())));
+ Optional<Entry<String, ApiError>> errorEntry = Optional.empty();
+ if (context.requestHeader().requestApiVersion() > 1) {
+ Stream<Entry<String, ApiError>> errorEntries =
result.entrySet().stream().filter(entry ->
+ !entry.getValue().error().equals(Errors.NONE));
+ errorEntry = errorEntries.findFirst();
+ }
+
+ if (errorEntry.isPresent()) {
+ String errorFeatureName = errorEntry.get().getKey();
+ ApiError topError = errorEntry.get().getValue();
+ String errorString = errorFeatureName + ":" +
topError.error().exceptionName() + " (" + topError.message() + ")";
+
responseData.setErrorCode(Errors.INVALID_UPDATE_VERSION.code());
Review Comment:
Should we use the error code from `topError`?
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -174,15 +175,27 @@ ControllerResult<Map<String, ApiError>> updateFeatures(
Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
boolean validateOnly
) {
+ boolean updateFailed = false;
TreeMap<String, ApiError> results = new TreeMap<>();
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+
+ Map<String, Short> proposedUpdatedVersions = new HashMap<>();
+ finalizedVersions.forEach(proposedUpdatedVersions::put);
+ proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME,
metadataVersion.get().featureLevel());
+ updates.forEach(proposedUpdatedVersions::put);
+
for (Entry<String, Short> entry : updates.entrySet()) {
- results.put(entry.getKey(), updateFeature(entry.getKey(),
entry.getValue(),
- upgradeTypes.getOrDefault(entry.getKey(),
FeatureUpdate.UpgradeType.UPGRADE), records));
+ ApiError error = updateFeature(entry.getKey(), entry.getValue(),
+ upgradeTypes.getOrDefault(entry.getKey(),
FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions);
+ results.put(entry.getKey(), error);
+ if (!error.error().equals(Errors.NONE)) {
+ updateFailed = true;
+ break;
+ }
}
- if (validateOnly) {
+ if (validateOnly || updateFailed) {
Review Comment:
We changed the implementation such that if one feature has an error, none of
the features will be processed. It seems that we only need to return a
top-level error in UpdateFeaturesResponse. There is no need to have the
per-feature error code.
##########
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java:
##########
@@ -82,17 +84,37 @@ public static UpdateFeaturesResponse parse(ByteBuffer
buffer, short version) {
return new UpdateFeaturesResponse(new UpdateFeaturesResponseData(new
ByteBufferAccessor(buffer), version));
}
- public static UpdateFeaturesResponse createWithErrors(ApiError
topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) {
+ public static UpdateFeaturesResponse createWithErrors(short version,
ApiError topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs)
{
final UpdatableFeatureResultCollection results = new
UpdatableFeatureResultCollection();
- for (final Map.Entry<String, ApiError> updateError :
updateErrors.entrySet()) {
- final String feature = updateError.getKey();
- final ApiError error = updateError.getValue();
- final UpdatableFeatureResult result = new UpdatableFeatureResult();
- result.setFeature(feature)
- .setErrorCode(error.error().code())
- .setErrorMessage(error.message());
- results.add(result);
+ Optional<Map.Entry<String, ApiError>> errorEntry = Optional.empty();
+ if (version > 1) {
+ Stream<Map.Entry<String, ApiError>> errorEntries =
updateErrors.entrySet().stream().filter(entry ->
+ !entry.getValue().error().equals(Errors.NONE));
+ errorEntry = errorEntries.findFirst();
}
+
+ if (errorEntry.isPresent()) {
+ String errorFeatureName = errorEntry.get().getKey();
+ ApiError topError = errorEntry.get().getValue();
+ String errorString = errorFeatureName + ":" +
topError.error().exceptionName() + " (" + topError.message() + ")";
+ topLevelError = new ApiError(Errors.INVALID_UPDATE_VERSION.code(),
Review Comment:
Should we use the error code from `topError`?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3603,13 +3603,16 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]):
Unit = {
def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
errors match {
+ // Hard-code version to 1 since version 2 will not be implemented
for 4.0
Review Comment:
How do we prevent the client from issuing a V2 UpdateFeatures request in ZK
mode?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]