This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f4129876f1 Add config to preserve stream topic name as a column for `CLPLogRecordExtractor` (#14599) f4129876f1 is described below commit f4129876f17a6dffba86b9a1e41972ac0e8e1cd2 Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com> AuthorDate: Fri Dec 13 17:25:18 2024 -0800 Add config to preserve stream topic name as a column for `CLPLogRecordExtractor` (#14599) * Add config to preserve stream topic name in rows * fill row if extract all --- .../inputformat/clplog/CLPLogRecordExtractor.java | 10 ++++++++++ .../clplog/CLPLogRecordExtractorConfig.java | 15 +++++++++++++++ .../clplog/CLPLogRecordExtractorTest.java | 21 ++++++++++++++++++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java index 2f2e730b12..d4be2e193f 100644 --- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java @@ -97,6 +97,11 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec _serverMetrics = serverMetrics; } + public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig, String topicName) { + init(fields, recordExtractorConfig); + _topicName = topicName; + } + @Override public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) { _config = (CLPLogRecordExtractorConfig) recordExtractorConfig; @@ -131,6 +136,11 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec public GenericRow 
extract(Map<String, Object> from, GenericRow to) { Set<String> clpEncodedFieldNames = _config.getFieldsForClpEncoding(); + // Preserve topic name if configured, regardless of _extractAll + if (_config.getTopicNameDestinationColumn() != null) { + to.putValue(_config.getTopicNameDestinationColumn(), _topicName); + } + if (_extractAll) { for (Map.Entry<String, Object> recordEntry : from.entrySet()) { String recordKey = recordEntry.getKey(); diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java index 3d08f42da5..c735a04f06 100644 --- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java +++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java @@ -49,6 +49,8 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig { public static final String REMOVE_PROCESSED_FIELDS_CONFIG_KEY = "removeProcessedFields"; public static final String UNENCODABLE_FIELD_SUFFIX_CONFIG_KEY = "unencodableFieldSuffix"; public static final String UNENCODABLE_FIELD_ERROR_CONFIG_KEY = "unencodableFieldError"; + // Preserve the topic name as a column in each destination row. If null, the topic name will not be preserved. 
+ public static final String TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY = "topicNameDestinationColumn"; private static final Logger LOGGER = LoggerFactory.getLogger(CLPLogRecordExtractorConfig.class); @@ -56,6 +58,7 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig { private String _unencodableFieldSuffix = null; private String _unencodableFieldError = null; private boolean _removeProcessedFields = false; + private String _topicNameDestinationColumn = null; @Override public void init(Map<String, String> props) { @@ -64,6 +67,15 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig { return; } + String topicNameDestinationColumn = props.get(TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY); + if (null != topicNameDestinationColumn) { + if (topicNameDestinationColumn.length() == 0) { + LOGGER.warn("Ignoring empty value for {}", TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY); + } else { + _topicNameDestinationColumn = topicNameDestinationColumn; + } + } + String concatenatedFieldNames = props.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY); if (null == concatenatedFieldNames) { return; @@ -114,4 +126,7 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig { public String getUnencodableFieldError() { return _unencodableFieldError; } + public String getTopicNameDestinationColumn() { + return _topicNameDestinationColumn; + } } diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java index 2e002cdb0c..1aeb5a3fd9 100644 --- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java +++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java @@ 
-52,6 +52,8 @@ public class CLPLogRecordExtractorTest { private static final String _MESSAGE_2_FIELD_NAME = "message2"; private static final String _MESSAGE_2_FIELD_VALUE = "Stopped job_123 on node-987: 3 cores, 6 threads and " + "22.0% memory used."; + private static final String _TOPIC_NAME = "test-topic"; + private static final String _TOPIC_NAME_DEST_COLUMN = "_streamTopicName"; @Test public void testCLPEncoding() { @@ -147,6 +149,23 @@ public class CLPLogRecordExtractorTest { assertEquals(row.getValue(_MESSAGE_2_FIELD_NAME), _MESSAGE_2_FIELD_VALUE); } + @Test + public void testPreserveTopicName() { + Map<String, String> props = new HashMap<>(); + Set<String> fieldsToRead = new HashSet<>(); + fieldsToRead.add(_MESSAGE_1_FIELD_NAME); + + // Test null topicNameDestinationColumn config + GenericRow row; + row = extract(props, fieldsToRead); + assertNull(row.getValue(_TOPIC_NAME_DEST_COLUMN)); + + // Test with valid topicNameDestinationColumn config + props.put(CLPLogRecordExtractorConfig.TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY, _TOPIC_NAME_DEST_COLUMN); + row = extract(props, fieldsToRead); + assertEquals(row.getValue(_TOPIC_NAME_DEST_COLUMN), _TOPIC_NAME); + } + private void addCLPEncodedField(String fieldName, Set<String> fields) { fields.add(fieldName + ClpRewriter.LOGTYPE_COLUMN_SUFFIX); fields.add(fieldName + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX); @@ -157,7 +176,7 @@ public class CLPLogRecordExtractorTest { CLPLogRecordExtractorConfig extractorConfig = new CLPLogRecordExtractorConfig(); CLPLogRecordExtractor extractor = new CLPLogRecordExtractor(); extractorConfig.init(props); - extractor.init(fieldsToRead, extractorConfig); + extractor.init(fieldsToRead, extractorConfig, _TOPIC_NAME); // Assemble record Map<String, Object> record = new HashMap<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org