junrao commented on code in PR #16443:
URL: https://github.com/apache/kafka/pull/16443#discussion_r1776008681
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -177,9 +178,19 @@ ControllerResult<Map<String, ApiError>> updateFeatures(
TreeMap<String, ApiError> results = new TreeMap<>();
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+
+ Map<String, Short> proposedUpdatedVersions = new HashMap<>();
+ finalizedVersions.forEach(proposedUpdatedVersions::put);
+ proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME,
metadataVersion.get().featureLevel());
+ updates.forEach(proposedUpdatedVersions::put);
+
for (Entry<String, Short> entry : updates.entrySet()) {
- results.put(entry.getKey(), updateFeature(entry.getKey(),
entry.getValue(),
- upgradeTypes.getOrDefault(entry.getKey(),
FeatureUpdate.UpgradeType.UPGRADE), records));
+ ApiError error = updateFeature(entry.getKey(), entry.getValue(),
+ upgradeTypes.getOrDefault(entry.getKey(),
FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions);
+ if (!error.error().equals(Errors.NONE)) {
+ return ControllerResult.of(Collections.emptyList(),
Collections.singletonMap(entry.getKey(), error));
Review Comment:
This works for v2, but it changes the behavior for v1. In v1, we should
still prepare an ApiError for each feature?
##########
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java:
##########
@@ -82,17 +84,31 @@ public static UpdateFeaturesResponse parse(ByteBuffer
buffer, short version) {
return new UpdateFeaturesResponse(new UpdateFeaturesResponseData(new
ByteBufferAccessor(buffer), version));
}
- public static UpdateFeaturesResponse createWithErrors(ApiError
topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) {
+ public static UpdateFeaturesResponse createWithErrors(short version,
ApiError topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs)
{
final UpdatableFeatureResultCollection results = new
UpdatableFeatureResultCollection();
- for (final Map.Entry<String, ApiError> updateError :
updateErrors.entrySet()) {
- final String feature = updateError.getKey();
- final ApiError error = updateError.getValue();
- final UpdatableFeatureResult result = new UpdatableFeatureResult();
- result.setFeature(feature)
- .setErrorCode(error.error().code())
- .setErrorMessage(error.message());
- results.add(result);
+ Optional<Map.Entry<String, ApiError>> errorEntry = Optional.empty();
+ if (version > 1) {
+ Stream<Map.Entry<String, ApiError>> errorEntries =
updateErrors.entrySet().stream().filter(entry ->
Review Comment:
For v2, we can just ignore updateErrors and only consider topLevelError,
right?
##########
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java:
##########
@@ -82,17 +84,31 @@ public static UpdateFeaturesResponse parse(ByteBuffer
buffer, short version) {
return new UpdateFeaturesResponse(new UpdateFeaturesResponseData(new
ByteBufferAccessor(buffer), version));
}
- public static UpdateFeaturesResponse createWithErrors(ApiError
topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) {
+ public static UpdateFeaturesResponse createWithErrors(short version,
ApiError topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs)
{
final UpdatableFeatureResultCollection results = new
UpdatableFeatureResultCollection();
- for (final Map.Entry<String, ApiError> updateError :
updateErrors.entrySet()) {
- final String feature = updateError.getKey();
- final ApiError error = updateError.getValue();
- final UpdatableFeatureResult result = new UpdatableFeatureResult();
- result.setFeature(feature)
- .setErrorCode(error.error().code())
- .setErrorMessage(error.message());
- results.add(result);
+ Optional<Map.Entry<String, ApiError>> errorEntry = Optional.empty();
+ if (version > 1) {
+ Stream<Map.Entry<String, ApiError>> errorEntries =
updateErrors.entrySet().stream().filter(entry ->
+ !entry.getValue().error().equals(Errors.NONE));
+ errorEntry = errorEntries.findFirst();
}
+
+ if (errorEntry.isPresent()) {
+ String errorFeatureName = errorEntry.get().getKey();
Review Comment:
For v1, the existing behavior is that if updateErrors is not empty, we set
topLevelError to be none. It seems that we are changing the behavior here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]