This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d5740856f4 Support conjugates for scalar functions, add more scalar 
functions (#8582)
d5740856f4 is described below

commit d5740856f481c34808b7027c0fd1b6bff068d04d
Author: Saurabh Dubey <saurabhd...@gmail.com>
AuthorDate: Wed May 4 23:58:58 2022 +0530

    Support conjugates for scalar functions, add more scalar functions (#8582)
    
    This PR adds support for using complex scalar function 
expressions in filterConfig. In addition to the common comparison functions, 
support for AND, OR and NOT has been added.
---
 .../function/scalar/ComparisonFunctions.java       |  65 +++++++++
 .../common/function/scalar/ObjectFunctions.java    |  37 ++++++
 .../local/function/InbuiltFunctionEvaluator.java   | 109 ++++++++++++++--
 .../recordtransformer/RecordTransformerTest.java   | 145 +++++++++++++++++++++
 4 files changed, 346 insertions(+), 10 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java
new file mode 100644
index 0000000000..781c949e6e
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.function.scalar;
+
+import org.apache.pinot.spi.annotations.ScalarFunction;
+
+public class ComparisonFunctions {
+
+  private static final double DOUBLE_COMPARISON_TOLERANCE = 1e-7d;
+
+  private ComparisonFunctions() {
+  }
+
+  @ScalarFunction
+  public static boolean greaterThan(double a, double b) {
+    return a > b;
+  }
+
+  @ScalarFunction
+  public static boolean greaterThanOrEqual(double a, double b) {
+    return a >= b;
+  }
+
+  @ScalarFunction
+  public static boolean lessThan(double a, double b) {
+    return a < b;
+  }
+
+  @ScalarFunction
+  public static boolean lessThanOrEqual(double a, double b) {
+    return a <= b;
+  }
+
+  @ScalarFunction
+  public static boolean notEquals(double a, double b) {
+    return Math.abs(a - b) >= DOUBLE_COMPARISON_TOLERANCE;
+  }
+
+  @ScalarFunction
+  public static boolean equals(double a, double b) {
+    // To avoid approximation errors
+    return Math.abs(a - b) < DOUBLE_COMPARISON_TOLERANCE;
+  }
+
+  @ScalarFunction
+  public static boolean between(double val, double a, double b) {
+    return val > a && val < b;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java
new file mode 100644
index 0000000000..2c719a4cfb
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.function.scalar;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.ScalarFunction;
+
+public class ObjectFunctions {
+  private ObjectFunctions() {
+  }
+
+  @ScalarFunction(nullableParameters = true)
+  public static boolean isNull(@Nullable Object obj) {
+    return obj == null;
+  }
+
+  @ScalarFunction(nullableParameters = true)
+  public static boolean isNotNull(@Nullable Object obj) {
+    return !isNull(obj);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
index a894e010c6..18e1ee5031 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.function;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
@@ -68,17 +69,27 @@ public class InbuiltFunctionEvaluator implements 
FunctionEvaluator {
           childNodes[i] = planExecution(arguments.get(i));
         }
         String functionName = function.getFunctionName();
-        FunctionInfo functionInfo = 
FunctionRegistry.getFunctionInfo(functionName, numArguments);
-        if (functionInfo == null) {
-          if (FunctionRegistry.containsFunction(functionName)) {
-            throw new IllegalStateException(
-              String.format("Unsupported function: %s with %d parameters", 
functionName, numArguments));
-          } else {
-            throw new IllegalStateException(
-              String.format("Unsupported function: %s not found", 
functionName));
-          }
+        switch (functionName) {
+          case "and":
+            return new AndExecutionNode(childNodes);
+          case "or":
+            return new OrExecutionNode(childNodes);
+          case "not":
+            Preconditions.checkState(numArguments == 1, "NOT function expects 
1 argument, got: %s", numArguments);
+            return new NotExecutionNode(childNodes[0]);
+          default:
+            FunctionInfo functionInfo = 
FunctionRegistry.getFunctionInfo(functionName, numArguments);
+            if (functionInfo == null) {
+              if (FunctionRegistry.containsFunction(functionName)) {
+                throw new IllegalStateException(
+                    String.format("Unsupported function: %s with %d 
parameters", functionName, numArguments));
+              } else {
+                throw new IllegalStateException(
+                    String.format("Unsupported function: %s not found", 
functionName));
+              }
+            }
+            return new FunctionExecutionNode(functionInfo, childNodes);
         }
-        return new FunctionExecutionNode(functionInfo, childNodes);
       default:
         throw new IllegalStateException();
     }
@@ -106,6 +117,84 @@ public class InbuiltFunctionEvaluator implements 
FunctionEvaluator {
     Object execute(Object[] values);
   }
 
+  private static class NotExecutionNode implements ExecutableNode {
+    private final ExecutableNode _argumentNode;
+
+    NotExecutionNode(ExecutableNode argumentNode) {
+      _argumentNode = argumentNode;
+    }
+
+    @Override
+    public Object execute(GenericRow row) {
+      return !((Boolean) _argumentNode.execute(row));
+    }
+
+    @Override
+    public Object execute(Object[] values) {
+      return !((Boolean) _argumentNode.execute(values));
+    }
+  }
+
+  private static class OrExecutionNode implements ExecutableNode {
+    private final ExecutableNode[] _argumentNodes;
+
+    OrExecutionNode(ExecutableNode[] argumentNodes) {
+      _argumentNodes = argumentNodes;
+    }
+
+    @Override
+    public Object execute(GenericRow row) {
+      for (ExecutableNode executableNode :_argumentNodes) {
+        Boolean res = (Boolean) executableNode.execute(row);
+        if (res) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public Object execute(Object[] values) {
+      for (ExecutableNode executableNode :_argumentNodes) {
+        Boolean res = (Boolean) executableNode.execute(values);
+        if (res) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  private static class AndExecutionNode implements ExecutableNode {
+    private final ExecutableNode[] _argumentNodes;
+
+    AndExecutionNode(ExecutableNode[] argumentNodes) {
+      _argumentNodes = argumentNodes;
+    }
+
+    @Override
+    public Object execute(GenericRow row) {
+      for (ExecutableNode executableNode :_argumentNodes) {
+        Boolean res = (Boolean) executableNode.execute(row);
+        if (!res) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object execute(Object[] values) {
+      for (ExecutableNode executableNode :_argumentNodes) {
+        Boolean res = (Boolean) executableNode.execute(values);
+        if (!res) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
   private static class FunctionExecutionNode implements ExecutableNode {
     final FunctionInvoker _functionInvoker;
     final FunctionInfo _functionInfo;
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index 195cc0cd7f..f76551c0f4 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -81,6 +81,7 @@ public class RecordTransformerTest {
     record.putValue("svStringWithLengthLimit", "123");
     record.putValue("mvString1", new Object[]{"123", 123, 123L, 123f, 123.0});
     record.putValue("mvString2", new Object[]{123, 123L, 123f, 123.0, "123"});
+    record.putValue("svNullString", null);
     return record;
   }
 
@@ -178,6 +179,150 @@ public class RecordTransformerTest {
     }
   }
 
+  @Test
+  public void testScalarOps() {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    // expression true, filtered
+    GenericRow genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svInt = 123"), null, null));
+    RecordTransformer transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svDouble > 120"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svDouble >= 123"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svDouble < 200"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svDouble <= 123"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svLong != 125"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svLong = 123"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("between(svLong, 100, 
125)"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+  }
+
+  private GenericRow getNullColumnsRecord() {
+    GenericRow record = new GenericRow();
+    record.putValue("svNullString", null);
+    record.putValue("svInt", (byte) 123);
+
+    record.putValue("mvLong", Collections.singletonList(123f));
+    record.putValue("mvNullFloat", null);
+    return record;
+  }
+
+  @Test
+  public void testObjectOps() {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    // expression true, filtered
+    GenericRow genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("svNullString is 
null"), null, null));
+    RecordTransformer transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("svInt is not null"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("mvLong is not null"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("mvNullFloat is 
null"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+  }
+
+  @Test
+  public void testLogicalScalarOps() {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    // expression true, filtered
+    GenericRow genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null));
+    RecordTransformer transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svInt = 125 OR svLong <= 200"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+  }
+
   @Test
   public void testNullValueTransformer() {
     RecordTransformer transformer = new NullValueTransformer(TABLE_CONFIG, 
SCHEMA);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to