This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new e07b576797 Handle shaded classes in all methods of kafka factory (#13087)
e07b576797 is described below

commit e07b576797e2dc834a60d8ee0da5d26a0b597e23
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Mon May 6 18:12:07 2024 +0530

    Handle shaded classes in all methods of kafka factory (#13087)

    * Handle shaded classes in all methods of kafka factory

    * handle most ridiculous maven shading error

    ---------

    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java | 4 +++-
 .../apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java | 8 +++++++-
 .../java/org/apache/pinot/spi/stream/StreamConsumerFactory.java | 2 +-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
index 820984439a..14bc6dcb06 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
@@ -35,11 +35,13 @@ public class KafkaConfigBackwardCompatibleUtils {
    */
   public static void handleStreamConfig(StreamConfig streamConfig) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+    //FIXME: This needs to be done because maven shade plugin also overwrites the constants in the classes
+    String prefixToReplace = KAFKA_COMMON_PACKAGE_PREFIX.replace(PINOT_SHADED_PACKAGE_PREFIX, "");
     for (Map.Entry<String, String> entry : streamConfigMap.entrySet()) {
       String[] valueParts = StringUtils.split(entry.getValue(), ' ');
       boolean updated = false;
       for (int i = 0; i < valueParts.length; i++) {
-        if (valueParts[i].startsWith(KAFKA_COMMON_PACKAGE_PREFIX)) {
+        if (valueParts[i].startsWith(prefixToReplace)) {
           try {
             Class.forName(valueParts[i]);
           } catch (ClassNotFoundException e1) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index 0ca63f856d..0e3bca3f47 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -20,12 +20,19 @@ package org.apache.pinot.plugin.stream.kafka20;
 
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
 public class KafkaConsumerFactory extends StreamConsumerFactory {
 
+  @Override
+  protected void init(StreamConfig streamConfig) {
+    KafkaConfigBackwardCompatibleUtils.handleStreamConfig(streamConfig);
+    super.init(streamConfig);
+  }
+
   @Override
   public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig, partition);
@@ -39,7 +46,6 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   @Override
   public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
-    KafkaConfigBackwardCompatibleUtils.handleStreamConfig(_streamConfig);
     return new KafkaPartitionLevelConsumer(clientId, _streamConfig,
         partitionGroupConsumptionStatus.getPartitionGroupId());
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 5729dc2814..812b7b8e0f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -31,7 +31,7 @@ public abstract class StreamConsumerFactory {
    * Initializes the stream consumer factory with the stream metadata for the table
    * @param streamConfig the stream config object from the table config
    */
-  void init(StreamConfig streamConfig) {
+  protected void init(StreamConfig streamConfig) {
     _streamConfig = streamConfig;
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org