This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aa3ea893b Remove usage of deprecated KafkaJSONMessageDecoder (#16150)
1aa3ea893b is described below

commit 1aa3ea893b6864d991066adc5d2b1dc51978269e
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Jun 18 19:29:45 2025 -0600

    Remove usage of deprecated KafkaJSONMessageDecoder (#16150)
---
 .../Homepage/Operations/AddIngestionComponent.tsx  |  2 +-
 .../Operations/AddRealTimeIngestionComponent.tsx   |  2 +-
 .../Homepage/Operations/AddRealtimeTableOp.tsx     |  2 +-
 .../inputformat/json/JSONMessageDecoderTest.java}  | 71 ++++++++++++----------
 .../src/test/resources/data/test_sample_data.json  |  0
 .../test_sample_data_schema_no_time_field.json     |  0
 ...sample_data_schema_with_outgoing_time_spec.json |  0
 ...ple_data_schema_without_outgoing_time_spec.json |  0
 .../pinot-kafka-2.0/README.md                      |  8 +--
 .../stream/kafka/KafkaJSONMessageDecoder.java      |  3 +-
 10 files changed, 46 insertions(+), 42 deletions(-)

diff --git 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
index 8621c1c152..1724b7fd32 100644
--- 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
+++ 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
@@ -89,7 +89,7 @@ export default function AddIngestionComponent({
             "stream.kafka.broker.list": "",
             "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
             
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
-            
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+            
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
             "realtime.segment.flush.threshold.rows": "0",
             "realtime.segment.flush.threshold.segment.rows": "0",
             "realtime.segment.flush.threshold.time": "24h",
diff --git 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
index 0118f05748..196ef01eee 100644
--- 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
+++ 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
@@ -89,7 +89,7 @@ export default function AddRealTimeIngestionComponent({
             "stream.kafka.broker.list": "",
             "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
             
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
-            
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+            
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
             "realtime.segment.flush.threshold.rows": "0",
             "realtime.segment.flush.threshold.segment.rows": "0",
             "realtime.segment.flush.threshold.time": "24h",
diff --git 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
index 8ddd36910d..e6c8c5c77d 100644
--- 
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
+++ 
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
@@ -102,7 +102,7 @@ const defaultTableObj = {
       "stream.kafka.broker.list": "",
       "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
       "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
-      "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+      "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
       "realtime.segment.flush.threshold.rows": "0",
       "realtime.segment.flush.threshold.segment.rows": "0",
       "realtime.segment.flush.threshold.time": "24h",
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java
similarity index 56%
rename from 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
rename to 
pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java
index dc4810d611..5c2bda51b8 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java
@@ -16,31 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.stream.kafka;
+package org.apache.pinot.plugin.inputformat.json;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 
-public class KafkaJSONMessageDecoderTest {
+
+public class JSONMessageDecoderTest {
 
   @Test
   public void testJsonDecoderWithoutOutgoingTimeSpec()
       throws Exception {
-    Schema schema = Schema.fromFile(new File(
-        
getClass().getClassLoader().getResource("data/test_sample_data_schema_without_outgoing_time_spec.json")
-            .getFile()));
-    Map<String, FieldSpec.DataType> sourceFields = new HashMap<>();
+    Schema schema = 
loadSchema("data/test_sample_data_schema_without_outgoing_time_spec.json");
+    Map<String, DataType> sourceFields = new HashMap<>();
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType());
     }
@@ -50,66 +53,70 @@ public class KafkaJSONMessageDecoderTest {
   @Test
   public void testJsonDecoderWithOutgoingTimeSpec()
       throws Exception {
-    Schema schema = Schema.fromFile(new File(
-        
getClass().getClassLoader().getResource("data/test_sample_data_schema_with_outgoing_time_spec.json")
-            .getFile()));
-    Map<String, FieldSpec.DataType> sourceFields = new HashMap<>();
+    Schema schema = 
loadSchema("data/test_sample_data_schema_with_outgoing_time_spec.json");
+    Map<String, DataType> sourceFields = new HashMap<>();
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType());
     }
     sourceFields.remove("secondsSinceEpoch");
-    sourceFields.put("time_day", FieldSpec.DataType.INT);
+    sourceFields.put("time_day", DataType.INT);
     testJsonDecoder(sourceFields);
   }
 
   @Test
   public void testJsonDecoderNoTimeSpec()
       throws Exception {
-    Schema schema = Schema.fromFile(
-        new 
File(getClass().getClassLoader().getResource("data/test_sample_data_schema_no_time_field.json").getFile()));
-    Map<String, FieldSpec.DataType> sourceFields = new HashMap<>();
+    Schema schema = 
loadSchema("data/test_sample_data_schema_no_time_field.json");
+    Map<String, DataType> sourceFields = new HashMap<>();
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType());
     }
     testJsonDecoder(sourceFields);
   }
 
-  private void testJsonDecoder(Map<String, FieldSpec.DataType> sourceFields)
+  private Schema loadSchema(String resourcePath)
+      throws Exception {
+    URL resource = getClass().getClassLoader().getResource(resourcePath);
+    assertNotNull(resource);
+    return Schema.fromFile(new File(resource.getFile()));
+  }
+
+  private void testJsonDecoder(Map<String, DataType> sourceFields)
       throws Exception {
-    try (BufferedReader reader = new BufferedReader(
-        new 
FileReader(getClass().getClassLoader().getResource("data/test_sample_data.json").getFile())))
 {
-      KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
-      decoder.init(new HashMap<>(), sourceFields.keySet(), "testTopic");
-      GenericRow r = new GenericRow();
-      String line = reader.readLine();
-      while (line != null) {
+    URL resource = 
getClass().getClassLoader().getResource("data/test_sample_data.json");
+    assertNotNull(resource);
+    try (BufferedReader reader = new BufferedReader(new 
FileReader(resource.getFile()))) {
+      JSONMessageDecoder decoder = new JSONMessageDecoder();
+      decoder.init(Map.of(), sourceFields.keySet(), "testTopic");
+      GenericRow row = new GenericRow();
+      String line;
+      while ((line = reader.readLine()) != null) {
         JsonNode jsonNode = JsonUtils.DEFAULT_READER.readTree(line);
-        decoder.decode(line.getBytes(), r);
+        decoder.decode(line.getBytes(), row);
         for (String field : sourceFields.keySet()) {
-          Object actualValue = r.getValue(field);
+          Object actualValue = row.getValue(field);
           JsonNode expectedValue = jsonNode.get(field);
           switch (sourceFields.get(field)) {
             case STRING:
-              Assert.assertEquals(actualValue, expectedValue.asText());
+              assertEquals(actualValue, expectedValue.asText());
               break;
             case INT:
-              Assert.assertEquals(actualValue, expectedValue.asInt());
+              assertEquals(actualValue, expectedValue.asInt());
               break;
             case LONG:
-              Assert.assertEquals(actualValue, expectedValue.asLong());
+              assertEquals(actualValue, expectedValue.asLong());
               break;
             case FLOAT:
-              Assert.assertEquals(actualValue, (float) 
expectedValue.asDouble());
+              assertEquals(actualValue, (float) expectedValue.asDouble());
               break;
             case DOUBLE:
-              Assert.assertEquals(actualValue, expectedValue.asDouble());
+              assertEquals(actualValue, expectedValue.asDouble());
               break;
             default:
-              Assert.assertTrue(false, "Shouldn't arrive here.");
+              fail("Shouldn't arrive here.");
               break;
           }
         }
-        line = reader.readLine();
       }
     }
   }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data.json
 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data.json
similarity index 100%
rename from 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data.json
rename to 
pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data.json
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_no_time_field.json
 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_no_time_field.json
similarity index 100%
rename from 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_no_time_field.json
rename to 
pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_no_time_field.json
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json
 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json
similarity index 100%
rename from 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json
rename to 
pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json
 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json
similarity index 100%
rename from 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json
rename to 
pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md
index 9016804697..e32826700f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md
@@ -30,15 +30,13 @@ A stream plugin for another version of kafka, or another 
stream, can be added in
 ```
 
 * How to use Kafka 2.x connector
-Below is a sample `streamConfigs` used to create a realtime table with Kafka 
Stream(High) level consumer:
+Below is a sample `streamConfigs` used to create a real-time table with Kafka 
consumer:
 ```$xslt
 "streamConfigs": {
   "streamType": "kafka",
+  "stream.kafka.broker.list": "localhost:19092",
   "stream.kafka.topic.name": "meetupRSVPEvents",
-  "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
-  "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
   "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
-  "stream.kafka.zk.broker.url": "localhost:2191/kafka",
-  "stream.kafka.hlc.bootstrap.server": "localhost:19092"
+  "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
 }
 ```
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
index cf12fcc9cc..d3d55572cd 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
@@ -22,8 +22,7 @@ import 
org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder;
 
 
 /**
- * This class has been kept for backward compatability. Use @see 
`org.apache.pinot.plugin.inputformat.json
- * .StreamJSONMessageDecoder` for future use cases.
+ * This class has been kept for backward compatibility. Use {@link 
JSONMessageDecoder} for future use cases.
  * This class will be removed in a later release.
  */
 @Deprecated


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to