Copilot commented on code in PR #16258: URL: https://github.com/apache/pinot/pull/16258#discussion_r2182903794
########## pinot-udf-test/src/main/java/org/apache/pinot/udf/test/UdfTestCluster.java: ########## @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.udf.test; + +import java.util.Iterator; +import java.util.stream.Stream; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/// An interface for executing queries in Pinot function tests. +/// +/// For example, one implementation can start a local cluster on the same JVM while another can connect to a remote +/// cluster. +public interface UdfTestCluster extends AutoCloseable { + + void start(); + + /// Adds a table to the cluster with the given schema and table configuration.o Review Comment: Typo in the comment: remove the trailing `o` from `configuration.o`. ```suggestion /// Adds a table to the cluster with the given schema and table configuration. ``` ########## pinot-udf-test/src/main/java/org/apache/pinot/udf/test/UdfTestFramework.java: ########## @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.udf.test; + +import com.google.common.collect.Maps; +import java.math.BigDecimal; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.pinot.core.udf.Udf; +import org.apache.pinot.core.udf.UdfExample; +import org.apache.pinot.core.udf.UdfSignature; + +/// This is the entry paint for the UDF test framework. It allows to run tests for UDFs using a given cluster. +public class UdfTestFramework { + + private static final EquivalenceLevel[] EQUIVALENCE_LEVELS = new EquivalenceLevel[]{ + EquivalenceLevel.EQUAL, + EquivalenceLevel.BIG_DECIMAL_AS_DOUBLE, + EquivalenceLevel.NUMBER_AS_DOUBLE + }; + private final Set<Udf> _udfs; + private final UdfTestCluster _cluster; + private final Set<UdfTestScenario> _scenarios; + private final ExecutorService _executorService; + + public UdfTestFramework(Set<Udf> udfs, UdfTestCluster cluster, + Set<UdfTestScenario> scenarios, ExecutorService executorService) { + _udfs = udfs; + _cluster = cluster; + _scenarios = scenarios; + _executorService = executorService; + } + + public static UdfTestFramework fromServiceLoader(UdfTestCluster cluster, + ExecutorService executorService) { + Set<Udf> udfs = ServiceLoader.load(Udf.class).stream() + .map(ServiceLoader.Provider::get) + .collect(Collectors.toSet()); + Set<UdfTestScenario> scenarios = ServiceLoader.load(UdfTestScenario.Factory.class).stream() + .map(ServiceLoader.Provider::get) + .map(factory -> factory.create(cluster)) + .collect(Collectors.toSet()); + + return new UdfTestFramework(udfs, cluster, scenarios, executorService); + } + + public Set<Udf> getUdfs() { + return _udfs; + } + + public Set<UdfTestScenario> getScenarios() { + return _scenarios; + } + + public void startUp() { + PinotFunctionEnvGenerator.prepareEnvironment(_cluster, _udfs); + } + + /// Executes all UDFs in all scenarios and returns the results. + public UdfTestResult execute() + throws InterruptedException { + Map<Udf, Map<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>>> asyncResults + = Maps.newHashMapWithExpectedSize(_udfs.size()); + for (Udf udf : _udfs) { + Map<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>> scenarioResults + = Maps.newHashMapWithExpectedSize(_scenarios.size()); + executeAsync(udf, scenarioResults); + asyncResults.put(udf, scenarioResults); + } + + return byUdf(asyncResults); + } + + /// Executes a single UDF in all scenarios and returns the results. + public UdfTestResult.ByScenario execute(Udf udf) + throws InterruptedException { + Map<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>> scenarioResults + = Maps.newHashMapWithExpectedSize(_scenarios.size()); + executeAsync(udf, scenarioResults); + + return byScenario(scenarioResults); + } + + private void executeAsync( + Udf udf, + Map<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>> scenarioResults + ) { + for (UdfTestScenario scenario : _scenarios) { + Set<UdfSignature> udfSignatures = udf.getExamples().keySet(); + Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>> signatureResults + = Maps.newHashMapWithExpectedSize(udfSignatures.size()); + for (UdfSignature signature: udfSignatures) { + signatureResults.put(signature, _executorService.submit(() -> scenario.execute(udf, signature))); + } + scenarioResults.put(scenario, signatureResults); + } + } + + private UdfTestResult byUdf( + Map<Udf, Map<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>>> tasks + ) throws InterruptedException { + Map<Udf, UdfTestResult.ByScenario> results = Maps.newHashMapWithExpectedSize(tasks.size()); + for (Map.Entry<Udf, Map<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>>> entry + : tasks.entrySet()) { + Udf udf = entry.getKey(); + Map<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>> scenarioTasks + = entry.getValue(); + results.put(udf, byScenario(scenarioTasks)); + } + return new UdfTestResult(results); + } + + private UdfTestResult.ByScenario byScenario( + Map<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>> scenarioTasks + ) throws InterruptedException { + Map<UdfTestScenario, UdfTestResult.BySignature> results = Maps.newHashMapWithExpectedSize(scenarioTasks.size()); + for (Map.Entry<UdfTestScenario, Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>>> entry + : scenarioTasks.entrySet()) { + UdfTestScenario scenario = entry.getKey(); + Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>> tasks = entry.getValue(); + results.put(scenario, bySignature(tasks)); + } + return new UdfTestResult.ByScenario(results); + } + + private UdfTestResult.BySignature bySignature( + Map<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>> tasks + ) throws InterruptedException { + Map<UdfSignature, ResultByExample> results = Maps.newHashMapWithExpectedSize(tasks.size()); + for (Map.Entry<UdfSignature, Future<Map<UdfExample, UdfExampleResult>>> entry : tasks.entrySet()) { + UdfSignature signature = entry.getKey(); + Future<Map<UdfExample, UdfExampleResult>> task = entry.getValue(); + ResultByExample result = resolve(task); + results.put(signature, result); + } + return new UdfTestResult.BySignature(results); + } + + private ResultByExample resolve(Future<Map<UdfExample, UdfExampleResult>> task) + throws InterruptedException { + try { + Map<UdfExample, UdfExampleResult> result = task.get(); + Map<UdfExample, EquivalenceLevel> comparisons = Maps.newHashMapWithExpectedSize(result.size()); + Map<UdfExample, String> errors = Maps.newHashMapWithExpectedSize(result.size()); + for (Map.Entry<UdfExample, UdfExampleResult> entry : result.entrySet()) { + try { + EquivalenceLevel equivalence = compareResult(entry.getValue()); + comparisons.put(entry.getKey(), equivalence); + } catch (Exception e) { + errors.put(entry.getKey(), e.getMessage()); + } + } + return new ResultByExample.Partial(result, comparisons, errors); + } catch (ExecutionException e) { + if (e.getCause().getMessage().contains("Unsupported function")) { Review Comment: Relying on the exception message text is brittle. Consider catching a specific exception type or using an error code rather than string matching. ########## pinot-core/src/main/java/org/apache/pinot/core/util/SegmentProcessorAvroUtils.java: ########## @@ -86,42 +91,54 @@ public static Schema convertPinotSchemaToAvroSchema(org.apache.pinot.spi.data.Sc if (fieldSpec.isSingleValueField()) { switch (storedType) { case INT: - fieldAssembler = fieldAssembler.name(name).type().intType().noDefault(); + fieldAssembler = fieldAssembler.name(name).type().nullable().intType().noDefault(); break; case LONG: - fieldAssembler = fieldAssembler.name(name).type().longType().noDefault(); + fieldAssembler = fieldAssembler.name(name).type().nullable().longType().noDefault(); break; case FLOAT: - fieldAssembler = fieldAssembler.name(name).type().floatType().noDefault(); + fieldAssembler = fieldAssembler.name(name).type().nullable().floatType().noDefault(); break; case DOUBLE: - fieldAssembler = fieldAssembler.name(name).type().doubleType().noDefault(); + fieldAssembler = fieldAssembler.name(name).type().nullable().doubleType().noDefault(); break; case STRING: - fieldAssembler = fieldAssembler.name(name).type().stringType().noDefault(); + fieldAssembler = fieldAssembler.name(name).type().nullable().stringType().noDefault(); break; case BYTES: - fieldAssembler = fieldAssembler.name(name).type().bytesType().noDefault(); + fieldAssembler = fieldAssembler.name(name).type().nullable().bytesType().noDefault(); + break; + case BIG_DECIMAL: + fieldAssembler = fieldAssembler.name(name).type().nullable().stringBuilder() + .endString() + .noDefault(); Review Comment: The Avro schema builder call `stringBuilder()` is incorrect; it should be `stringType()` to define a string field. The current code may not generate a valid Avro schema for BigDecimal. ```suggestion fieldAssembler = fieldAssembler.name(name).type().nullable().stringType().noDefault(); ``` ########## pinot-udf-test/src/main/java/org/apache/pinot/udf/test/scenarios/AbstractUdfTestScenario.java: ########## @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.udf.test.scenarios; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.pinot.core.udf.Udf; +import org.apache.pinot.core.udf.UdfExample; +import org.apache.pinot.core.udf.UdfSignature; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.udf.test.PinotFunctionEnvGenerator; +import org.apache.pinot.udf.test.UdfExampleResult; +import org.apache.pinot.udf.test.UdfTestCluster; +import org.apache.pinot.udf.test.UdfTestScenario; + + +/// An abstract class for UDF test scenarios, providing common functionality like running SQL queries on a given +/// PinotFunctionTestCluster and extracting results from the query output. +public abstract class AbstractUdfTestScenario implements UdfTestScenario { + + protected final UdfTestCluster _cluster; + private final boolean _nullHandlingEnabled; + + public AbstractUdfTestScenario(UdfTestCluster cluster, boolean nullHandlingEnabled) { + _cluster = cluster; + _nullHandlingEnabled = nullHandlingEnabled; + } + + protected boolean isNullHandlingEnabled() { + return _nullHandlingEnabled; + } + + protected String replaceCommonVariables( + Udf udf, + UdfSignature signature, + boolean nullHandling, + /* language=sql*/ String templateSql) { + String call = udf.asSqlCall(udf.getMainFunctionName(), PinotFunctionEnvGenerator.getArgsForCall(signature)); + return templateSql + .replaceAll("@call", call) + .replaceAll("@table", PinotFunctionEnvGenerator.getTableName(udf)) + .replaceAll("@udfCol", PinotFunctionEnvGenerator.getUdfColumnName()) + .replaceAll("@udfName", udf.getMainFunctionName()) + .replaceAll("@testCol", PinotFunctionEnvGenerator.getTestColumnName()) + .replaceAll("@resultCol", PinotFunctionEnvGenerator.getResultColumnName(signature, nullHandling)) + // Important, we need to replace the @testCol and @result after replacing @test and @resultCol respectively + .replaceAll("@test", "test") + .replaceAll("@result", "result") + .replaceAll("@signatureCol", PinotFunctionEnvGenerator.getSignatureColumnName()) + .replaceAll("@signature", signature.toString()); Review Comment: Using `replaceAll` treats the first argument as a regex pattern, which may break if any variable contains regex meta-characters. Consider using `String.replace` for literal replacements. ```suggestion .replace("@call", call) .replace("@table", PinotFunctionEnvGenerator.getTableName(udf)) .replace("@udfCol", PinotFunctionEnvGenerator.getUdfColumnName()) .replace("@udfName", udf.getMainFunctionName()) .replace("@testCol", PinotFunctionEnvGenerator.getTestColumnName()) .replace("@resultCol", PinotFunctionEnvGenerator.getResultColumnName(signature, nullHandling)) // Important, we need to replace the @testCol and @result after replacing @test and @resultCol respectively .replace("@test", "test") .replace("@result", "result") .replace("@signatureCol", PinotFunctionEnvGenerator.getSignatureColumnName()) .replace("@signature", signature.toString()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org