This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch message-header
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit f89cc3b53c52b2481575f92c1245aab844e6ea22
Author: kishoreg <g.kish...@gmail.com>
AuthorDate: Sat Jul 23 11:47:30 2022 -0700

    Adding support to extract values from message headers; initial support for
Kafka headers
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  8 +++++++-
 .../kafka20/KafkaPartitionLevelConsumer.java       | 13 +++++++++++-
 .../stream/kafka20/RowMetadataExtractor.java       | 23 +++++++++++++++++++++-
 .../org/apache/pinot/spi/stream/RowMetadata.java   |  6 ++++++
 .../pinot/spi/stream/StreamMessageMetadata.java    | 14 +++++++++++++
 5 files changed, 61 insertions(+), 3 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fdfe31377e..1de9c818ba 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -527,8 +527,14 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
 
       GenericRow decodedRow = _messageDecoder
-          .decode(messagesAndOffsets.getMessageAtIndex(index), 
messagesAndOffsets.getMessageOffsetAtIndex(index),
+          .decode(messagesAndOffsets.getMessageAtIndex(index),
+              messagesAndOffsets.getMessageOffsetAtIndex(index),
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
+      // Copy stream headers into the decoded row. The decoder may return null (see the check
+      // below) and the batch may carry no metadata, so guard both before dereferencing.
+      if (decodedRow != null && msgMetadata != null && msgMetadata.getHeaders() != null) {
+        msgMetadata.getHeaders().getFieldToValueMap().forEach(decodedRow::putValue);
+      }
       if (decodedRow != null) {
         try {
           _transformPipeline.processRow(decodedRow, reusedResult);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index bf212cd855..31292cc09c 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -23,11 +23,16 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +43,8 @@ public class KafkaPartitionLevelConsumer extends 
KafkaPartitionLevelConnectionHa
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
+  GenericRow _headers = new GenericRow();
+
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig 
streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
@@ -65,8 +72,12 @@ public class KafkaPartitionLevelConsumer extends 
KafkaPartitionLevelConnectionHa
       long offset = messageAndOffset.offset();
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
+          RowMetadata rowMetadata = null;
+          if (_config.isPopulateMetadata()) {
+            rowMetadata = _rowMetadataExtractor.extract(messageAndOffset);
+          }
           filtered.add(
-              new MessageAndOffsetAndMetadata(message.get(), offset, 
_rowMetadataExtractor.extract(messageAndOffset)));
+              new MessageAndOffsetAndMetadata(message.get(), offset, 
rowMetadata));
         } else if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("tombstone message at offset {}", offset);
         }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
index 81402bb7d5..e36bb331f4 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
@@ -19,6 +19,9 @@
 package org.apache.pinot.plugin.stream.kafka20;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 
@@ -26,7 +29,25 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata;
 @FunctionalInterface
 public interface RowMetadataExtractor {
   static RowMetadataExtractor build(boolean populateMetadata) {
-    return populateMetadata ? record -> new 
StreamMessageMetadata(record.timestamp()) : record -> null;
+    return record -> {
+      // Metadata (record timestamp plus message headers) is extracted only when
+      // populateMetadata is enabled, matching the expression this lambda replaces;
+      // otherwise the extractor yields null.
+      if (!populateMetadata) {
+        return null;
+      }
+      StreamMessageMetadata streamMessageMetadata = new StreamMessageMetadata(record.timestamp());
+      Headers headers = record.headers();
+      if (headers != null) {
+        // Expose every header as a field of a GenericRow, keyed by the header's key.
+        GenericRow headerGenericRow = new GenericRow();
+        for (Header header : headers.toArray()) {
+          headerGenericRow.putValue(header.key(), header.value());
+        }
+        streamMessageMetadata.setHeaders(headerGenericRow);
+      }
+      return streamMessageMetadata;
+    };
   }
 
   RowMetadata extract(ConsumerRecord<?, ?> consumerRecord);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
index a8b0f2116f..fb19701d02 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
 
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
@@ -40,4 +41,9 @@ public interface RowMetadata {
    *         Long.MIN_VALUE if not available
    */
   long getIngestionTimeMs();
+
+  /** Returns the headers of the message as a {@link GenericRow}; this default returns {@code null}. */
+  default GenericRow getHeaders() {
+    return null;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index 9991f34eac..d32deda9df 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.spi.stream;
 
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
 /**
  * A class that provides metadata associated with the message of a stream, for 
e.g.,
  * ingestion-timestamp of the message.
@@ -26,6 +29,8 @@ public class StreamMessageMetadata implements RowMetadata {
 
   private final long _ingestionTimeMs;
 
+  // Message headers; stays null until setHeaders is called.
+  private GenericRow _headers;
+
   /**
    * Construct the stream based message/row message metadata
    *
@@ -40,4 +45,13 @@ public class StreamMessageMetadata implements RowMetadata {
   public long getIngestionTimeMs() {
     return _ingestionTimeMs;
   }
+
+  /**
+   * Returns the headers previously supplied through {@code setHeaders}, or {@code null} if none were set.
+   */
+  @Override
+  public GenericRow getHeaders() {
+    return _headers;
+  }
+
+  /**
+   * Sets the headers associated with this message.
+   *
+   * @param headers row holding the header values; stored as-is, without copying
+   */
+  public void setHeaders(GenericRow headers) {
+    _headers = headers;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to