Copilot commented on code in PR #17308:
URL: https://github.com/apache/pinot/pull/17308#discussion_r2586645880
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java:
##########
@@ -54,24 +55,36 @@ public class ExpressionTransformer implements RecordTransformer {
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new LinkedHashMap<>();
private final boolean _continueOnError;
private final ThrottledLogger _throttledLogger;
Review Comment:
The field `_overwriteExistingValues` lacks a Javadoc comment explaining its
purpose. Add a comment describing when this flag is set to true (e.g., for
post-upsert transforms) and how it affects transformation behavior (i.e.,
whether existing non-null values are overwritten by transform functions).
```suggestion
private final ThrottledLogger _throttledLogger;
/**
 * If set to {@code true}, transform functions will overwrite existing non-null values in the row.
 * This is typically enabled for post-upsert transforms, where transformed columns may need to be recomputed
 * even if they already have values. If {@code false}, only null or missing values will be set by transforms.
 */
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java:
##########
@@ -140,13 +154,18 @@ public void transform(GenericRow record) {
String column = entry.getKey();
FunctionEvaluator transformFunctionEvaluator = entry.getValue();
Object existingValue = record.getValue(column);
- if (existingValue == null) {
+ boolean treatAsNull = _overwriteExistingValues || existingValue == null || record.isNullValue(column);
+ if (treatAsNull) {
Review Comment:
The variable name `treatAsNull` is misleading. This flag indicates whether
to apply the transform, not whether the value is null. Consider renaming to
`shouldApplyTransform` or `shouldOverwrite` to better reflect its purpose.
```suggestion
boolean shouldApplyTransform = _overwriteExistingValues || existingValue == null || record.isNullValue(column);
if (shouldApplyTransform) {
```
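To make the control flow being discussed concrete, here is a minimal, self-contained sketch of the flag-gated transform pattern. The class and method names (`OverwriteFlagSketch`, `applyTransform`) are hypothetical stand-ins for Pinot's `GenericRow`/`FunctionEvaluator`, not the actual API; only the decision logic mirrors the diff above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the flag-gated transform logic in the diff above;
// names do not match Pinot's actual classes.
public class OverwriteFlagSketch {
  static Object applyTransform(Map<String, Object> row, String column,
      Function<Map<String, Object>, Object> evaluator, boolean overwriteExistingValues) {
    Object existingValue = row.get(column);
    // Apply the transform when overwriting is forced (the post-upsert case)
    // or the column has no value yet; otherwise keep the existing value.
    boolean shouldApplyTransform = overwriteExistingValues || existingValue == null;
    if (shouldApplyTransform) {
      Object transformed = evaluator.apply(row);
      row.put(column, transformed);
      return transformed;
    }
    return existingValue;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("total", 7.0f);
    Function<Map<String, Object>, Object> eval = r -> 15.0f;
    // With the flag off, the pre-existing value wins.
    System.out.println(applyTransform(new HashMap<>(row), "total", eval, false)); // 7.0
    // With the flag on, the transform recomputes the column.
    System.out.println(applyTransform(new HashMap<>(row), "total", eval, true)); // 15.0
  }
}
```

This also illustrates why `shouldApplyTransform` reads better than `treatAsNull`: the boolean governs whether the transform runs, not whether the value is null.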
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java:
##########
@@ -312,6 +315,52 @@ public void testDeleteWithPartialUpsert()
upsertConfig);
}
+ @Test
+ public void testPartialUpsertPostUpdateTransforms()
+ throws Exception {
+ String tableName = "partialUpsertPostTransforms";
+ String kafkaTopicName = getKafkaTopic() + "-post-transforms";
+ createKafkaTopic(kafkaTopicName);
+
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(tableName)
+ .addSingleValueDimension("id", FieldSpec.DataType.INT)
+ .addMetric("score", FieldSpec.DataType.FLOAT)
+ .addMetric("bonus", FieldSpec.DataType.FLOAT)
+ .addMetric("total", FieldSpec.DataType.FLOAT)
+ .addDateTime(TIME_COL_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .setPrimaryKeyColumns(List.of("id"))
+ .build();
+ addSchema(schema);
+
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
+ upsertConfig.setComparisonColumn(TIME_COL_NAME);
+ upsertConfig.setPartialUpsertPostUpdateTransformConfigs(
+ List.of(new TransformConfig("total", "plus(score,bonus)")));
+
+ Map<String, String> csvDecoderProperties =
+ getCSVDecoderProperties(CSV_DELIMITER, "id,score,bonus,total," +
TIME_COL_NAME);
+ csvDecoderProperties.put(
+ StreamConfigProperties.constructStreamProperty("kafka",
"decoder.prop.nullStringValue"), "NULL");
+ TableConfig tableConfig =
+ createCSVUpsertTableConfig(tableName, kafkaTopicName, getNumKafkaPartitions(), csvDecoderProperties,
+ upsertConfig, "id");
+ addTableConfig(tableConfig);
+
+ // First record sets score, second sets bonus. Post-upsert transform should compute total from the merged row.
Review Comment:
Add a comment explaining the test scenario: which fields are updated in each
record and why the expected total is 15.0. For example: '// First record: id=1,
score=5, bonus=NULL at t=1000; Second record: id=1, score=NULL, bonus=10 at
t=2000; After merge: score=5, bonus=10, total=5+10=15'.
```suggestion
// Test scenario:
// First record: id=1, score=5, bonus=NULL, total=NULL, timestamp=1000
// Second record: id=1, score=NULL, bonus=10, total=NULL, timestamp=2000
// After partial upsert (using timestamp as comparison column), the merged row for id=1 is:
// score=5 (from first record), bonus=10 (from second record), total=NULL
// Post-update transform computes total=score+bonus=5+10=15
// Expected: score=5, bonus=10, total=15
```
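The merge semantics assumed by that scenario can be sketched in a few lines. This is a toy model, not Pinot's actual merge code: a partial upsert keeps the previous value for any column the new record leaves unset, and the post-merge transform then computes `total = score + bonus` on the merged row. The class name `PartialUpsertMergeSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the test scenario above (not Pinot's actual merge code).
public class PartialUpsertMergeSketch {
  // Partial-upsert merge: a non-null value in the newer record wins,
  // otherwise the previous value is kept.
  static Map<String, Float> merge(Map<String, Float> prev, Map<String, Float> next) {
    Map<String, Float> merged = new HashMap<>(prev);
    next.forEach((k, v) -> {
      if (v != null) {
        merged.put(k, v);
      }
    });
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Float> first = new HashMap<>();
    first.put("score", 5f);   // bonus is NULL in the first record
    Map<String, Float> second = new HashMap<>();
    second.put("bonus", 10f); // score is NULL in the second record
    Map<String, Float> merged = merge(first, second);
    // Post-merge transform: plus(score, bonus)
    merged.put("total", merged.get("score") + merged.get("bonus"));
    System.out.println(merged.get("total")); // 15.0
  }
}
```

This is why the expected `total` is 15.0: neither source record ever carried both operands, so the transform is only meaningful after the merge.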
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java:
##########
@@ -105,6 +116,71 @@ public void testCustomPartialUpsertMergerWithNullResult() {
testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger());
}
+ @Test
+ public void testPostUpsertTransformRunsAfterMerge() {
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("pk",
+ FieldSpec.DataType.STRING)
+ .addSingleValueDimension("firstName", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("lastName", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("fullName", FieldSpec.DataType.STRING)
+ .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG,
"1:HOURS:EPOCH", "1:HOURS")
+ .setPrimaryKeyColumns(List.of("pk")).build();
+
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
+ upsertConfig.setComparisonColumns(List.of("hoursSinceEpoch"));
+ upsertConfig.setPartialUpsertPostUpdateTransformConfigs(
+ List.of(new TransformConfig("fullName",
"concat(firstName,lastName)")));
+ TableConfig tableConfig = createTableConfig(schema, upsertConfig);
+ List<RecordTransformer> postUpdateTransformers =
+ RecordTransformerUtils.getPostPartialUpsertTransformers(tableConfig, schema);
+ assertFalse(postUpdateTransformers.isEmpty());
+ PartialUpsertHandler handler =
+ new PartialUpsertHandler(tableConfig, schema, List.of("hoursSinceEpoch"), upsertConfig);
+ List<RecordTransformer> handlerTransformers;
+ try {
+ Field transformersField = PartialUpsertHandler.class.getDeclaredField("_postUpdateTransformers");
+ transformersField.setAccessible(true);
+ //noinspection unchecked
+ handlerTransformers = (List<RecordTransformer>) transformersField.get(handler);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ assertFalse(handlerTransformers.isEmpty());
Review Comment:
Using reflection to access private fields in tests is brittle and violates
encapsulation. Consider adding a package-private getter method
`getPostUpdateTransformersForTesting()` in `PartialUpsertHandler` to avoid
reflection, or verify the behavior through observable side effects (e.g.,
checking that `fullName` is correctly computed after merge).
```suggestion
// Reflection-based access to _postUpdateTransformers removed; test relies on observable side effects.
```
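The reviewer's first alternative, a package-private test-only accessor, can be sketched as follows. The class and method names here (`GetterSketch`, `Handler`, `getPostUpdateTransformersForTesting`) are hypothetical illustrations of the pattern, not Pinot's actual classes.

```java
import java.util.Collections;
import java.util.List;

// Sketch of the suggested alternative to reflection: a package-private,
// test-only accessor. Names are hypothetical, not Pinot's actual API.
public class GetterSketch {
  static class Handler {
    private final List<String> _postUpdateTransformers;

    Handler(List<String> transformers) {
      _postUpdateTransformers = transformers;
    }

    // Package-private: visible to tests in the same package without
    // setAccessible(true), and returned as an unmodifiable view.
    List<String> getPostUpdateTransformersForTesting() {
      return Collections.unmodifiableList(_postUpdateTransformers);
    }
  }

  public static void main(String[] args) {
    Handler handler = new Handler(List.of("expressionTransformer"));
    // The test can now assert on the list directly.
    System.out.println(handler.getPostUpdateTransformersForTesting().isEmpty()); // false
  }
}
```

The accessor survives field renames at compile time, whereas the reflective `getDeclaredField("_postUpdateTransformers")` lookup would only fail at test runtime.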
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerUtils.java:
##########
@@ -89,6 +92,31 @@ public static List<RecordTransformer> getDefaultTransformers(TableConfig tableCo
return getTransformers(tableConfig, schema, false, false, false, false);
}
+ /**
+ * Returns transformers to apply after a partial upsert merge. Only post-merge transform configs are honored to avoid
+ * re-running ingestion-time transforms.
Review Comment:
The Javadoc should document the parameters `tableConfig` and `schema`, and
clarify the return value. Consider adding: '@param tableConfig The table
configuration containing post-update transform configs', '@param schema The
table schema used for validation and type conversion', and '@return List of
transformers to apply after merge, or empty list if none configured'.
```suggestion
* re-running ingestion-time transforms.
*
* @param tableConfig The table configuration containing post-update transform configs
* @param schema The table schema used for validation and type conversion
* @return List of transformers to apply after merge, or empty list if none configured
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]