ege-st commented on code in PR #12243:
URL: https://github.com/apache/pinot/pull/12243#discussion_r1488632692


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/RecordEnricherPipeline.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class RecordEnricherPipeline {
+  private final List<RecordEnricher> _enrichers = new ArrayList<>();
+  private final Set<String> _columnsToExtract = new HashSet<>();
+
+  public static RecordEnricherPipeline getPassThroughPipeline() {
+    return new RecordEnricherPipeline();
+  }
+
+  public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig 
ingestionConfig) {
+    RecordEnricherPipeline pipeline = new RecordEnricherPipeline();
+    if (null == ingestionConfig || null == 
ingestionConfig.getEnrichmentConfigs()) {
+      return pipeline;
+    }
+    List<EnrichmentConfig> enrichmentConfigs = 
ingestionConfig.getEnrichmentConfigs();
+    for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) {
+      try {
+        RecordEnricher enricher = (RecordEnricher) 
Class.forName(enrichmentConfig.getEnricherClassName()).newInstance();
+        enricher.init(enrichmentConfig.getProperties());
+        pipeline.add(enricher);
+      } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
+        throw new RuntimeException("Failed to instantiate record enricher" + 
enrichmentConfig.getEnricherClassName(),

Review Comment:
   ```suggestion
           throw new RuntimeException("Failed to instantiate record enricher: " 
+ enrichmentConfig.getEnricherClassName(),
   ```
   I believe that, as is, the class name will be concatenated directly onto the word `enricher` with no separator, which will make these error messages hard to read.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = 
"fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = 
enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + 
FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = 
List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new 
MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _fields) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);
+    }
+  }
+
+  private void enrichWithClpEncodedFields(String key, Object value, GenericRow 
to) {
+    String logtype = null;
+    Object[] dictVars = null;
+    Object[] encodedVars = null;
+    if (null != value) {
+      if (!(value instanceof String)) {
+        LOGGER.error("Can't encode value of type {} with CLP. name: '{}', 
value: '{}'",
+            value.getClass().getSimpleName(), key, value);
+      } else {
+        String valueAsString = (String) value;
+        try {
+          _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
+          logtype = _clpEncodedMessage.getLogTypeAsString();
+          encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+          dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+        } catch (IOException e) {
+          LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', 
error: {}", key, valueAsString,
+              e.getMessage());
+        }

Review Comment:
   I think this would be more readable with the success path as the first 
branch:
   ```suggestion
         if (value instanceof String) {
           String valueAsString = (String) value;
           try {
             _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
             logtype = _clpEncodedMessage.getLogTypeAsString();
             encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
             dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
           } catch (IOException e) {
             LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
                 e.getMessage());
           }
         } else {
           LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
               value.getClass().getSimpleName(), key, value);
   
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/function/CustomFunctionEnricher.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.recordenricher.function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class CustomFunctionEnricher extends RecordEnricher {
+  private Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
+  private List<String> _fieldsToExtract;
+
+  @Override
+  public void init(Map<String, String> enricherProps) {
+    _fieldToFunctionEvaluator = new HashMap<>();
+    _fieldsToExtract = new ArrayList<>();
+    enricherProps.forEach((k, v) -> {
+      _fieldToFunctionEvaluator.put(k, 
FunctionEvaluatorFactory.getExpressionEvaluator(v));
+      _fieldsToExtract.addAll(_fieldToFunctionEvaluator.get(k).getArguments());

Review Comment:
   Why not use an intermediate variable?
   ```suggestion
         FunctionEvaluator exprEval = FunctionEvaluatorFactory.getExpressionEvaluator(v);
         _fieldToFunctionEvaluator.put(k, exprEval);
         _fieldsToExtract.addAll(exprEval.getArguments());
   ```
   
   This way you avoid a HashMap lookup for a value you just created within this scope.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/function/CustomFunctionEnricher.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.recordenricher.function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class CustomFunctionEnricher extends RecordEnricher {
+  private Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
+  private List<String> _fieldsToExtract;

Review Comment:
   I think these can be made `final` (initialized at declaration rather than in `init()`), ensuring they are instantiated only once and never reassigned. It's a small thing, but enforcing expected behavior with the compiler is a good habit to have.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -157,17 +159,20 @@ public RecordReader getRecordReader() {
   public void init(SegmentGeneratorConfig config, RecordReader recordReader)
       throws Exception {
     SegmentCreationDataSource dataSource = new 
RecordReaderSegmentCreationDataSource(recordReader);
-    init(config, dataSource, new TransformPipeline(config.getTableConfig(), 
config.getSchema()));
+    init(config, dataSource, 
RecordEnricherPipeline.fromTableConfig(config.getTableConfig()),
+        new TransformPipeline(config.getTableConfig(), config.getSchema()));

Review Comment:
   Does the `RecordEnricherPipeline` run before or after the `TransformPipeline`? If it runs after, it should also come after in the `init` parameter list, so that the API reinforces the logical order of execution.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = 
"fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = 
enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);

Review Comment:
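   What happens if one of the fields contains an illegal character (e.g., `fo"o,ba$r`)? Are there illegal characters for field names? Or what if we have a field list like `foo,,,bar,,`? Note that `String.split(",")` drops trailing empty strings but keeps interior ones, so that list would yield `["foo", "", "", "bar"]`. Surrounding whitespace (e.g., `foo, bar`) would also be kept as part of the field name.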



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = 
"fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;

Review Comment:
   I think both of these can be made `final` (initialized at declaration rather than in `init()`, since neither depends on the enricher properties), which will help enforce that each is assigned an instance only once.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = 
"fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = 
enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + 
FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = 
List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new 
MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _fields) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);

Review Comment:
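   One subtle consequence of our pattern of modifying the input `GenericRow` in place is that if a fault occurs during the modification, we're left with a partially updated `GenericRow`. Hopefully we always drop the write when a pipeline fails; otherwise we could end up with very-hard-to-debug 'broken' data written to a segment. Note that this `catch` only logs and does not rethrow, so the partially enriched row is not dropped here. Also, because `e` is not passed to `LOGGER.error`, the cause and stack trace are lost.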



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -178,10 +183,12 @@ public void init(SegmentGeneratorConfig config, 
SegmentCreationDataSource dataSo
     if (config.isFailOnEmptySegment()) {
       Preconditions.checkState(_recordReader.hasNext(), "No record in data 
source");
     }
+    _recordEnricherPipeline = enricherPipeline;
     _transformPipeline = transformPipeline;
     // Use the same transform pipeline if the data source is backed by a 
record reader
     if (dataSource instanceof RecordReaderSegmentCreationDataSource) {
       ((RecordReaderSegmentCreationDataSource) 
dataSource).setTransformPipeline(transformPipeline);
+      ((RecordReaderSegmentCreationDataSource) 
dataSource).setRecordEnricherPipeline(enricherPipeline);

Review Comment:
   Likewise, if `RecordEnricherPipeline` is executed before `TransformPipeline`, then let's have it consistently occur before `TransformPipeline`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = 
"fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = 
enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + 
FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = 
List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }

Review Comment:
   It's usually more readable to have the success path be the first branch and 
the failure path be the else branch.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+public class EnrichmentConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Enricher class name")
+  private final String _enricherClassName;
+
+  @JsonPropertyDescription("Enricher properties")
+  private final Map<String, String> _properties;

Review Comment:
   What are some examples of properties we might have?  The only one I saw in 
the code was `fieldsForClpEncoding` for `CLPEncodingEnricher`. Are there 
others? 
   
   One concern I have is how hard it will be to determine, from the config schema, which properties are valid and which are required.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = 
"fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = 
enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + 
FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = 
List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new 
MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _fields) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);
+    }
+  }
+
+  private void enrichWithClpEncodedFields(String key, Object value, GenericRow 
to) {
+    String logtype = null;
+    Object[] dictVars = null;
+    Object[] encodedVars = null;
+    if (null != value) {
+      if (!(value instanceof String)) {
+        LOGGER.error("Can't encode value of type {} with CLP. name: '{}', 
value: '{}'",
+            value.getClass().getSimpleName(), key, value);
+      } else {
+        String valueAsString = (String) value;
+        try {
+          _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
+          logtype = _clpEncodedMessage.getLogTypeAsString();
+          encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+          dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+        } catch (IOException e) {
+          LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', 
error: {}", key, valueAsString,
+              e.getMessage());
+        }
+      }
+    }
+
+    to.putValue(key + ClpRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
+    to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
+    to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);

Review Comment:
   Will these be added as columns in the table? What happens if there's a collision (e.g., a user defines a schema column with the same name as one of these dynamically constructed names)? Is that possible? If not, let me know why so I understand Pinot better.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/function/CustomFunctionEnricher.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.recordenricher.function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class CustomFunctionEnricher extends RecordEnricher {
+  private Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
+  private List<String> _fieldsToExtract;
+
+  @Override
+  public void init(Map<String, String> enricherProps) {
+    _fieldToFunctionEvaluator = new HashMap<>();
+    _fieldsToExtract = new ArrayList<>();
+    enricherProps.forEach((k, v) -> {
+      _fieldToFunctionEvaluator.put(k, 
FunctionEvaluatorFactory.getExpressionEvaluator(v));
+      _fieldsToExtract.addAll(_fieldToFunctionEvaluator.get(k).getArguments());
+    });
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fieldsToExtract;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    _fieldToFunctionEvaluator.forEach((field, evaluator) -> {
+      record.putValue(field, evaluator.evaluate(record));
+    });

Review Comment:
   Is it possible for an exception to be thrown by any of the `evaluator`s? If 
so, should we catch it here (as was done in CLP) and log a message before 
propagating it up?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = 
"fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = 
enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + 
FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = 
List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new 
MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _fields) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);

Review Comment:
   This is obviously outside the scope of this PR, but it just occurred to me and I wanted 
to write it down to get feedback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to