This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f4129876f1 Add config to preserve stream topic name as a column for `CLPLogRecordExtractor` (#14599)
f4129876f1 is described below

commit f4129876f17a6dffba86b9a1e41972ac0e8e1cd2
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Fri Dec 13 17:25:18 2024 -0800

    Add config to preserve stream topic name as a column for `CLPLogRecordExtractor` (#14599)
    
    * Add config to preserve stream topic name in rows
    
    * fill row if extract all
---
 .../inputformat/clplog/CLPLogRecordExtractor.java   | 10 ++++++++++
 .../clplog/CLPLogRecordExtractorConfig.java         | 15 +++++++++++++++
 .../clplog/CLPLogRecordExtractorTest.java           | 21 ++++++++++++++++++++-
 3 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
index 2f2e730b12..d4be2e193f 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
@@ -97,6 +97,11 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
     _serverMetrics = serverMetrics;
   }
 
+  public void init(Set<String> fields, @Nullable RecordExtractorConfig 
recordExtractorConfig, String topicName) {
+    init(fields, recordExtractorConfig);
+    _topicName = topicName;
+  }
+
   @Override
   public void init(Set<String> fields, @Nullable RecordExtractorConfig 
recordExtractorConfig) {
     _config = (CLPLogRecordExtractorConfig) recordExtractorConfig;
@@ -131,6 +136,11 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
   public GenericRow extract(Map<String, Object> from, GenericRow to) {
     Set<String> clpEncodedFieldNames = _config.getFieldsForClpEncoding();
 
+    // Preserve topic name if configured, regardless of _extractAll
+    if (_config.getTopicNameDestinationColumn() != null) {
+      to.putValue(_config.getTopicNameDestinationColumn(), _topicName);
+    }
+
     if (_extractAll) {
       for (Map.Entry<String, Object> recordEntry : from.entrySet()) {
         String recordKey = recordEntry.getKey();
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java
index 3d08f42da5..c735a04f06 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java
@@ -49,6 +49,8 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
   public static final String REMOVE_PROCESSED_FIELDS_CONFIG_KEY = 
"removeProcessedFields";
   public static final String UNENCODABLE_FIELD_SUFFIX_CONFIG_KEY = 
"unencodableFieldSuffix";
   public static final String UNENCODABLE_FIELD_ERROR_CONFIG_KEY = 
"unencodableFieldError";
+  // Preserve the topic name as a column in each destination row. If null, the 
topic name will not be preserved.
+  public static final String TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY = 
"topicNameDestinationColumn";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(CLPLogRecordExtractorConfig.class);
 
@@ -56,6 +58,7 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
   private String _unencodableFieldSuffix = null;
   private String _unencodableFieldError = null;
   private boolean _removeProcessedFields = false;
+  private String _topicNameDestinationColumn = null;
 
   @Override
   public void init(Map<String, String> props) {
@@ -64,6 +67,15 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
       return;
     }
 
+    String topicNameDestinationColumn = 
props.get(TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY);
+    if (null != topicNameDestinationColumn) {
+      if (topicNameDestinationColumn.length() == 0) {
+        LOGGER.warn("Ignoring empty value for {}", 
TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY);
+      } else {
+        _topicNameDestinationColumn = topicNameDestinationColumn;
+      }
+    }
+
     String concatenatedFieldNames = 
props.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
     if (null == concatenatedFieldNames) {
       return;
@@ -114,4 +126,7 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
   public String getUnencodableFieldError() {
     return _unencodableFieldError;
   }
+  public String getTopicNameDestinationColumn() {
+    return _topicNameDestinationColumn;
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
index 2e002cdb0c..1aeb5a3fd9 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
@@ -52,6 +52,8 @@ public class CLPLogRecordExtractorTest {
   private static final String _MESSAGE_2_FIELD_NAME = "message2";
   private static final String _MESSAGE_2_FIELD_VALUE = "Stopped job_123 on 
node-987: 3 cores, 6 threads and "
       + "22.0% memory used.";
+  private static final String _TOPIC_NAME = "test-topic";
+  private static final String _TOPIC_NAME_DEST_COLUMN = "_streamTopicName";
 
   @Test
   public void testCLPEncoding() {
@@ -147,6 +149,23 @@ public class CLPLogRecordExtractorTest {
     assertEquals(row.getValue(_MESSAGE_2_FIELD_NAME), _MESSAGE_2_FIELD_VALUE);
   }
 
+  @Test
+  public void testPreserveTopicName() {
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = new HashSet<>();
+    fieldsToRead.add(_MESSAGE_1_FIELD_NAME);
+
+    // Test null topicNameDestinationColumn config
+    GenericRow row;
+    row = extract(props, fieldsToRead);
+    assertNull(row.getValue(_TOPIC_NAME_DEST_COLUMN));
+
+    // Test with valid topicNameDestinationColumn config
+    
props.put(CLPLogRecordExtractorConfig.TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY, 
_TOPIC_NAME_DEST_COLUMN);
+    row = extract(props, fieldsToRead);
+    assertEquals(row.getValue(_TOPIC_NAME_DEST_COLUMN), _TOPIC_NAME);
+  }
+
   private void addCLPEncodedField(String fieldName, Set<String> fields) {
     fields.add(fieldName + ClpRewriter.LOGTYPE_COLUMN_SUFFIX);
     fields.add(fieldName + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
@@ -157,7 +176,7 @@ public class CLPLogRecordExtractorTest {
     CLPLogRecordExtractorConfig extractorConfig = new 
CLPLogRecordExtractorConfig();
     CLPLogRecordExtractor extractor = new CLPLogRecordExtractor();
     extractorConfig.init(props);
-    extractor.init(fieldsToRead, extractorConfig);
+    extractor.init(fieldsToRead, extractorConfig, _TOPIC_NAME);
 
     // Assemble record
     Map<String, Object> record = new HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to