This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0023095fae2 Refactor Expression transformer to move sort logic to
utils (#17442)
0023095fae2 is described below
commit 0023095fae2e672db331e34bb2dd979a74f93fae
Author: Krishan Goyal <[email protected]>
AuthorDate: Mon Jan 5 14:50:16 2026 +0530
Refactor Expression transformer to move sort logic to utils (#17442)
---
.../recordtransformer/ExpressionTransformer.java | 64 +------------
.../local/utils/ExpressionTransformerUtils.java | 105 +++++++++++++++++++++
2 files changed, 108 insertions(+), 61 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
index 3e7649f663f..5e25e05c300 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
@@ -19,21 +19,16 @@
package org.apache.pinot.segment.local.recordtransformer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
-import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.utils.ExpressionTransformerUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
@@ -51,70 +46,17 @@ public class ExpressionTransformer implements
RecordTransformer {
private static final Logger LOGGER =
LoggerFactory.getLogger(ExpressionTransformer.class);
@VisibleForTesting
- final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new
LinkedHashMap<>();
+ final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators;
private final boolean _continueOnError;
private final ThrottledLogger _throttledLogger;
public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
- Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
+ _expressionEvaluators =
ExpressionTransformerUtils.getTopologicallySortedExpressions(tableConfig,
schema);
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
- if (ingestionConfig != null && ingestionConfig.getTransformConfigs() !=
null) {
- for (TransformConfig transformConfig :
ingestionConfig.getTransformConfigs()) {
- FunctionEvaluator previous =
expressionEvaluators.put(transformConfig.getColumnName(),
-
FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()));
- Preconditions.checkState(previous == null,
- "Cannot set more than one ingestion transform function on column:
%s.", transformConfig.getColumnName());
- }
- }
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- String fieldName = fieldSpec.getName();
- if (!fieldSpec.isVirtualColumn() &&
!expressionEvaluators.containsKey(fieldName)) {
- FunctionEvaluator functionEvaluator =
FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
- if (functionEvaluator != null) {
- expressionEvaluators.put(fieldName, functionEvaluator);
- }
- }
- }
-
- // Carry out DFS traversal to topologically sort column names based on
transform function dependencies. Throw
- // exception if a cycle is discovered. When a name is first seen it is
added to discoveredNames set. When a name
- // is completely processed (i.e the name and all of its dependencies have
been fully explored and no cycles have
- // been seen), it gets added to the _expressionEvaluators list in
topologically sorted order. Fully explored
- // names are removed from discoveredNames set.
- Set<String> discoveredNames = new HashSet<>();
- for (Map.Entry<String, FunctionEvaluator> entry :
expressionEvaluators.entrySet()) {
- String columnName = entry.getKey();
- if (!_expressionEvaluators.containsKey(columnName)) {
- topologicalSort(columnName, expressionEvaluators, discoveredNames);
- }
- }
-
_continueOnError = ingestionConfig != null &&
ingestionConfig.isContinueOnError();
_throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
}
- private void topologicalSort(String column, Map<String, FunctionEvaluator>
expressionEvaluators,
- Set<String> discoveredNames) {
- FunctionEvaluator functionEvaluator = expressionEvaluators.get(column);
- if (functionEvaluator == null) {
- return;
- }
-
- if (discoveredNames.add(column)) {
- List<String> arguments = functionEvaluator.getArguments();
- for (String arg : arguments) {
- if (!_expressionEvaluators.containsKey(arg)) {
- topologicalSort(arg, expressionEvaluators, discoveredNames);
- }
- }
- _expressionEvaluators.put(column, functionEvaluator);
- discoveredNames.remove(column);
- } else {
- throw new IllegalStateException(
- "Expression cycle found for column '" + column + "' in Ingestion
Transform " + "Function definitions.");
- }
- }
-
@Override
public boolean isNoOp() {
return _expressionEvaluators.isEmpty();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ExpressionTransformerUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ExpressionTransformerUtils.java
new file mode 100644
index 00000000000..9f02869f5e6
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ExpressionTransformerUtils.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class ExpressionTransformerUtils {
+
+ private ExpressionTransformerUtils() {
+ // Utility class - prevent instantiation
+ }
+
+ public static LinkedHashMap<String, FunctionEvaluator>
getTopologicallySortedExpressions(
+ TableConfig tableConfig, Schema schema) {
+ LinkedHashMap<String, FunctionEvaluator> sortedEvaluators = new
LinkedHashMap<>();
+
+ Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
+ IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+ if (ingestionConfig != null && ingestionConfig.getTransformConfigs() !=
null) {
+ for (TransformConfig transformConfig :
ingestionConfig.getTransformConfigs()) {
+ FunctionEvaluator previous =
expressionEvaluators.put(transformConfig.getColumnName(),
+
FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()));
+ Preconditions.checkState(previous == null,
+ "Cannot set more than one ingestion transform function on column:
%s.", transformConfig.getColumnName());
+ }
+ }
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ String fieldName = fieldSpec.getName();
+ if (!fieldSpec.isVirtualColumn() &&
!expressionEvaluators.containsKey(fieldName)) {
+ FunctionEvaluator functionEvaluator =
FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
+ if (functionEvaluator != null) {
+ expressionEvaluators.put(fieldName, functionEvaluator);
+ }
+ }
+ }
+
+ // Carry out DFS traversal to topologically sort column names based on
transform function dependencies. Throw
+ // exception if a cycle is discovered. When a name is first seen it is
added to the discoveredNames set. When a name
+ // is completely processed (i.e. the name and all of its dependencies have
been fully explored and no cycles have
+ // been seen), it gets added to the sortedEvaluators map in topologically
sorted order. Fully explored
+ // names are removed from the discoveredNames set.
+ Set<String> discoveredNames = new HashSet<>();
+ for (Map.Entry<String, FunctionEvaluator> entry :
expressionEvaluators.entrySet()) {
+ String columnName = entry.getKey();
+ if (!sortedEvaluators.containsKey(columnName)) {
+ topologicalSort(columnName, expressionEvaluators, sortedEvaluators,
discoveredNames);
+ }
+ }
+
+ return sortedEvaluators;
+ }
+
+ private static void topologicalSort(String column, Map<String,
FunctionEvaluator> allEvaluators,
+ Map<String, FunctionEvaluator> sortedEvaluators,
+ Set<String> discoveredNames) {
+ FunctionEvaluator functionEvaluator = allEvaluators.get(column);
+ if (functionEvaluator == null) {
+ return;
+ }
+
+ if (discoveredNames.add(column)) {
+ List<String> arguments = functionEvaluator.getArguments();
+ for (String arg : arguments) {
+ if (!sortedEvaluators.containsKey(arg)) {
+ topologicalSort(arg, allEvaluators, sortedEvaluators,
discoveredNames);
+ }
+ }
+ sortedEvaluators.put(column, functionEvaluator);
+ discoveredNames.remove(column);
+ } else {
+ throw new IllegalStateException(
+ "Expression cycle found for column '" + column + "' in Ingestion
Transform " + "Function definitions.");
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]