Jackie-Jiang commented on a change in pull request #7820: URL: https://github.com/apache/pinot/pull/7820#discussion_r762322462
########## File path: pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java ########## @@ -559,12 +744,21 @@ void readStringValuesMV(int[] docIds, int length, String[][] valuesBuffer) { } } + void readStringValuesMV(TransformEvaluator evaluator, int[] docIds, int length, String[][] valuesBuffer) { + evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), + valuesBuffer); + } + public void readNumValuesMV(int[] docIds, int length, int[] numValuesBuffer) { for (int i = 0; i < length; i++) { - numValuesBuffer[i] = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); + numValuesBuffer[i] = _reader.getDictIdMV(docIds[i], getSVDictIdsBuffer(), getReaderContext()); Review comment: This should not be changed ########## File path: pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java ########## @@ -0,0 +1,588 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.common.evaluators; + +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.InvalidPathException; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.ParseContext; +import com.jayway.jsonpath.spi.json.JacksonJsonProvider; +import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.pinot.common.function.JsonPathCache; +import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.utils.JsonUtils; + + +public final class DefaultJsonPathEvaluator implements JsonPathEvaluator { + + private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using( + new Configuration.ConfigurationBuilder().jsonProvider(new JacksonJsonProvider()) + .mappingProvider(new JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build()); + + private static final int[] EMPTY_INTS = new int[0]; + private static final long[] EMPTY_LONGS = new long[0]; + private static final float[] EMPTY_FLOATS = new float[0]; + private static final double[] EMPTY_DOUBLES = new double[0]; + private static final String[] EMPTY_STRINGS = new String[0]; + + public static JsonPathEvaluator create(String jsonPath, Object defaultValue) { Review comment: (minor) Let's annotate `defaultValue` as `nullable` so that IDE can warn on potential null value access. 
The same applies to the other methods that take it as an argument. ########## File path: pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluator.java ########## @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.evaluator.json; + +import org.apache.pinot.segment.spi.evaluator.TransformEvaluator; + +/** + * Introduce an empty interface to allow it to be extended without + * affecting {@see TransformEvaluator}. + * + * This is an evolving SPI and subject to change. + */ +public interface JsonPathEvaluator extends TransformEvaluator { Review comment: These interfaces do not belong in `segment-spi` but in `query-spi`, which is not available yet. For now we can put them into `pinot-core`, which contains all the query-related interfaces. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java ########## @@ -69,4 +70,114 @@ public BlockDocIdValueSet getBlockDocIdValueSet() { public BlockMetadata getMetadata() { throw new UnsupportedOperationException(); } + + /** + * Pushes a {@see TransformEvaluator} which will produce an int value down + * to be evaluated against the column. 
This is an unstable API. + * @param column column to evaluate against + * @param evaluator the evaluator which produces values from the storage in the column + * @param buffer the buffer to write outputs into + */ + public void pushDown(String column, TransformEvaluator evaluator, int[] buffer) { Review comment: Suggest renaming it to `fillValues` which is more general, and can be used in the future for all value reads ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java ########## @@ -94,6 +95,7 @@ public void init(List<TransformFunction> arguments, Map<String, DataSource> data _defaultValue = dataType.convert(((LiteralTransformFunction) arguments.get(3)).getLiteral()); } _resultMetadata = new TransformResultMetadata(dataType, isSingleValue, false); + _jsonPathEvaluator = JsonPathEvaluators.create(_jsonPathString, _defaultValue); Review comment: (optional) Consider directly passing the `JsonPath` into the evaluator so that we don't need to compile the json path in 2 places ########## File path: pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java ########## @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.evaluator.json; + +import com.google.common.base.Preconditions; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Allows registration of a custom {@see JsonPathEvaluator} which can handle custom storage + * functionality also present in a plugin. A default evaluator which can handle all default + * storage types will be provided to delegate to when standard storage types are encountered. + * + * This is an evolving SPI and subject to change. + */ +public final class JsonPathEvaluators { + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonPathEvaluators.class); + + private static final AtomicReferenceFieldUpdater<JsonPathEvaluators, JsonPathEvaluatorProvider> UPDATER = + AtomicReferenceFieldUpdater.newUpdater(JsonPathEvaluators.class, JsonPathEvaluatorProvider.class, "_provider"); + private static final JsonPathEvaluators INSTANCE = new JsonPathEvaluators(); + private static final DefaultProvider DEFAULT_PROVIDER = new DefaultProvider(); + private volatile JsonPathEvaluatorProvider _provider; + + /** + * Registration point to override how JSON paths are evaluated. This should be used + * when a Pinot plugin has special storage capabilities. For instance, imagine a + * plugin with a raw forward index which stores JSON in a binary format which + * pinot-core is unaware of and cannot evaluate JSON paths against (pinot-core only + * understands true JSON documents). 
Whenever JSON paths are evaluated against this + * custom storage, different storage access operations may be required, and the provided + * {@see JsonPathEvaluator} can inspect the provided {@see ForwardIndexReader} to + * determine whether it is the custom implementation and evaluate the JSON path against + * the binary JSON managed by the custom reader. If it is not the custom implementation, + * then the evaluation should be delegated to the provided delegate. + * + * This prevents the interface {@see ForwardIndexReader} from needing to be able to model + * any plugin storage format, which creates flexibility for the kinds of data structure + * plugins can employ. + * + * @param provider provides {@see JsonPathEvaluator} + * @return true if registration is successful, false otherwise + */ + public static boolean registerProvider(JsonPathEvaluatorProvider provider) { + Preconditions.checkArgument(provider != null, ""); + if (!UPDATER.compareAndSet(INSTANCE, null, provider)) { + LOGGER.warn("failed to register {} - {} already registered", provider, INSTANCE._provider); + return false; + } + return true; + } + + /** + * pinot-core must construct {@see JsonPathEvaluator} via this method to ensure it uses + * the registered implementation. Using the registered implementation allows pinot-core + * to evaluate JSON paths against data structures it doesn't understand or model. + * @param jsonPath the JSON path + * @param defaultValue the default value + * @return a JSON path evaluator which must understand all possible storage representations of JSON. + */ + public static JsonPathEvaluator create(String jsonPath, Object defaultValue) { + // plugins compose and delegate to the default implementation. 
+ JsonPathEvaluator defaultEvaluator = DEFAULT_PROVIDER.create(jsonPath, defaultValue); + return Holder.PROVIDER.create(defaultEvaluator, jsonPath, defaultValue); + } + + /** + * Storing the registered evaluator in this holder and initialising it during + * the class load gives the best of both worlds: plugins have until the first + * JSON path evaluation to register an evaluator via + * {@see JsonPathEvaluators#registerProvider}, but once this class is loaded, + * the provider is constant and calls may be optimise aggressively by the JVM + * in ways which are impossible with a volatile reference. + */ + private static final class Holder { + static final JsonPathEvaluatorProvider PROVIDER; + + static { + JsonPathEvaluatorProvider provider = JsonPathEvaluators.INSTANCE._provider; + if (provider == null) { + provider = DEFAULT_PROVIDER; + if (!UPDATER.compareAndSet(INSTANCE, null, provider)) { + provider = JsonPathEvaluators.INSTANCE._provider; + } + } + PROVIDER = provider; + } + } + + private static class DefaultProvider implements JsonPathEvaluatorProvider { + + // default implementation uses MethodHandles to avoid pulling lots of implementation details into the SPI layer + + private static final MethodHandle FACTORY; + + static { + String className = "org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator"; + MethodHandle factory = null; + try { + Class<?> clazz = Class.forName(className, false, JsonPathEvaluators.class.getClassLoader()); + factory = MethodHandles.publicLookup() + .findStatic(clazz, "create", MethodType.methodType(JsonPathEvaluator.class, String.class, Object.class)); + } catch (Throwable implausible) { + LOGGER.error("could not construct MethodHandle for {}", className, + implausible); + } + FACTORY = factory; + } + + public JsonPathEvaluator create(String jsonPath, Object defaultValue) { + return create(null, jsonPath, defaultValue); + } + + @Override + public JsonPathEvaluator create(JsonPathEvaluator delegate, String jsonPath, 
Object defaultValue) { Review comment: Don't fully understand the logic here. What is the delegate evaluator used for? ########## File path: pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java ########## @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.evaluator.json; + +import com.google.common.base.Preconditions; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Allows registration of a custom {@see JsonPathEvaluator} which can handle custom storage + * functionality also present in a plugin. A default evaluator which can handle all default + * storage types will be provided to delegate to when standard storage types are encountered. + * + * This is an evolving SPI and subject to change. 
+ */ +public final class JsonPathEvaluators { + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonPathEvaluators.class); + + private static final AtomicReferenceFieldUpdater<JsonPathEvaluators, JsonPathEvaluatorProvider> UPDATER = + AtomicReferenceFieldUpdater.newUpdater(JsonPathEvaluators.class, JsonPathEvaluatorProvider.class, "_provider"); + private static final JsonPathEvaluators INSTANCE = new JsonPathEvaluators(); + private static final DefaultProvider DEFAULT_PROVIDER = new DefaultProvider(); + private volatile JsonPathEvaluatorProvider _provider; + + /** + * Registration point to override how JSON paths are evaluated. This should be used + * when a Pinot plugin has special storage capabilities. For instance, imagine a + * plugin with a raw forward index which stores JSON in a binary format which + * pinot-core is unaware of and cannot evaluate JSON paths against (pinot-core only + * understands true JSON documents). Whenever JSON paths are evaluated against this + * custom storage, different storage access operations may be required, and the provided + * {@see JsonPathEvaluator} can inspect the provided {@see ForwardIndexReader} to + * determine whether it is the custom implementation and evaluate the JSON path against + * the binary JSON managed by the custom reader. If it is not the custom implementation, + * then the evaluation should be delegated to the provided delegate. + * + * This prevents the interface {@see ForwardIndexReader} from needing to be able to model + * any plugin storage format, which creates flexibility for the kinds of data structure + * plugins can employ. 
+ * + * @param provider provides {@see JsonPathEvaluator} + * @return true if registration is successful, false otherwise + */ + public static boolean registerProvider(JsonPathEvaluatorProvider provider) { + Preconditions.checkArgument(provider != null, ""); Review comment: Would it work if we simply stored the `provider` in a `private static` field and set it during instance startup? Several classes handle plugins that way. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.transform.function; + +import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.segment.spi.evaluator.TransformEvaluator; + + +public interface PushDownTransformFunction { Review comment: (optional) Should we make this extend the `TransformFunction` interface, or do you see this as independent of the regular `TransformFunction`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org