This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fe2b013a65 Extract record keys, headers and metadata from Pulsar sources (#10995) fe2b013a65 is described below commit fe2b013a657e1ad6ac508a9a37933961bc4c408b Author: Jeff Bolle <jeffbo...@users.noreply.github.com> AuthorDate: Fri Jul 21 20:44:33 2023 -0400 Extract record keys, headers and metadata from Pulsar sources (#10995) --- .../pinot/plugin/stream/pulsar/PulsarConfig.java | 43 ++++- .../plugin/stream/pulsar/PulsarMessageBatch.java | 51 +----- .../stream/pulsar/PulsarMetadataExtractor.java | 182 +++++++++++++++++++++ .../PulsarPartitionLevelConnectionHandler.java | 3 +- .../pulsar/PulsarPartitionLevelConsumer.java | 26 +-- .../stream/pulsar/PulsarStreamLevelConsumer.java | 4 +- .../plugin/stream/pulsar/PulsarStreamMessage.java | 47 ++++++ .../stream/pulsar/PulsarStreamMessageMetadata.java | 76 +++++++++ .../pinot/plugin/stream/pulsar/PulsarUtils.java | 40 +++++ .../plugin/stream/pulsar/PulsarConfigTest.java | 118 +++++++++++++ .../plugin/stream/pulsar/PulsarConsumerTest.java | 34 ++-- .../stream/pulsar/PulsarMessageBatchTest.java | 36 ++-- .../stream/pulsar/PulsarMetadataExtractorTest.java | 92 +++++++++++ .../pinot/spi/stream/StreamDataDecoderImpl.java | 9 +- 14 files changed, 669 insertions(+), 92 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java index 73ea2eca4b..4094a93f06 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java @@ -19,7 +19,13 @@ package org.apache.pinot.plugin.stream.pulsar; import com.google.common.base.Preconditions; +import java.util.Collections; 
import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; @@ -37,6 +43,7 @@ public class PulsarConfig { public static final String AUTHENTICATION_TOKEN = "authenticationToken"; public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath"; public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch"; + public static final String METADATA_FIELDS = "metadataFields"; //list of the metadata fields comma separated private final String _pulsarTopicName; private final String _subscriberId; @@ -45,8 +52,10 @@ public class PulsarConfig { private final SubscriptionInitialPosition _subscriptionInitialPosition; private final String _authenticationToken; private final String _tlsTrustCertsFilePath; + @Deprecated(since = "v0.13.* since pulsar supports record key extraction") private final boolean _enableKeyValueStitch; - + private final boolean _populateMetadata; + private final Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> _metadataFields; public PulsarConfig(StreamConfig streamConfig, String subscriberId) { Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap(); _pulsarTopicName = streamConfig.getTopicName(); @@ -71,8 +80,32 @@ public class PulsarConfig { _subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(offsetCriteria); _initialMessageId = PulsarUtils.offsetCriteriaToMessageId(offsetCriteria); + _populateMetadata = Boolean.parseBoolean(streamConfig.getStreamConfigsMap().getOrDefault( + StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE), + "false")); + String metadataFieldsToExtractCSV = streamConfig.getStreamConfigsMap().getOrDefault( + 
StreamConfigProperties.constructStreamProperty(STREAM_TYPE, METADATA_FIELDS), ""); + if (StringUtils.isBlank(metadataFieldsToExtractCSV) || !_populateMetadata) { + _metadataFields = Collections.emptySet(); + } else { + _metadataFields = parseConfigStringToEnumSet(metadataFieldsToExtractCSV); + } } + private Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> parseConfigStringToEnumSet( + String listOfMetadataFields) { + try { + String[] metadataFieldsArr = listOfMetadataFields.split(","); + return Stream.of(metadataFieldsArr) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .map(PulsarStreamMessageMetadata.PulsarMessageMetadataValue::findByKey) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid metadata fields list: " + listOfMetadataFields, e); + } + } public String getPulsarTopicName() { return _pulsarTopicName; } @@ -100,8 +133,14 @@ public class PulsarConfig { public String getTlsTrustCertsFilePath() { return _tlsTrustCertsFilePath; } - public boolean getEnableKeyValueStitch() { return _enableKeyValueStitch; } + public boolean isPopulateMetadata() { + return _populateMetadata; + } + + public Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> getMetadataFields() { + return _metadataFields; + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java index 6df313b722..912e8bef23 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java @@ -21,16 +21,12 @@ package org.apache.pinot.plugin.stream.pulsar; import java.nio.ByteBuffer; import 
java.util.ArrayList; import java.util.List; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.internal.DefaultImplementation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -39,13 +35,11 @@ import org.slf4j.LoggerFactory; * plugins will not work. A custom decoder will be needed to unpack key and value byte arrays and decode * them independently. */ -public class PulsarMessageBatch implements MessageBatch<byte[]> { - private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageBatch.class); - private final List<Message<byte[]>> _messageList = new ArrayList<>(); - private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4); +public class PulsarMessageBatch implements MessageBatch<PulsarStreamMessage> { + private final List<PulsarStreamMessage> _messageList = new ArrayList<>(); private final boolean _enableKeyValueStitch; - public PulsarMessageBatch(Iterable<Message<byte[]>> iterable, boolean enableKeyValueStitch) { + public PulsarMessageBatch(Iterable<PulsarStreamMessage> iterable, boolean enableKeyValueStitch) { iterable.forEach(_messageList::add); _enableKeyValueStitch = enableKeyValueStitch; } @@ -56,26 +50,19 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> { } @Override - public byte[] getMessageAtIndex(int index) { - Message<byte[]> msg = _messageList.get(index); - if (_enableKeyValueStitch) { - return stitchKeyValue(msg.getKeyBytes(), msg.getData()); - } - return msg.getData(); + public PulsarStreamMessage getMessageAtIndex(int index) { + return _messageList.get(index); } @Override public int getMessageOffsetAtIndex(int index) { - return 
ByteBuffer.wrap(_messageList.get(index).getData()).arrayOffset(); + return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset(); } @Override public int getMessageLengthAtIndex(int index) { - if (_enableKeyValueStitch) { - Message<byte[]> msg = _messageList.get(index); - return 8 + msg.getKeyBytes().length + msg.getData().length; - } - return _messageList.get(index).getData().length; + return _messageList.get(index).getValue().length; //if _enableKeyValueStitch is true, + // then they are already stitched in the consumer. If false, then the value is the raw value } /** @@ -123,26 +110,4 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> { public long getNextStreamMessageOffsetAtIndex(int index) { throw new UnsupportedOperationException("Pulsar does not support long stream offsets"); } - - /** - * Stitch key and value bytes together using a simple format: - * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes - */ - private byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) { - int keyLen = keyBytes.length; - int valueLen = valueBytes.length; - int totalByteArrayLength = 8 + keyLen + valueLen; - try (ByteArrayOutputStream bos = new ByteArrayOutputStream(totalByteArrayLength)) { - LENGTH_BUF.clear(); - bos.write(LENGTH_BUF.putInt(keyLen).array()); - bos.write(keyBytes); - LENGTH_BUF.clear(); - bos.write(LENGTH_BUF.putInt(valueLen).array()); - bos.write(valueBytes); - return bos.toByteArray(); - } catch (Exception e) { - LOGGER.error("Unable to stitch key and value bytes together", e); - } - return null; - } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java new file mode 100644 index 0000000000..c33c32e7cb --- /dev/null +++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.pulsar; + +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pulsar.client.api.Message; + +public interface PulsarMetadataExtractor { + static PulsarMetadataExtractor build(boolean populateMetadata, + Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataValuesToExtract) { + return message -> { + long publishTime = message.getPublishTime(); + long brokerPublishTime = message.getBrokerPublishTime().orElse(0L); + long recordTimestamp = brokerPublishTime != 0 ? brokerPublishTime : publishTime; + + Map<String, String> metadataMap = populateMetadataMap(populateMetadata, message, metadataValuesToExtract); + + GenericRow headerGenericRow = populateMetadata ? 
buildGenericRow(message) : null; + return new PulsarStreamMessageMetadata(recordTimestamp, headerGenericRow, metadataMap); + }; + } + + RowMetadata extract(Message<?> record); + + static GenericRow buildGenericRow(Message<?> message) { + if (MapUtils.isEmpty(message.getProperties())) { + return null; + } + GenericRow genericRow = new GenericRow(); + for (Map.Entry<String, String> entry : message.getProperties().entrySet()) { + genericRow.putValue(entry.getKey(), entry.getValue()); + } + return genericRow; + } + + static Map<String, String> populateMetadataMap(boolean populateAllFields, Message<?> message, + Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataValuesToExtract) { + + Map<String, String> metadataMap = new HashMap<>(); + populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME, message, metadataMap); + populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME, message, metadataMap); + populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.BROKER_PUBLISH_TIME, message, + metadataMap); + populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY, message, metadataMap); + + // Populate some timestamps for lag calculation even if populateMetadata is false + + if (!populateAllFields) { + return metadataMap; + } + + for (PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue : metadataValuesToExtract) { + populateMetadataField(metadataValue, message, metadataMap); + } + + return metadataMap; + } + + private static void populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue value, + Message<?> message, Map<String, String> metadataMap) { + switch (value) { + case PUBLISH_TIME: + long publishTime = message.getPublishTime(); + if (publishTime > 0) { + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME, + publishTime); + } + break; + case EVENT_TIME: 
+ long eventTime = message.getEventTime(); + if (eventTime > 0) { + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME, + eventTime); + } + break; + case BROKER_PUBLISH_TIME: + message.getBrokerPublishTime() + .ifPresent(brokerPublishTime -> setMetadataMapField(metadataMap, + PulsarStreamMessageMetadata.PulsarMessageMetadataValue.BROKER_PUBLISH_TIME, brokerPublishTime)); + break; + case MESSAGE_KEY: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY, + message.getKey()); + break; + case MESSAGE_ID: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID, + message.getMessageId().toString()); + break; + case MESSAGE_ID_BYTES_B64: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64, + message.getMessageId().toByteArray()); + break; + case PRODUCER_NAME: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PRODUCER_NAME, + message.getProducerName()); + break; + case SCHEMA_VERSION: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SCHEMA_VERSION, + message.getSchemaVersion()); + break; + case SEQUENCE_ID: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SEQUENCE_ID, + message.getSequenceId()); + break; + case ORDERING_KEY: + if (message.hasOrderingKey()) { + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.ORDERING_KEY, + message.getOrderingKey()); + } + break; + case SIZE: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SIZE, + message.size()); + break; + case TOPIC_NAME: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.TOPIC_NAME, + message.getTopicName()); + break; + case INDEX: + message.getIndex().ifPresent(index -> 
setMetadataMapField(metadataMap, + PulsarStreamMessageMetadata.PulsarMessageMetadataValue.INDEX, index)); + break; + case REDELIVERY_COUNT: + setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.REDELIVERY_COUNT, + message.getRedeliveryCount()); + break; + default: + throw new IllegalArgumentException("Unsupported metadata value: " + value); + } + } + + private static void setMetadataMapField(Map<String, String> metadataMap, + PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue, + String value) { + if (StringUtils.isNotBlank(value)) { + metadataMap.put(metadataValue.getKey(), value); + } + } + + private static void setMetadataMapField(Map<String, String> metadataMap, + PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue, + int value) { + setMetadataMapField(metadataMap, metadataValue, String.valueOf(value)); + } + + private static void setMetadataMapField(Map<String, String> metadataMap, + PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue, + long value) { + setMetadataMapField(metadataMap, metadataValue, String.valueOf(value)); + } + + private static void setMetadataMapField(Map<String, String> metadataMap, + PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue, + byte[] value) { + if (value != null && value.length > 0) { + setMetadataMapField(metadataMap, metadataValue, Base64.getEncoder().encodeToString(value)); + } + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java index 3ad57b55bb..11033ec716 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java +++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java @@ -40,6 +40,7 @@ public class PulsarPartitionLevelConnectionHandler { protected final PulsarConfig _config; protected final String _clientId; protected PulsarClient _pulsarClient = null; + protected final PulsarMetadataExtractor _pulsarMetadataExtractor; /** * Creates a new instance of {@link PulsarClient} and {@link Reader} @@ -47,7 +48,7 @@ public class PulsarPartitionLevelConnectionHandler { public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig) { _config = new PulsarConfig(streamConfig, clientId); _clientId = clientId; - + _pulsarMetadataExtractor = PulsarMetadataExtractor.build(_config.isPopulateMetadata(), _config.getMetadataFields()); try { ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers()); if (_config.getTlsTrustCertsFilePath() != null) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java index 0e55a07aa3..d1b80b0360 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java @@ -27,7 +27,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionGroupConsumer; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.StreamConfig; 
@@ -48,15 +47,15 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class); private final ExecutorService _executorService; private final Reader _reader; - private boolean _enableKeyValueStitch = false; + private boolean _enableKeyValueStitch; public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) { super(clientId, streamConfig); PulsarConfig config = new PulsarConfig(streamConfig, clientId); - _reader = createReaderForPartition(config.getPulsarTopicName(), - partitionGroupConsumptionStatus.getPartitionGroupId(), - config.getInitialMessageId()); + _reader = + createReaderForPartition(config.getPulsarTopicName(), partitionGroupConsumptionStatus.getPartitionGroupId(), + config.getInitialMessageId()); LOGGER.info("Created pulsar reader with id {} for topic {} partition {}", _reader, _config.getPulsarTopicName(), partitionGroupConsumptionStatus.getPartitionGroupId()); _executorService = Executors.newSingleThreadExecutor(); @@ -64,19 +63,19 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection } /** - * Fetch records from the Pulsar stream between the start and end KinesisCheckpoint + * Fetch records from the Pulsar stream between the start and end StreamPartitionMsgOffset * Used {@link org.apache.pulsar.client.api.Reader} to read the messaged from pulsar partitioned topic * The reader seeks to the startMsgOffset and starts reading records in a loop until endMsgOffset or timeout is * reached. 
*/ @Override - public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset, - int timeoutMillis) { + public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, + StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) { final MessageId startMessageId = ((MessageIdStreamOffset) startMsgOffset).getMessageId(); final MessageId endMessageId = endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId(); - List<Message<byte[]>> messagesList = new ArrayList<>(); + List<PulsarStreamMessage> messagesList = new ArrayList<>(); Future<PulsarMessageBatch> pulsarResultFuture = _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList)); @@ -96,7 +95,7 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection } public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId endMessageId, - List<Message<byte[]>> messagesList) { + List<PulsarStreamMessage> messagesList) { try { _reader.seek(startMessageId); @@ -108,7 +107,8 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection break; } } - messagesList.add(nextMessage); + messagesList.add( + PulsarUtils.buildPulsarStreamMessage(nextMessage, _enableKeyValueStitch, _pulsarMetadataExtractor)); if (Thread.interrupted()) { break; @@ -124,11 +124,11 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection } } - private Iterable<Message<byte[]>> buildOffsetFilteringIterable(final List<Message<byte[]>> messageAndOffsets, + private Iterable<PulsarStreamMessage> buildOffsetFilteringIterable(final List<PulsarStreamMessage> messageAndOffsets, final MessageId startOffset, final MessageId endOffset) { return Iterables.filter(messageAndOffsets, input -> { // Filter messages that are either null or have an offset ∉ [startOffset, endOffset] - return input != null && input.getData() != null && 
(input.getMessageId().compareTo(startOffset) >= 0) && ( + return input != null && input.getValue() != null && (input.getMessageId().compareTo(startOffset) >= 0) && ( (endOffset == null) || (input.getMessageId().compareTo(endOffset) < 0)); }); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java index 60272c6212..82040f6de3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java @@ -81,9 +81,9 @@ public class PulsarStreamLevelConsumer implements StreamLevelConsumer { // Log every minute or 100k events if (now - _lastLogTime > 60000 || _currentCount - _lastCount >= 100000) { if (_lastCount == 0) { - _logger.info("Consumed {} events from kafka stream {}", _currentCount, _streamConfig.getTopicName()); + _logger.info("Consumed {} events from pulsar stream {}", _currentCount, _streamConfig.getTopicName()); } else { - _logger.info("Consumed {} events from kafka stream {} (rate:{}/s)", _currentCount - _lastCount, + _logger.info("Consumed {} events from pulsar stream {} (rate:{}/s)", _currentCount - _lastCount, _streamConfig.getTopicName(), (float) (_currentCount - _lastCount) * 1000 / (now - _lastLogTime)); } _lastCount = _currentCount; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java new file mode 100644 index 0000000000..7e09197857 --- /dev/null +++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.pulsar; + +import javax.annotation.Nullable; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pulsar.client.api.MessageId; + +public class PulsarStreamMessage extends StreamMessage<byte[]> { + + private final MessageId _messageId; + public PulsarStreamMessage(@Nullable byte[] key, byte[] value, MessageId messageId, + @Nullable PulsarStreamMessageMetadata metadata, int length) { + super(key, value, metadata, length); + _messageId = messageId; + } + + public MessageId getMessageId() { + return _messageId; + } + + int getKeyLength() { + byte[] key = getKey(); + return key == null ? 0 : key.length; + } + + int getValueLength() { + byte[] value = getValue(); + return value == null ? 
0 : value.length; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java new file mode 100644 index 0000000000..59220138d7 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.plugin.stream.pulsar; + +import java.util.EnumSet; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageMetadata; + +/** + * Pulsar specific implementation of {@link StreamMessageMetadata} + * Pulsar makes many metadata values available for each message. Please see the pulsar documentation for more details. 
+ * @see <a href="https://pulsar.apache.org/docs/en/concepts-messaging/#message-properties">Pulsar Message Properties</a>
+ */
+public class PulsarStreamMessageMetadata extends StreamMessageMetadata {
+
+  public enum PulsarMessageMetadataValue {
+    PUBLISH_TIME("publishTime"),
+    EVENT_TIME("eventTime"),
+    BROKER_PUBLISH_TIME("brokerPublishTime"),
+    MESSAGE_KEY("key"),
+    MESSAGE_ID("messageId"),
+    MESSAGE_ID_BYTES_B64("messageIdBytes"),
+    PRODUCER_NAME("producerName"),
+    SCHEMA_VERSION("schemaVersion"),
+    SEQUENCE_ID("sequenceId"),
+    ORDERING_KEY("orderingKey"),
+    SIZE("size"),
+    TOPIC_NAME("topicName"),
+    INDEX("index"),
+    REDELIVERY_COUNT("redeliveryCount");
+
+    private final String _key;
+
+    PulsarMessageMetadataValue(String key) {
+      _key = key;
+    }
+
+    public String getKey() {
+      return _key;
+    }
+
+    public static PulsarMessageMetadataValue findByKey(final String key) {
+      EnumSet<PulsarMessageMetadataValue> values = EnumSet.allOf(PulsarMessageMetadataValue.class);
+      return values.stream().filter(value -> value.getKey().equals(key)).findFirst().orElse(null);
+    }
+  }
+
+  public PulsarStreamMessageMetadata(long recordIngestionTimeMs,
+      @Nullable GenericRow headers) {
+    super(recordIngestionTimeMs, headers);
+  }
+
+  public PulsarStreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
+      Map<String, String> metadata) {
+    super(recordIngestionTimeMs, headers, metadata);
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
index 763b0fc0d4..d22f8b0b5f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
@@ -18,13 +18,15 @@
  */
 package org.apache.pinot.plugin.stream.pulsar;
 
+import java.nio.ByteBuffer;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 
 
 public class PulsarUtils {
 
   private PulsarUtils() {
   }
 
@@ -51,4 +53,41 @@ public class PulsarUtils {
 
     throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
   }
+
+  /**
+   * Stitch key and value bytes together using a simple format:
+   * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes.
+   * Lengths are written big-endian (the default {@link ByteBuffer} byte order), and a {@code null} key
+   * or value is encoded as a zero-length section instead of failing.
+   * A fresh buffer is allocated per call (no shared static state), so concurrent callers cannot corrupt
+   * each other's output.
+   */
+  protected static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
+    byte[] key = keyBytes != null ? keyBytes : new byte[0];
+    byte[] value = valueBytes != null ? valueBytes : new byte[0];
+    return ByteBuffer.allocate(2 * Integer.BYTES + key.length + value.length)
+        .putInt(key.length)
+        .put(key)
+        .putInt(value.length)
+        .put(value)
+        .array();
+  }
+
+  /**
+   * Builds a {@link PulsarStreamMessage} from a raw Pulsar message.
+   *
+   * @param message the message received from Pulsar
+   * @param enableKeyValueStitch if true, the message value is the key and payload combined by
+   *                             {@link #stitchKeyValue(byte[], byte[])}; otherwise it is the raw payload
+   * @param pulsarMetadataExtractor extracts the headers/metadata attached to the resulting message
+   * @return the wrapped message, carrying the length of the (possibly stitched) value
+   */
+  protected static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
+      PulsarMetadataExtractor pulsarMetadataExtractor) {
+    byte[] key = message.getKeyBytes();
+    byte[] data = enableKeyValueStitch ? stitchKeyValue(key, message.getData()) : message.getData();
+    int dataLength = (data != null) ? data.length : 0;
+    return new PulsarStreamMessage(key, data, message.getMessageId(),
+        (PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), dataLength);
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
new file mode 100644
index 0000000000..ad23c83ce0
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PulsarConfigTest {
+  public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
+
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String STREAM_PULSAR_BROKER_LIST = "pulsar://localhost:6650";
+  public static final String STREAM_PULSAR_CONSUMER_TYPE = "simple";
+  Map<String, String> getCommonStreamConfigMap() {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", STREAM_TYPE);
+    streamConfigMap.put("stream.pulsar.consumer.type", STREAM_PULSAR_CONSUMER_TYPE);
+    streamConfigMap.put("stream.pulsar.topic.name", "test-topic");
+    streamConfigMap.put("stream.pulsar.bootstrap.servers", STREAM_PULSAR_BROKER_LIST);
+    streamConfigMap.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest");
+    streamConfigMap.put("stream.pulsar.consumer.factory.class.name", PulsarConsumerFactory.class.getName());
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
+        "1000");
+    streamConfigMap.put("stream.pulsar.decoder.class.name", "decoderClass");
+    return streamConfigMap;
+  }
+
+  @Test
+  public void testParsingMetadataConfigWithConfiguredFields() throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE),
+        "true");
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.METADATA_FIELDS),
+        "messageId,messageIdBytes, publishTime, eventTime, key, topicName, ");
+    StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
+    PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, "testId");
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        pulsarConfig.getMetadataFields();
+    Assert.assertEquals(metadataFieldsToExtract.size(), 6);
+    Assert.assertTrue(metadataFieldsToExtract.containsAll(List.of(
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.TOPIC_NAME)));
+  }
+
+  @Test
+  public void testParsingMetadataConfigWithoutConfiguredFields() throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE),
+        "true");
+    StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
+    PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, "testId");
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        pulsarConfig.getMetadataFields();
+    Assert.assertEquals(metadataFieldsToExtract.size(), 0);
+  }
+
+  @Test
+  public void testParsingNoMetadataConfig() throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE),
+        "false");
+    StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
+    PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, "testId");
+    Assert.assertFalse(pulsarConfig.isPopulateMetadata());
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        pulsarConfig.getMetadataFields();
+    Assert.assertEquals(metadataFieldsToExtract.size(), 0);
+  }
+
+  @Test
+  public void testParsingNoMetadataConfigWithConfiguredFields() throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE),
+        "false");
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.METADATA_FIELDS),
+        "messageId,messageIdBytes, publishTime, eventTime, key, topicName, ");
+    StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
+    PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, "testId");
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        pulsarConfig.getMetadataFields();
+    Assert.assertFalse(pulsarConfig.isPopulateMetadata());
+    Assert.assertEquals(metadataFieldsToExtract.size(), 0);
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 4779e6afcd..9d59f82fcc 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -281,34 +281,35 @@ public class PulsarConsumerTest {
 
     int totalMessagesReceived = 0;
 
-    final PartitionGroupConsumer consumer =
-        streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
-    final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
+    final PulsarPartitionLevelConsumer consumer =
+        (PulsarPartitionLevelConsumer) streamConsumerFactory
+            .createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
+    final PulsarMessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
         new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), CONSUMER_FETCH_TIMEOUT_MILLIS);
     Assert.assertEquals(messageBatch1.getMessageCount(), 500);
     for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
