This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0023095fae2 Refactor Expression transformer to move sort logic to utils (#17442)
0023095fae2 is described below

commit 0023095fae2e672db331e34bb2dd979a74f93fae
Author: Krishan Goyal <[email protected]>
AuthorDate: Mon Jan 5 14:50:16 2026 +0530

    Refactor Expression transformer to move sort logic to utils (#17442)
---
 .../recordtransformer/ExpressionTransformer.java   |  64 +------------
 .../local/utils/ExpressionTransformerUtils.java    | 105 +++++++++++++++++++++
 2 files changed, 108 insertions(+), 61 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
index 3e7649f663f..5e25e05c300 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
@@ -19,21 +19,16 @@
 package org.apache.pinot.segment.local.recordtransformer;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.utils.ThrottledLogger;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
-import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.utils.ExpressionTransformerUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.recordtransformer.RecordTransformer;
@@ -51,70 +46,17 @@ public class ExpressionTransformer implements 
RecordTransformer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ExpressionTransformer.class);
 
   @VisibleForTesting
-  final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new 
LinkedHashMap<>();
+  final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators;
   private final boolean _continueOnError;
   private final ThrottledLogger _throttledLogger;
 
   public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
-    Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
+    _expressionEvaluators = 
ExpressionTransformerUtils.getTopologicallySortedExpressions(tableConfig, 
schema);
     IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
-    if (ingestionConfig != null && ingestionConfig.getTransformConfigs() != 
null) {
-      for (TransformConfig transformConfig : 
ingestionConfig.getTransformConfigs()) {
-        FunctionEvaluator previous = 
expressionEvaluators.put(transformConfig.getColumnName(),
-            
FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()));
-        Preconditions.checkState(previous == null,
-            "Cannot set more than one ingestion transform function on column: 
%s.", transformConfig.getColumnName());
-      }
-    }
-    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-      String fieldName = fieldSpec.getName();
-      if (!fieldSpec.isVirtualColumn() && 
!expressionEvaluators.containsKey(fieldName)) {
-        FunctionEvaluator functionEvaluator = 
FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
-        if (functionEvaluator != null) {
-          expressionEvaluators.put(fieldName, functionEvaluator);
-        }
-      }
-    }
-
-    // Carry out DFS traversal to topologically sort column names based on 
transform function dependencies. Throw
-    // exception if a cycle is discovered. When a name is first seen it is 
added to discoveredNames set. When a name
-    // is completely processed (i.e the name and all of its dependencies have 
been fully explored and no cycles have
-    // been seen), it gets added to the _expressionEvaluators list in 
topologically sorted order. Fully explored
-    // names are removed from discoveredNames set.
-    Set<String> discoveredNames = new HashSet<>();
-    for (Map.Entry<String, FunctionEvaluator> entry : 
expressionEvaluators.entrySet()) {
-      String columnName = entry.getKey();
-      if (!_expressionEvaluators.containsKey(columnName)) {
-        topologicalSort(columnName, expressionEvaluators, discoveredNames);
-      }
-    }
-
     _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
     _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
   }
 
-  private void topologicalSort(String column, Map<String, FunctionEvaluator> 
expressionEvaluators,
-      Set<String> discoveredNames) {
-    FunctionEvaluator functionEvaluator = expressionEvaluators.get(column);
-    if (functionEvaluator == null) {
-      return;
-    }
-
-    if (discoveredNames.add(column)) {
-      List<String> arguments = functionEvaluator.getArguments();
-      for (String arg : arguments) {
-        if (!_expressionEvaluators.containsKey(arg)) {
-          topologicalSort(arg, expressionEvaluators, discoveredNames);
-        }
-      }
-      _expressionEvaluators.put(column, functionEvaluator);
-      discoveredNames.remove(column);
-    } else {
-      throw new IllegalStateException(
-          "Expression cycle found for column '" + column + "' in Ingestion 
Transform " + "Function definitions.");
-    }
-  }
-
   @Override
   public boolean isNoOp() {
     return _expressionEvaluators.isEmpty();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ExpressionTransformerUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ExpressionTransformerUtils.java
new file mode 100644
index 00000000000..9f02869f5e6
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ExpressionTransformerUtils.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class ExpressionTransformerUtils {
+
+  private ExpressionTransformerUtils() {
+    // Utility class - prevent instantiation
+  }
+
+  public static LinkedHashMap<String, FunctionEvaluator> 
getTopologicallySortedExpressions(
+      TableConfig tableConfig, Schema schema) {
+    LinkedHashMap<String, FunctionEvaluator> sortedEvaluators = new 
LinkedHashMap<>();
+
+    Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    if (ingestionConfig != null && ingestionConfig.getTransformConfigs() != 
null) {
+      for (TransformConfig transformConfig : 
ingestionConfig.getTransformConfigs()) {
+        FunctionEvaluator previous = 
expressionEvaluators.put(transformConfig.getColumnName(),
+            
FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()));
+        Preconditions.checkState(previous == null,
+            "Cannot set more than one ingestion transform function on column: 
%s.", transformConfig.getColumnName());
+      }
+    }
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      String fieldName = fieldSpec.getName();
+      if (!fieldSpec.isVirtualColumn() && 
!expressionEvaluators.containsKey(fieldName)) {
+        FunctionEvaluator functionEvaluator = 
FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
+        if (functionEvaluator != null) {
+          expressionEvaluators.put(fieldName, functionEvaluator);
+        }
+      }
+    }
+
+    // Carry out DFS traversal to topologically sort column names based on 
transform function dependencies. Throw
+    // exception if a cycle is discovered. When a name is first seen it is 
added to discoveredNames set. When a name
+    // is completely processed (i.e the name and all of its dependencies have 
been fully explored and no cycles have
+    // been seen), it gets added to the sortedEvaluators list in topologically 
sorted order. Fully explored
+    // names are removed from discoveredNames set.
+    Set<String> discoveredNames = new HashSet<>();
+    for (Map.Entry<String, FunctionEvaluator> entry : 
expressionEvaluators.entrySet()) {
+      String columnName = entry.getKey();
+      if (!sortedEvaluators.containsKey(columnName)) {
+        topologicalSort(columnName, expressionEvaluators, sortedEvaluators, 
discoveredNames);
+      }
+    }
+
+    return sortedEvaluators;
+  }
+
+  private static void topologicalSort(String column, Map<String, 
FunctionEvaluator> allEvaluators,
+      Map<String, FunctionEvaluator> sortedEvaluators,
+      Set<String> discoveredNames) {
+    FunctionEvaluator functionEvaluator = allEvaluators.get(column);
+    if (functionEvaluator == null) {
+      return;
+    }
+
+    if (discoveredNames.add(column)) {
+      List<String> arguments = functionEvaluator.getArguments();
+      for (String arg : arguments) {
+        if (!sortedEvaluators.containsKey(arg)) {
+          topologicalSort(arg, allEvaluators, sortedEvaluators, 
discoveredNames);
+        }
+      }
+      sortedEvaluators.put(column, functionEvaluator);
+      discoveredNames.remove(column);
+    } else {
+      throw new IllegalStateException(
+          "Expression cycle found for column '" + column + "' in Ingestion 
Transform " + "Function definitions.");
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to