This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d5740856f4 Support conjugates for scalar functions, add more scalar functions (#8582) d5740856f4 is described below commit d5740856f481c34808b7027c0fd1b6bff068d04d Author: Saurabh Dubey <saurabhd...@gmail.com> AuthorDate: Wed May 4 23:58:58 2022 +0530 Support conjugates for scalar functions, add more scalar functions (#8582) This PR adds support for being able to use complex scalar function expressions for filterConfig. In addition to common comparison functions, support for AND, OR and NOT has been added. --- .../function/scalar/ComparisonFunctions.java | 65 +++++++++ .../common/function/scalar/ObjectFunctions.java | 37 ++++++ .../local/function/InbuiltFunctionEvaluator.java | 109 ++++++++++++++-- .../recordtransformer/RecordTransformerTest.java | 145 +++++++++++++++++++++ 4 files changed, 346 insertions(+), 10 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java new file mode 100644 index 0000000000..781c949e6e --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ComparisonFunctions.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.function.scalar;
+
+import org.apache.pinot.spi.annotations.ScalarFunction;
+
+public class ComparisonFunctions {
+  // Absolute tolerance used by equals()/notEquals() so values differing only by rounding error compare equal.
+  private static final double DOUBLE_COMPARISON_TOLERANCE = 1e-7d;
+
+  private ComparisonFunctions() {
+  }
+
+  @ScalarFunction
+  public static boolean greaterThan(double a, double b) {
+    return a > b;
+  }
+
+  @ScalarFunction
+  public static boolean greaterThanOrEqual(double a, double b) {
+    return a >= b;
+  }
+
+  @ScalarFunction
+  public static boolean lessThan(double a, double b) {
+    return a < b;
+  }
+
+  @ScalarFunction
+  public static boolean lessThanOrEqual(double a, double b) {
+    return a <= b;
+  }
+
+  @ScalarFunction
+  public static boolean notEquals(double a, double b) {
+    return !equals(a, b); // exact complement of equals(), so NaN and infinities are handled consistently
+  }
+
+  @ScalarFunction
+  public static boolean equals(double a, double b) {
+    // Exact match first (infinities: a - b would be NaN), otherwise tolerate approximation errors
+    return a == b || Math.abs(a - b) < DOUBLE_COMPARISON_TOLERANCE;
+  }
+
+  @ScalarFunction
+  public static boolean between(double val, double a, double b) {
+    return val >= a && val <= b; // inclusive on both ends, matching SQL "val BETWEEN a AND b"
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java
new file mode 100644
index 0000000000..2c719a4cfb
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ObjectFunctions.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.function.scalar;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.ScalarFunction;
+
+public class ObjectFunctions {
+  private ObjectFunctions() {
+  }
+
+  @ScalarFunction(nullableParameters = true)
+  public static boolean isNull(@Nullable Object obj) {
+    return obj == null;
+  }
+
+  @ScalarFunction(nullableParameters = true)
+  public static boolean isNotNull(@Nullable Object obj) {
+    return !isNull(obj);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
index a894e010c6..18e1ee5031 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.function;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
@@ -68,17 +69,27 @@ public class InbuiltFunctionEvaluator implements FunctionEvaluator {
           childNodes[i] = planExecution(arguments.get(i));
         }
         String functionName = function.getFunctionName();
-        FunctionInfo functionInfo = FunctionRegistry.getFunctionInfo(functionName, numArguments);
-        if (functionInfo == null) {
-          if (FunctionRegistry.containsFunction(functionName)) {
-            throw new IllegalStateException(
-                String.format("Unsupported function: %s with %d parameters", functionName, numArguments));
-          } else {
-            throw new IllegalStateException(
-                String.format("Unsupported function: %s not found", functionName));
-          }
+        switch (functionName) {
+          case "and":
+            return new AndExecutionNode(childNodes);
+          case "or":
+            return new OrExecutionNode(childNodes);
+          case "not":
+            Preconditions.checkState(numArguments == 1, "NOT function expects 1 argument, got: %s", numArguments);
+            return new NotExecutionNode(childNodes[0]);
+          default:
+            FunctionInfo functionInfo = FunctionRegistry.getFunctionInfo(functionName, numArguments);
+            if (functionInfo == null) {
+              if (FunctionRegistry.containsFunction(functionName)) {
+                throw new IllegalStateException(
+                    String.format("Unsupported function: %s with %d parameters", functionName, numArguments));
+              } else {
+                throw new IllegalStateException(
+                    String.format("Unsupported function: %s not found", functionName));
+              }
+            }
+            return new FunctionExecutionNode(functionInfo, childNodes);
         }
-        return new FunctionExecutionNode(functionInfo, childNodes);
       default:
         throw new IllegalStateException();
     }
@@ -106,6 +117,112 @@ public class InbuiltFunctionEvaluator implements FunctionEvaluator {
     Object execute(Object[] values);
   }
 
+  /**
+   * Logical NOT with SQL three-valued semantics: NOT NULL evaluates to NULL instead of failing on unboxing.
+   */
+  private static class NotExecutionNode implements ExecutableNode {
+    private final ExecutableNode _argumentNode;
+
+    NotExecutionNode(ExecutableNode argumentNode) {
+      _argumentNode = argumentNode;
+    }
+
+    @Override
+    public Object execute(GenericRow row) {
+      return negate(_argumentNode.execute(row));
+    }
+
+    @Override
+    public Object execute(Object[] values) {
+      return negate(_argumentNode.execute(values));
+    }
+
+    // A null operand propagates as null; any other operand is expected to be a Boolean.
+    private static Boolean negate(Object operand) {
+      return operand == null ? null : Boolean.valueOf(!((Boolean) operand));
+    }
+  }
+
+  /**
+   * Logical OR with SQL three-valued semantics: TRUE as soon as an operand is TRUE (remaining operands are not
+   * evaluated), otherwise NULL if any operand was NULL, otherwise FALSE.
+   */
+  private static class OrExecutionNode implements ExecutableNode {
+    private final ExecutableNode[] _argumentNodes;
+
+    OrExecutionNode(ExecutableNode[] argumentNodes) {
+      _argumentNodes = argumentNodes;
+    }
+
+    @Override
+    public Object execute(GenericRow row) {
+      boolean sawNull = false;
+      for (ExecutableNode executableNode : _argumentNodes) {
+        Object result = executableNode.execute(row);
+        if (result == null) {
+          sawNull = true;
+        } else if ((Boolean) result) {
+          return Boolean.TRUE;
+        }
+      }
+      return sawNull ? null : Boolean.FALSE;
+    }
+
+    @Override
+    public Object execute(Object[] values) {
+      boolean sawNull = false;
+      for (ExecutableNode executableNode : _argumentNodes) {
+        Object result = executableNode.execute(values);
+        if (result == null) {
+          sawNull = true;
+        } else if ((Boolean) result) {
+          return Boolean.TRUE;
+        }
+      }
+      return sawNull ? null : Boolean.FALSE;
+    }
+  }
+
+  /**
+   * Logical AND with SQL three-valued semantics: FALSE as soon as an operand is FALSE (remaining operands are not
+   * evaluated), otherwise NULL if any operand was NULL, otherwise TRUE.
+   */
+  private static class AndExecutionNode implements ExecutableNode {
+    private final ExecutableNode[] _argumentNodes;
+
+    AndExecutionNode(ExecutableNode[] argumentNodes) {
+      _argumentNodes = argumentNodes;
+    }
+
+    @Override
+    public Object execute(GenericRow row) {
+      boolean sawNull = false;
+      for (ExecutableNode executableNode : _argumentNodes) {
+        Object result = executableNode.execute(row);
+        if (result == null) {
+          sawNull = true;
+        } else if (!((Boolean) result)) {
+          return Boolean.FALSE;
+        }
+      }
+      return sawNull ? null : Boolean.TRUE;
+    }
+
+    @Override
+    public Object execute(Object[] values) {
+      boolean sawNull = false;
+      for (ExecutableNode executableNode : _argumentNodes) {
+        Object result = executableNode.execute(values);
+        if (result == null) {
+          sawNull = true;
+        } else if (!((Boolean) result)) {
+          return Boolean.FALSE;
+        }
+      }
+      return sawNull ? null : Boolean.TRUE;
+    }
+  }
+
   private static class FunctionExecutionNode implements ExecutableNode {
     final FunctionInvoker _functionInvoker;
     final FunctionInfo _functionInfo;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index 195cc0cd7f..f76551c0f4 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -81,6 +81,7 @@ public class RecordTransformerTest {
     record.putValue("svStringWithLengthLimit", "123");
     record.putValue("mvString1", new Object[]{"123", 123, 123L, 123f, 123.0});
     record.putValue("mvString2", new Object[]{123, 123L, 123f, 123.0, "123"});
+    record.putValue("svNullString", null);
     return record;
   }
 
@@
-178,6 +179,175 @@ public class RecordTransformerTest {
     }
   }
 
+  @Test
+  public void testScalarOps() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    // expression true, filtered
+    GenericRow genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svInt = 123"), null, null));
+    RecordTransformer transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svDouble > 120"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svDouble >= 123"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svDouble < 200"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svDouble <= 123"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svLong != 125"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svLong = 123"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("between(svLong, 100, 125)"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression false, not filtered: guards against a filter that marks every record as skipped
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("svInt = 125"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+  }
+
+  // Record mixing null and non-null single-value and multi-value columns, used by the IS NULL / IS NOT NULL tests.
+  private GenericRow getNullColumnsRecord() {
+    GenericRow record = new GenericRow();
+    record.putValue("svNullString", null);
+    record.putValue("svInt", (byte) 123);
+
+    record.putValue("mvLong", Collections.singletonList(123f));
+    record.putValue("mvNullFloat", null);
+    return record;
+  }
+
+  @Test
+  public void testObjectOps() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    // expression true, filtered
+    GenericRow genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("svNullString is null"), null, null));
+    RecordTransformer transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("svInt is not null"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("mvLong is not null"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("mvNullFloat is null"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression false, not filtered: svNullString is null, so "is not null" does not match
+    genericRow = getNullColumnsRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("svNullString is not null"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+  }
+
+  @Test
+  public void testLogicalScalarOps() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    // expression true, filtered
+    GenericRow genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null));
+    RecordTransformer transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression true, filtered
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null,
+            new FilterConfig("svInt = 125 OR svLong <= 200"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+
+    // expression false, not filtered: the first AND operand (svInt = 125) does not match
+    genericRow = getRecord();
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("svInt = 125 AND svDouble <= 200"), null, null));
+    transformer = new FilterTransformer(tableConfig);
+    transformer.transform(genericRow);
+    Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
+  }
+
   @Test
   public void testNullValueTransformer() {
     RecordTransformer transformer = new NullValueTransformer(TABLE_CONFIG, SCHEMA);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org