This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 37c618445a Remove JsonUtils dependency due to classpath issues (#11264) 37c618445a is described below commit 37c618445a5ea6344ec388950ae68eec1a585238 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Fri Aug 4 02:00:56 2023 +0530 Remove JsonUtils dependency due to classpath issues (#11264) Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local> --- .../kinesis/KinesisPartitionGroupOffset.java | 23 ++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java index 4ee247f365..12af4765fd 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java @@ -20,12 +20,13 @@ package org.apache.pinot.plugin.stream.kinesis; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.Map; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; -import org.apache.pinot.spi.utils.JsonUtils; - /** * A {@link StreamPartitionMsgOffset} implementation for the Kinesis partition group consumption @@ -40,6 +41,10 @@ import org.apache.pinot.spi.utils.JsonUtils; * The longer the time period between write requests, the larger the sequence numbers become. */ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset { + private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper(); + public static final ObjectReader DEFAULT_READER = DEFAULT_MAPPER.reader(); + public static final ObjectWriter DEFAULT_WRITER = DEFAULT_MAPPER.writer(); + private final Map<String, String> _shardToStartSequenceMap; public KinesisPartitionGroupOffset(Map<String, String> shardToStartSequenceMap) { @@ -48,7 +53,7 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset { public KinesisPartitionGroupOffset(String offsetStr) throws IOException { - _shardToStartSequenceMap = JsonUtils.stringToObject(offsetStr, new TypeReference<Map<String, String>>() { + _shardToStartSequenceMap = stringToObject(offsetStr, new TypeReference<Map<String, String>>() { }); } @@ -59,7 +64,7 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset { @Override public String toString() { try { - return JsonUtils.objectToString(_shardToStartSequenceMap); + return objectToString(_shardToStartSequenceMap); } catch (JsonProcessingException e) { throw new IllegalStateException( "Caught exception when converting KinesisCheckpoint to string: " + _shardToStartSequenceMap); @@ -91,4 +96,14 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset { return _shardToStartSequenceMap.values().iterator().next() .compareTo(other._shardToStartSequenceMap.values().iterator().next()); } + + public static <T> T stringToObject(String jsonString, TypeReference<T> valueTypeRef) + throws IOException { + return DEFAULT_READER.forType(valueTypeRef).readValue(jsonString); + } + + public static String objectToString(Object object) + throws JsonProcessingException { + return DEFAULT_WRITER.writeValueAsString(object); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org