This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch message-header in repository https://gitbox.apache.org/repos/asf/pinot.git
commit f89cc3b53c52b2481575f92c1245aab844e6ea22 Author: kishoreg <g.kish...@gmail.com> AuthorDate: Sat Jul 23 11:47:30 2022 -0700 Adding support to extract values from message header.. initial support for kafka headers --- .../realtime/LLRealtimeSegmentDataManager.java | 8 +++++++- .../kafka20/KafkaPartitionLevelConsumer.java | 13 +++++++++++- .../stream/kafka20/RowMetadataExtractor.java | 23 +++++++++++++++++++++- .../org/apache/pinot/spi/stream/RowMetadata.java | 6 ++++++ .../pinot/spi/stream/StreamMessageMetadata.java | 14 +++++++++++++ 5 files changed, 61 insertions(+), 3 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index fdfe31377e..1de9c818ba 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -527,8 +527,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index); GenericRow decodedRow = _messageDecoder - .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index), + .decode(messagesAndOffsets.getMessageAtIndex(index), + messagesAndOffsets.getMessageOffsetAtIndex(index), messagesAndOffsets.getMessageLengthAtIndex(index), reuse); + if (msgMetadata.getHeaders() != null) { + for (Map.Entry<String, Object> entrySet : msgMetadata.getHeaders().getFieldToValueMap().entrySet()) { + decodedRow.putValue(entrySet.getKey(), entrySet.getValue()); + } + } if (decodedRow != null) { try { _transformPipeline.processRow(decodedRow, reusedResult); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index bf212cd855..31292cc09c 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -23,11 +23,16 @@ import java.util.ArrayList; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Bytes; +import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionLevelConsumer; +import org.apache.pinot.spi.stream.RowMetadata; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamMessageMetadata; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +43,8 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class); + GenericRow _headers = new GenericRow(); + public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) { super(clientId, streamConfig, partition); } @@ -65,8 +72,12 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa long offset = messageAndOffset.offset(); if (offset >= startOffset & (endOffset > offset | endOffset == -1)) { if (message != null) { + RowMetadata rowMetadata = null; + if (_config.isPopulateMetadata()) { + rowMetadata = _rowMetadataExtractor.extract(messageAndOffset); + } filtered.add( - new MessageAndOffsetAndMetadata(message.get(), offset, _rowMetadataExtractor.extract(messageAndOffset))); + new MessageAndOffsetAndMetadata(message.get(), offset, rowMetadata)); } else if (LOGGER.isDebugEnabled()) { LOGGER.debug("tombstone message at offset {}", offset); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java index 81402bb7d5..e36bb331f4 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java @@ -19,6 +19,9 @@ package org.apache.pinot.plugin.stream.kafka20; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.stream.RowMetadata; import org.apache.pinot.spi.stream.StreamMessageMetadata; @@ -26,7 +29,25 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata; @FunctionalInterface public interface RowMetadataExtractor { static RowMetadataExtractor build(boolean populateMetadata) { - return populateMetadata ? record -> new StreamMessageMetadata(record.timestamp()) : record -> null; + return record -> { + if (populateMetadata) { + return null; + } else { + StreamMessageMetadata streamMessageMetadata = new StreamMessageMetadata(record.timestamp()); + Headers headers = record.headers(); + if (headers != null) { + GenericRow headerGenericRow = new GenericRow(); + if (headers != null) { + Header[] headersArray = headers.toArray(); + for (Header header : headersArray) { + headerGenericRow.putValue(header.key(), header.value()); + } + } + streamMessageMetadata.setHeaders(headerGenericRow); + } + return streamMessageMetadata; + } + }; } RowMetadata extract(ConsumerRecord<?, ?> consumerRecord); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java index a8b0f2116f..fb19701d02 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java @@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; +import org.apache.pinot.spi.data.readers.GenericRow; /** @@ -40,4 +41,9 @@ public interface RowMetadata { * Long.MIN_VALUE if not available */ long getIngestionTimeMs(); + + default GenericRow getHeaders(){ + return null; + } + } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java index 9991f34eac..d32deda9df 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.spi.stream; +import org.apache.pinot.spi.data.readers.GenericRow; + + /** * A class that provides metadata associated with the message of a stream, for e.g., * ingestion-timestamp of the message. @@ -26,6 +29,8 @@ public class StreamMessageMetadata implements RowMetadata { private final long _ingestionTimeMs; + GenericRow _headers; + /** * Construct the stream based message/row message metadata * @@ -40,4 +45,13 @@ public class StreamMessageMetadata implements RowMetadata { public long getIngestionTimeMs() { return _ingestionTimeMs; } + + @Override + public GenericRow getHeaders() { + return _headers; + } + + public void setHeaders(GenericRow headers) { + _headers = headers; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org