jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612458345
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
return current.get(lastStep());
}
+ /**
+ * Access {@code Map} fields and apply functions to update field values.
+ *
+ * @param originalValue schema-based data value
+ * @param whenFound function to apply when path is found
+ * @param whenNotFound function to apply when path is not found
+ * @param whenOther function to apply on fields not matched by path
+ * @return updated data value
+ */
+ public Map<String, Object> updateValueFrom(
+ Map<String, Object> originalValue,
+ MapValueUpdater whenFound,
+ MapValueUpdater whenNotFound,
+ MapValueUpdater whenOther
+ ) {
+ return updateValue(originalValue, 0, whenFound, whenNotFound,
whenOther);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> updateValue(
+ Map<String, Object> originalValue,
+ int step,
+ MapValueUpdater whenFound,
+ MapValueUpdater whenNotFound,
+ MapValueUpdater whenOther
+ ) {
+ if (originalValue == null) return null;
+ Map<String, Object> updatedParent = new
HashMap<>(originalValue.size());
+ boolean found = false;
+ for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+ String fieldName = entry.getKey();
+ Object fieldValue = entry.getValue();
+ if (steps.get(step).equals(fieldName)) {
+ found = true;
+ if (step < lastStepIndex()) {
+ if (fieldValue instanceof Map) {
+ Map<String, Object> updatedField = updateValue(
+ (Map<String, Object>) fieldValue,
+ step + 1,
+ whenFound,
+ whenNotFound,
+ whenOther);
+ updatedParent.put(fieldName, updatedField);
+ } else {
+ // add back to not found and apply others, as only
leaf values are updated
+ found = false;
+ whenOther.apply(originalValue, updatedParent, null,
fieldName);
+ }
+ } else {
+ whenFound.apply(originalValue, updatedParent, this,
fieldName);
+ }
+ } else {
+ whenOther.apply(originalValue, updatedParent, null, fieldName);
+ }
+ }
+
+ if (!found) {
+ whenNotFound.apply(originalValue, updatedParent, this,
steps.get(step));
+ }
+
+ return updatedParent;
+ }
Review Comment:
Thanks! I like the proposed approach. The only change I'm considering to add
is to return an intact map if value is not found (the proposal is to return
null).
This should make the behavior consistent with update Struct and Schema that
do not make this distinction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]