This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 37c618445a Remove JsonUtils dependency due to classpath issues (#11264)
37c618445a is described below

commit 37c618445a5ea6344ec388950ae68eec1a585238
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Fri Aug 4 02:00:56 2023 +0530

    Remove JsonUtils dependency due to classpath issues (#11264)
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../kinesis/KinesisPartitionGroupOffset.java       | 23 ++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java
index 4ee247f365..12af4765fd 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java
@@ -20,12 +20,13 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.utils.JsonUtils;
-
 
 /**
  * A {@link StreamPartitionMsgOffset} implementation for the Kinesis partition group consumption
@@ -40,6 +41,10 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * The longer the time period between write requests, the larger the sequence numbers become.
  */
 public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
+  private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
+  public static final ObjectReader DEFAULT_READER = DEFAULT_MAPPER.reader();
+  public static final ObjectWriter DEFAULT_WRITER = DEFAULT_MAPPER.writer();
+
   private final Map<String, String> _shardToStartSequenceMap;
 
   public KinesisPartitionGroupOffset(Map<String, String> 
shardToStartSequenceMap) {
@@ -48,7 +53,7 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
 
   public KinesisPartitionGroupOffset(String offsetStr)
       throws IOException {
-    _shardToStartSequenceMap = JsonUtils.stringToObject(offsetStr, new 
TypeReference<Map<String, String>>() {
+    _shardToStartSequenceMap = stringToObject(offsetStr, new 
TypeReference<Map<String, String>>() {
     });
   }
 
@@ -59,7 +64,7 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
   @Override
   public String toString() {
     try {
-      return JsonUtils.objectToString(_shardToStartSequenceMap);
+      return objectToString(_shardToStartSequenceMap);
     } catch (JsonProcessingException e) {
       throw new IllegalStateException(
           "Caught exception when converting KinesisCheckpoint to string: " + 
_shardToStartSequenceMap);
@@ -91,4 +96,14 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
     return _shardToStartSequenceMap.values().iterator().next()
         .compareTo(other._shardToStartSequenceMap.values().iterator().next());
   }
+
+  public static <T> T stringToObject(String jsonString, TypeReference<T> 
valueTypeRef)
+      throws IOException {
+    return DEFAULT_READER.forType(valueTypeRef).readValue(jsonString);
+  }
+
+  public static String objectToString(Object object)
+      throws JsonProcessingException {
+    return DEFAULT_WRITER.writeValueAsString(object);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to