This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9374964ad4 Message batch ingestion lag fix (#10983) 9374964ad4 is described below commit 9374964ad4b5797329333157fcbb1de56cc66657 Author: summerhasama-stripe <134637018+summerhasama-str...@users.noreply.github.com> AuthorDate: Thu Jun 29 12:12:42 2023 -0400 Message batch ingestion lag fix (#10983) * add back stream message constructor with 1 arg * fix bug with metadata being set to null in code trying to ensure backwards compatibility * Assert not null on stream message metadata --------- Co-authored-by: Priyen Patel <pri...@stripe.com> Co-authored-by: Rong Rong <ro...@apache.org> --- .../stream/kafka20/KafkaPartitionLevelConsumerTest.java | 13 ++++++++++--- .../main/java/org/apache/pinot/spi/stream/MessageBatch.java | 6 +++--- .../java/org/apache/pinot/spi/stream/StreamMessage.java | 6 +++++- .../org/apache/pinot/spi/stream/StreamMessageMetadata.java | 4 ++++ .../apache/pinot/spi/stream/StreamDataDecoderImplTest.java | 4 ++-- .../java/org/apache/pinot/spi/stream/StreamMessageTest.java | 2 +- 6 files changed, 25 insertions(+), 10 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java index 6c85f913f5..e783e091ae 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.spi.stream.RowMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; import 
org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamMessage; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -281,7 +282,9 @@ public class KafkaPartitionLevelConsumerTest { consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000); Assert.assertEquals(batch1.getMessageCount(), 500); for (int i = 0; i < batch1.getMessageCount(); i++) { - final byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue(); + StreamMessage streamMessage = batch1.getStreamMessage(i); + Assert.assertNotNull(streamMessage.getMetadata()); + final byte[] msg = (byte[]) streamMessage.getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + i); Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } @@ -290,7 +293,9 @@ public class KafkaPartitionLevelConsumerTest { consumer.fetchMessages(new LongMsgOffset(500), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000); Assert.assertEquals(batch2.getMessageCount(), 500); for (int i = 0; i < batch2.getMessageCount(); i++) { - final byte[] msg = (byte[]) batch2.getStreamMessage(i).getValue(); + StreamMessage streamMessage = batch2.getStreamMessage(i); + Assert.assertNotNull(streamMessage.getMetadata()); + final byte[] msg = (byte[]) streamMessage.getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i)); Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } @@ -298,7 +303,9 @@ public class KafkaPartitionLevelConsumerTest { final MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(10), new LongMsgOffset(35), 10000); Assert.assertEquals(batch3.getMessageCount(), 25); for (int i = 0; i < batch3.getMessageCount(); i++) { - final byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue(); + StreamMessage streamMessage = batch3.getStreamMessage(i); + Assert.assertNotNull(streamMessage.getMetadata()); + final byte[] msg = (byte[]) streamMessage.getValue(); 
Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i)); Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index 7ae4226e47..8bde04aed4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -57,12 +57,12 @@ public interface MessageBatch<T> { } default StreamMessage<T> getStreamMessage(int index) { - return new LegacyStreamMessage(getMessageBytesAtIndex(index)); + return new LegacyStreamMessage(getMessageBytesAtIndex(index), (StreamMessageMetadata) getMetadataAtIndex(index)); } class LegacyStreamMessage extends StreamMessage { - public LegacyStreamMessage(byte[] value) { - super(value, value.length); + public LegacyStreamMessage(byte[] value, StreamMessageMetadata metadata) { + super(value, value.length, metadata); } } /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java index e626dc8106..17f66b6a5a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java @@ -52,7 +52,11 @@ public class StreamMessage<T> { } public StreamMessage(T value, int length) { - this(null, value, null, length); + this(value, length, null); + } + + public StreamMessage(T value, int length, @Nullable StreamMessageMetadata metadata) { + this(null, value, metadata, length); } public T getValue() { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java index ac67249441..557069a581 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java @@ -34,6 +34,10 @@ public class StreamMessageMetadata implements RowMetadata { private final GenericRow _headers; private final Map<String, String> _metadata; + public StreamMessageMetadata(long recordIngestionTimeMs) { + this(recordIngestionTimeMs, Long.MIN_VALUE, null, Collections.emptyMap()); + } + public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) { this(recordIngestionTimeMs, Long.MIN_VALUE, headers, Collections.emptyMap()); } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java index 56bfb9b97b..9aaf4bc386 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java @@ -41,7 +41,7 @@ public class StreamDataDecoderImplTest { messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), ""); String value = "Alice"; StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8), - value.getBytes(StandardCharsets.UTF_8).length); + value.getBytes(StandardCharsets.UTF_8).length, null); StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message); Assert.assertNotNull(result); Assert.assertNull(result.getException()); @@ -87,7 +87,7 @@ public class StreamDataDecoderImplTest { messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), ""); String value = "Alice"; StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8), - value.getBytes(StandardCharsets.UTF_8).length); + value.getBytes(StandardCharsets.UTF_8).length, null); StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message); Assert.assertNotNull(result); Assert.assertNotNull(result.getException()); diff 
--git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java index 10e5087493..d0d028709f 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java @@ -29,7 +29,7 @@ public class StreamMessageTest { public void testAllowNullKeyAndMetadata() { String value = "hello"; byte[] valBytes = value.getBytes(StandardCharsets.UTF_8); - StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length); + StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length, null); Assert.assertNull(msg.getKey()); Assert.assertNull(msg.getMetadata()); Assert.assertEquals(new String(msg.getValue()), value); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org