-      final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
+      final byte[] msg = messageBatch1.getMessageAtIndex(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + i);
       totalMessagesReceived++;
     }
 
-    final MessageBatch messageBatch2 =
+    final PulsarMessageBatch messageBatch2 =
         consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), null,
             CONSUMER_FETCH_TIMEOUT_MILLIS);
     Assert.assertEquals(messageBatch2.getMessageCount(), 500);
     for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
-      final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
+      final byte[] msg = messageBatch2.getMessageAtIndex(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
       totalMessagesReceived++;
     }
 
-    final MessageBatch messageBatch3 =
+    final PulsarMessageBatch messageBatch3 =
         consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 10)),
             new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)),
             CONSUMER_FETCH_TIMEOUT_MILLIS);
     Assert.assertEquals(messageBatch3.getMessageCount(), 25);
     for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
-      final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
+      final byte[] msg = messageBatch3.getMessageAtIndex(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
     }
 
@@ -333,36 +334,37 @@ public class PulsarConsumerTest {
 
     int totalMessagesReceived = 0;
 
-    final PartitionGroupConsumer consumer =
-        streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
+    final PulsarPartitionLevelConsumer consumer =
+        (PulsarPartitionLevelConsumer) streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID,
+            partitionGroupConsumptionStatus);
     //TODO: This test failed, check it out.
