This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0d69ae1157 Make StreamMessage generic (#9544) 0d69ae1157 is described below commit 0d69ae11570ef15fa663d750e7954838252a15ec Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Sun Oct 9 13:17:12 2022 -0700 Make StreamMessage generic (#9544) --- .../plugin/stream/kafka20/KafkaMessageBatch.java | 6 +++--- .../kafka20/KafkaPartitionLevelConsumer.java | 6 +++--- .../plugin/stream/kafka20/KafkaStreamMessage.java | 2 +- .../kafka20/KafkaPartitionLevelConsumerTest.java | 12 ++++++------ .../org/apache/pinot/spi/stream/MessageBatch.java | 4 ++-- .../pinot/spi/stream/StreamDataDecoderImpl.java | 4 ++-- .../org/apache/pinot/spi/stream/StreamMessage.java | 18 ++++++++++++------ .../spi/stream/StreamDataDecoderImplTest.java | 11 +++++++---- .../apache/pinot/spi/stream/StreamMessageTest.java | 22 +++++++++++++++++----- 9 files changed, 53 insertions(+), 32 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java index 4fd01562cc..1852c8bdc8 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java @@ -27,8 +27,8 @@ import org.apache.pinot.spi.stream.StreamMessage; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; -public class KafkaMessageBatch implements MessageBatch<StreamMessage> { - private final List<StreamMessage> _messageList; +public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> { + private final List<StreamMessage<byte[]>> _messageList; private final int _unfilteredMessageCount; private final long _lastOffset; @@ -37,7 +37,7 @@ public class KafkaMessageBatch implements MessageBatch<StreamMessage> { * @param lastOffset the offset of the last message in the batch * @param batch the messages, which may be smaller than {@see unfilteredMessageCount} */ - public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage> batch) { + public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch) { _messageList = batch; _lastOffset = lastOffset; _unfilteredMessageCount = unfilteredMessageCount; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index 59ee1c0eab..e7d272041c 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -46,14 +46,14 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa } @Override - public MessageBatch<StreamMessage> fetchMessages(StreamPartitionMsgOffset startMsgOffset, + public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) { final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset(); return fetchMessages(startOffset, endOffset, timeoutMillis); } - public MessageBatch<StreamMessage> fetchMessages(long startOffset, long endOffset, int timeoutMillis) { + public MessageBatch<StreamMessage<byte[]>> fetchMessages(long startOffset, long endOffset, int timeoutMillis) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset, endOffset, timeoutMillis); @@ -61,7 +61,7 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa _consumer.seek(_topicPartition, startOffset); ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis)); List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition); - List<StreamMessage> filtered = new ArrayList<>(messageAndOffsets.size()); + List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size()); long lastOffset = startOffset; for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) { String key = messageAndOffset.key(); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java index 8124bea53d..f67dcf3f7d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java @@ -25,7 +25,7 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata; public class KafkaStreamMessage extends StreamMessage { public KafkaStreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) { - super(key, value, metadata); + super(key, value, metadata, value.length); } public long getNextOffset() { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java index 0a2f7d2f5f..6c85f913f5 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java @@ -281,7 +281,7 @@ public class KafkaPartitionLevelConsumerTest { consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000); Assert.assertEquals(batch1.getMessageCount(), 500); for (int i = 0; i < batch1.getMessageCount(); i++) { - final byte[] msg = batch1.getStreamMessage(i).getValue(); + final byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + i); Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } @@ -290,7 +290,7 @@ public class KafkaPartitionLevelConsumerTest { consumer.fetchMessages(new LongMsgOffset(500), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000); Assert.assertEquals(batch2.getMessageCount(), 500); for (int i = 0; i < batch2.getMessageCount(); i++) { - final byte[] msg = batch2.getStreamMessage(i).getValue(); + final byte[] msg = (byte[]) batch2.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i)); Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } @@ -298,7 +298,7 @@ public class KafkaPartitionLevelConsumerTest { final MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(10), new LongMsgOffset(35), 10000); Assert.assertEquals(batch3.getMessageCount(), 25); for (int i = 0; i < batch3.getMessageCount(); i++) { - final byte[] msg = batch3.getStreamMessage(i).getValue(); + final byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i)); Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } @@ -388,7 +388,7 @@ public class KafkaPartitionLevelConsumerTest { MessageBatch batch1 = consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(400), 10000); Assert.assertEquals(batch1.getMessageCount(), 200); for (int i = 0; i < batch1.getMessageCount(); i++) { - byte[] msg = batch1.getStreamMessage(i).getValue(); + byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200)); } Assert.assertEquals(batch1.getOffsetOfNextBatch().toString(), "400"); @@ -400,7 +400,7 @@ public class KafkaPartitionLevelConsumerTest { MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(201), new LongMsgOffset(401), 10000); Assert.assertEquals(batch3.getMessageCount(), 200); for (int i = 0; i < batch3.getMessageCount(); i++) { - byte[] msg = batch3.getStreamMessage(i).getValue(); + byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (i + 201)); } Assert.assertEquals(batch3.getOffsetOfNextBatch().toString(), "401"); @@ -408,7 +408,7 @@ public class KafkaPartitionLevelConsumerTest { MessageBatch batch4 = consumer.fetchMessages(new LongMsgOffset(0), null, 10000); Assert.assertEquals(batch4.getMessageCount(), 500); for (int i = 0; i < batch4.getMessageCount(); i++) { - byte[] msg = batch4.getStreamMessage(i).getValue(); + byte[] msg = (byte[]) batch4.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200)); } Assert.assertEquals(batch4.getOffsetOfNextBatch().toString(), "700"); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index 0e80df0234..d22a46af67 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -55,13 +55,13 @@ public interface MessageBatch<T> { return (byte[]) getMessageAtIndex(index); } - default StreamMessage getStreamMessage(int index) { + default StreamMessage<T> getStreamMessage(int index) { return new LegacyStreamMessage(getMessageBytesAtIndex(index)); } class LegacyStreamMessage extends StreamMessage { public LegacyStreamMessage(byte[] value) { - super(value); + super(value, value.length); } } /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java index 3e69dbca03..97958b92d3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java @@ -44,13 +44,13 @@ public class StreamDataDecoderImpl implements StreamDataDecoder { try { _reuse.clear(); - GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getValue().length, _reuse); + GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getLength(), _reuse); if (row != null) { if (message.getKey() != null) { row.putValue(KEY, new String(message.getKey(), StandardCharsets.UTF_8)); } RowMetadata metadata = message.getMetadata(); - if (metadata != null) { + if (metadata != null && metadata.getHeaders() != null) { metadata.getHeaders().getFieldToValueMap() .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value)); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java index 6eaf099c12..e626dc8106 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java @@ -38,25 +38,31 @@ import javax.annotation.Nullable; * Additionally, the pinot table schema should refer these fields. Otherwise, even though the fields are extracted, * they will not materialize in the pinot table. */ -public class StreamMessage { +public class StreamMessage<T> { private final byte[] _key; - private final byte[] _value; + private final T _value; protected final StreamMessageMetadata _metadata; + int _length = -1; - public StreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) { + public StreamMessage(@Nullable byte[] key, T value, @Nullable StreamMessageMetadata metadata, int length) { _key = key; _value = value; _metadata = metadata; + _length = length; } - public StreamMessage(byte[] value) { - this(null, value, null); + public StreamMessage(T value, int length) { + this(null, value, null, length); } - public byte[] getValue() { + public T getValue() { return _value; } + public int getLength() { + return _length; + } + @Nullable public StreamMessageMetadata getMetadata() { return _metadata; diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java index 0483b7d1ac..56bfb9b97b 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java @@ -40,7 +40,8 @@ public class StreamDataDecoderImplTest { TestDecoder messageDecoder = new TestDecoder(); messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), ""); String value = "Alice"; - StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8)); + StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8), + value.getBytes(StandardCharsets.UTF_8).length); StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message); Assert.assertNotNull(result); Assert.assertNull(result.getException()); @@ -62,8 +63,9 @@ public class StreamDataDecoderImplTest { headers.putValue(AGE_HEADER_KEY, 3); Map<String, String> recordMetadata = Collections.singletonMap(SEQNO_RECORD_METADATA, "1"); StreamMessageMetadata metadata = new StreamMessageMetadata(1234L, headers, recordMetadata); - StreamMessage message = new StreamMessage(key.getBytes(StandardCharsets.UTF_8), - value.getBytes(StandardCharsets.UTF_8), metadata); + byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + StreamMessage<byte[]> message = + new StreamMessage(key.getBytes(StandardCharsets.UTF_8), valueBytes, metadata, value.length()); StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message); Assert.assertNotNull(result); @@ -84,7 +86,8 @@ public class StreamDataDecoderImplTest { ThrowingDecoder messageDecoder = new ThrowingDecoder(); messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), ""); String value = "Alice"; - StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8)); + StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8), + value.getBytes(StandardCharsets.UTF_8).length); StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message); Assert.assertNotNull(result); Assert.assertNotNull(result.getException()); diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java index 926761109b..10e5087493 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java @@ -27,17 +27,29 @@ public class StreamMessageTest { @Test public void testAllowNullKeyAndMetadata() { - StreamMessage msg = new StreamMessage("hello".getBytes(StandardCharsets.UTF_8)); + String value = "hello"; + byte[] valBytes = value.getBytes(StandardCharsets.UTF_8); + StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length); Assert.assertNull(msg.getKey()); Assert.assertNull(msg.getMetadata()); - Assert.assertEquals(new String(msg.getValue()), "hello"); + Assert.assertEquals(new String(msg.getValue()), value); - StreamMessage msg1 = new StreamMessage("key".getBytes(StandardCharsets.UTF_8), - "value".getBytes(StandardCharsets.UTF_8), null); + value = "value"; + valBytes = value.getBytes(StandardCharsets.UTF_8); + StreamMessage<byte[]> msg1 = + new StreamMessage("key".getBytes(StandardCharsets.UTF_8), valBytes, null, valBytes.length); Assert.assertNotNull(msg1.getKey()); Assert.assertEquals(new String(msg1.getKey()), "key"); Assert.assertNotNull(msg1.getValue()); - Assert.assertEquals(new String(msg1.getValue()), "value"); + Assert.assertEquals(new String(msg1.getValue()), value); Assert.assertNull(msg1.getMetadata()); + + StreamMessage<String> msg2 = new StreamMessage<>("key".getBytes(StandardCharsets.UTF_8), value, null, + value.length()); + Assert.assertNotNull(msg2.getKey()); + Assert.assertEquals(new String(msg2.getKey()), "key"); + Assert.assertNotNull(msg2.getValue()); + Assert.assertEquals(msg2.getValue(), value); + Assert.assertNull(msg2.getMetadata()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org