-    final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
+    final PulsarMessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
         new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)),
         CONSUMER_FETCH_TIMEOUT_MILLIS);
     Assert.assertEquals(messageBatch1.getMessageCount(), 500);
     for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
-      final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
+      final byte[] msg = messageBatch1.getMessageAtIndex(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + i);
       totalMessagesReceived++;
     }
 
-    final MessageBatch messageBatch2 =
+    final PulsarMessageBatch messageBatch2 =
         consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), null,
             CONSUMER_FETCH_TIMEOUT_MILLIS);
     Assert.assertEquals(messageBatch2.getMessageCount(), 500);
     for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
-      final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
+      final byte[] msg = messageBatch2.getMessageAtIndex(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
       totalMessagesReceived++;
     }
 
-    final MessageBatch messageBatch3 =
+    final PulsarMessageBatch messageBatch3 =
         consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 10)),
             new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)),
             CONSUMER_FETCH_TIMEOUT_MILLIS);
     Assert.assertEquals(messageBatch3.getMessageCount(), 25);
     for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
-      final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
+      final byte[] msg = messageBatch3.getMessageAtIndex(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
     }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
index 7cc0a99a6c..904dd33a04 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
@@ -20,10 +20,14 @@ package org.apache.pinot.plugin.stream.pulsar;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
@@ -37,30 +41,33 @@ public class PulsarMessageBatchTest {
   private DummyPulsarMessage _msgWithKeyAndValue;
   private byte[] _expectedValueBytes;
   private byte[] _expectedKeyBytes;
-  private List<Message<byte[]>> _messageList;
+  private List<DummyPulsarMessage> _messageList;
+  private PulsarMetadataExtractor _metadataExtractor;
 
-  class DummyPulsarMessage implements Message<byte[]> {
+  public static class DummyPulsarMessage implements Message<byte[]> {
     private final byte[] _keyData;
     private final byte[] _valueData;
+    private final Map<String, String> _properties;
 
     public DummyPulsarMessage(byte[] key, byte[] value) {
       _keyData = key;
       _valueData = value;
+      _properties = new HashMap<>();
     }
 
     @Override
     public Map<String, String> getProperties() {
-      return null;
+      return _properties;
     }
 
     @Override
     public boolean hasProperty(String name) {
-      return false;
+      return _properties.containsKey(name);
     }
 
     @Override
     public String getProperty(String name) {
-      return null;
+      return _properties.get(name);
     }
 
     @Override
@@ -80,7 +87,7 @@ public class PulsarMessageBatchTest {
 
     @Override
     public MessageId getMessageId() {
-      return null;
+      return MessageId.earliest;
     }
 
     @Override
@@ -110,7 +117,7 @@ public class PulsarMessageBatchTest {
 
     @Override
     public String getKey() {
-      return _keyData.toString();
+      return new String(_keyData, StandardCharsets.UTF_8);
     }
 
     @Override
@@ -196,20 +203,28 @@ public class PulsarMessageBatchTest {
     _random.nextBytes(_expectedKeyBytes);
     _msgWithKeyAndValue = new DummyPulsarMessage(_expectedKeyBytes, _expectedValueBytes);
     _messageList = new ArrayList<>();
+    _metadataExtractor = PulsarMetadataExtractor.build(true,
+        EnumSet.allOf(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.class));
     _messageList.add(_msgWithKeyAndValue);
   }
 
   @Test
   public void testMessageBatchNoStitching() {
-    PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, false);
-    byte[] valueBytes = messageBatch.getMessageAtIndex(0);
+    List<PulsarStreamMessage> streamMessages = _messageList.stream().map(message ->
+            PulsarUtils.buildPulsarStreamMessage(message, false, _metadataExtractor))
+        .collect(Collectors.toList());
+    PulsarMessageBatch messageBatch = new PulsarMessageBatch(streamMessages, false);
+    byte[] valueBytes = messageBatch.getMessageAtIndex(0).getValue();
     Assert.assertArrayEquals(_expectedValueBytes, valueBytes);
   }
 
   @Test
   public void testMessageBatchWithStitching() {
-    PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, true);
-    byte[] keyValueBytes = messageBatch.getMessageAtIndex(0);
+    List<PulsarStreamMessage> streamMessages = _messageList.stream().map(message ->
+            PulsarUtils.buildPulsarStreamMessage(message, true, _metadataExtractor))
+        .collect(Collectors.toList());
+    PulsarMessageBatch messageBatch = new PulsarMessageBatch(streamMessages, true);
+    byte[] keyValueBytes = messageBatch.getMessageAtIndex(0).getValue();
     Assert.assertEquals(keyValueBytes.length, 8 + _expectedKeyBytes.length + _expectedValueBytes.length);
     try {
       ByteBuffer byteBuffer = ByteBuffer.wrap(keyValueBytes);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
new file mode 100644
index 0000000000..4c8e28021a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Set;
+import org.apache.pulsar.client.api.MessageId;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.plugin.stream.pulsar.PulsarMessageBatchTest.DummyPulsarMessage;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY;
+import static org.testng.Assert.assertEquals;
+
+
+public class PulsarMetadataExtractorTest {
+
+  private PulsarMetadataExtractor _metadataExtractor;
+
+  @BeforeClass
+  public void setup() {
+    _metadataExtractor = PulsarMetadataExtractor.build(true, Set.of(MESSAGE_ID, MESSAGE_ID_BYTES_B64, MESSAGE_KEY));
+  }
+
+  @Test
+  public void testExtractProperty()
+      throws Exception {
+    DummyPulsarMessage pulsarMessage =
+        new DummyPulsarMessage("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
+    pulsarMessage.getProperties().put("test_key", "test_value");
+    pulsarMessage.getProperties().put("test_key2", "2");
+    PulsarStreamMessageMetadata metadata = (PulsarStreamMessageMetadata) _metadataExtractor.extract(pulsarMessage);
+    assertEquals(metadata.getHeaders().getValue("test_key"), "test_value");
+    assertEquals(metadata.getHeaders().getValue("test_key2"), "2");
+    assertEquals(metadata.getRecordMetadata().get(MESSAGE_KEY.getKey()), "key");
+    String messageIdStr = metadata.getRecordMetadata().get(MESSAGE_ID.getKey());
+    assertEquals(messageIdStr, pulsarMessage.getMessageId().toString());
+
+    byte[] messageIdBytes = Base64.getDecoder().decode(metadata.getRecordMetadata().get(MESSAGE_ID_BYTES_B64.getKey()));
+    MessageId messageId = MessageId.fromByteArray(messageIdBytes);
+    assertEquals(messageId, MessageId.earliest);
+  }
+
+  @Test
+  public void testPulsarStreamMessageUnstitched() {
+    String key = "key";
+    String value = "value";
+    DummyPulsarMessage dummyPulsarMessage =
+        new DummyPulsarMessage(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    PulsarStreamMessage streamMessage =
+        PulsarUtils.buildPulsarStreamMessage(dummyPulsarMessage, false, _metadataExtractor);
+    assertEquals(streamMessage.getKey(), key.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getValue(), value.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getKeyLength(), key.getBytes(StandardCharsets.UTF_8).length);
+    assertEquals(streamMessage.getValueLength(), value.getBytes(StandardCharsets.UTF_8).length);
+  }
+
+  @Test
+  public void testPulsarStreamMessageStitched() {
+    String key = "key";
+    String value = "value";
+    byte[] stitchedValueBytes =
+        PulsarUtils.stitchKeyValue(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    DummyPulsarMessage dummyPulsarMessage =
+        new DummyPulsarMessage(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    PulsarStreamMessage streamMessage =
+        PulsarUtils.buildPulsarStreamMessage(dummyPulsarMessage, true, _metadataExtractor);
+    assertEquals(streamMessage.getKey(), key.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getValue(), stitchedValueBytes);
+    assertEquals(streamMessage.getKeyLength(), key.getBytes(StandardCharsets.UTF_8).length);
+    assertEquals(streamMessage.getValueLength(), stitchedValueBytes.length);
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 97958b92d3..b570067a69 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -50,10 +50,11 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
         row.putValue(KEY, new String(message.getKey(), StandardCharsets.UTF_8));
       }
       RowMetadata metadata = message.getMetadata();
-      if (metadata != null && metadata.getHeaders() != null) {
-        metadata.getHeaders().getFieldToValueMap()
-            .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
-
+      if (metadata != null) {
+        if (metadata.getHeaders() != null) {
+          metadata.getHeaders().getFieldToValueMap()
+              .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
+        }
         metadata.getRecordMetadata()
             .forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org