This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1aa3ea893b Remove usage of deprecated KafkaJSONMessageDecoder (#16150) 1aa3ea893b is described below commit 1aa3ea893b6864d991066adc5d2b1dc51978269e Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Jun 18 19:29:45 2025 -0600 Remove usage of deprecated KafkaJSONMessageDecoder (#16150) --- .../Homepage/Operations/AddIngestionComponent.tsx | 2 +- .../Operations/AddRealTimeIngestionComponent.tsx | 2 +- .../Homepage/Operations/AddRealtimeTableOp.tsx | 2 +- .../inputformat/json/JSONMessageDecoderTest.java} | 71 ++++++++++++---------- .../src/test/resources/data/test_sample_data.json | 0 .../test_sample_data_schema_no_time_field.json | 0 ...sample_data_schema_with_outgoing_time_spec.json | 0 ...ple_data_schema_without_outgoing_time_spec.json | 0 .../pinot-kafka-2.0/README.md | 8 +-- .../stream/kafka/KafkaJSONMessageDecoder.java | 3 +- 10 files changed, 46 insertions(+), 42 deletions(-) diff --git a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx index 8621c1c152..1724b7fd32 100644 --- a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx +++ b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx @@ -89,7 +89,7 @@ export default function AddIngestionComponent({ "stream.kafka.broker.list": "", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", - "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", + "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.segment.rows": "0", "realtime.segment.flush.threshold.time": "24h", diff --git a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx index 0118f05748..196ef01eee 100644 --- a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx +++ b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx @@ -89,7 +89,7 @@ export default function AddRealTimeIngestionComponent({ "stream.kafka.broker.list": "", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", - "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", + "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.segment.rows": "0", "realtime.segment.flush.threshold.time": "24h", diff --git a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx index 8ddd36910d..e6c8c5c77d 100644 --- a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx +++ b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx @@ -102,7 +102,7 @@ const defaultTableObj = { "stream.kafka.broker.list": "", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.segment.rows": "0", "realtime.segment.flush.threshold.time": "24h", diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java similarity index 56% rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java rename to pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java index dc4810d611..5c2bda51b8 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java @@ -16,31 +16,34 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.stream.kafka; +package org.apache.pinot.plugin.inputformat.json; import com.fasterxml.jackson.databind.JsonNode; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; +import java.net.URL; import java.util.HashMap; import java.util.Map; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.JsonUtils; -import org.testng.Assert; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; -public class KafkaJSONMessageDecoderTest { + +public class JSONMessageDecoderTest { @Test public void testJsonDecoderWithoutOutgoingTimeSpec() throws Exception { - Schema schema = Schema.fromFile(new File( - getClass().getClassLoader().getResource("data/test_sample_data_schema_without_outgoing_time_spec.json") - .getFile())); - Map<String, FieldSpec.DataType> sourceFields = new HashMap<>(); + Schema schema = loadSchema("data/test_sample_data_schema_without_outgoing_time_spec.json"); + Map<String, DataType> sourceFields = new HashMap<>(); for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType()); } @@ -50,66 +53,70 @@ public class KafkaJSONMessageDecoderTest { @Test public void testJsonDecoderWithOutgoingTimeSpec() throws Exception { - Schema schema = Schema.fromFile(new File( - getClass().getClassLoader().getResource("data/test_sample_data_schema_with_outgoing_time_spec.json") - .getFile())); - Map<String, FieldSpec.DataType> sourceFields = new HashMap<>(); + Schema schema = loadSchema("data/test_sample_data_schema_with_outgoing_time_spec.json"); + Map<String, DataType> sourceFields = new HashMap<>(); for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType()); } sourceFields.remove("secondsSinceEpoch"); - sourceFields.put("time_day", FieldSpec.DataType.INT); + sourceFields.put("time_day", DataType.INT); testJsonDecoder(sourceFields); } @Test public void testJsonDecoderNoTimeSpec() throws Exception { - Schema schema = Schema.fromFile( - new File(getClass().getClassLoader().getResource("data/test_sample_data_schema_no_time_field.json").getFile())); - Map<String, FieldSpec.DataType> sourceFields = new HashMap<>(); + Schema schema = loadSchema("data/test_sample_data_schema_no_time_field.json"); + Map<String, DataType> sourceFields = new HashMap<>(); for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType()); } testJsonDecoder(sourceFields); } - private void testJsonDecoder(Map<String, FieldSpec.DataType> sourceFields) + private Schema loadSchema(String resourcePath) + throws Exception { + URL resource = getClass().getClassLoader().getResource(resourcePath); + assertNotNull(resource); + return Schema.fromFile(new File(resource.getFile())); + } + + private void testJsonDecoder(Map<String, DataType> sourceFields) throws Exception { - try (BufferedReader reader = new BufferedReader( - new FileReader(getClass().getClassLoader().getResource("data/test_sample_data.json").getFile()))) { - KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder(); - decoder.init(new HashMap<>(), sourceFields.keySet(), "testTopic"); - GenericRow r = new GenericRow(); - String line = reader.readLine(); - while (line != null) { + URL resource = getClass().getClassLoader().getResource("data/test_sample_data.json"); + assertNotNull(resource); + try (BufferedReader reader = new BufferedReader(new FileReader(resource.getFile()))) { + JSONMessageDecoder decoder = new JSONMessageDecoder(); + decoder.init(Map.of(), sourceFields.keySet(), "testTopic"); + GenericRow row = new GenericRow(); + String line; + while ((line = reader.readLine()) != null) { JsonNode jsonNode = JsonUtils.DEFAULT_READER.readTree(line); - decoder.decode(line.getBytes(), r); + decoder.decode(line.getBytes(), row); for (String field : sourceFields.keySet()) { - Object actualValue = r.getValue(field); + Object actualValue = row.getValue(field); JsonNode expectedValue = jsonNode.get(field); switch (sourceFields.get(field)) { case STRING: - Assert.assertEquals(actualValue, expectedValue.asText()); + assertEquals(actualValue, expectedValue.asText()); break; case INT: - Assert.assertEquals(actualValue, expectedValue.asInt()); + assertEquals(actualValue, expectedValue.asInt()); break; case LONG: - Assert.assertEquals(actualValue, expectedValue.asLong()); + assertEquals(actualValue, expectedValue.asLong()); break; case FLOAT: - Assert.assertEquals(actualValue, (float) expectedValue.asDouble()); + assertEquals(actualValue, (float) expectedValue.asDouble()); break; case DOUBLE: - Assert.assertEquals(actualValue, expectedValue.asDouble()); + assertEquals(actualValue, expectedValue.asDouble()); break; default: - Assert.assertTrue(false, "Shouldn't arrive here."); + fail("Shouldn't arrive here."); break; } } - line = reader.readLine(); } } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data.json b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data.json similarity index 100% rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data.json rename to pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data.json diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_no_time_field.json b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_no_time_field.json similarity index 100% rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_no_time_field.json rename to pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_no_time_field.json diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json similarity index 100% rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json rename to pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json similarity index 100% rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json rename to pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md index 9016804697..e32826700f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md @@ -30,15 +30,13 @@ A stream plugin for another version of kafka, or another stream, can be added in ``` * How to use Kafka 2.x connector -Below is a sample `streamConfigs` used to create a realtime table with Kafka Stream(High) level consumer: +Below is a sample `streamConfigs` used to create a real-time table with Kafka consumer: ```$xslt "streamConfigs": { "streamType": "kafka", + "stream.kafka.broker.list": "localhost:19092", "stream.kafka.topic.name": "meetupRSVPEvents", - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", - "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", - "stream.kafka.zk.broker.url": "localhost:2191/kafka", - "stream.kafka.hlc.bootstrap.server": "localhost:19092" + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder", } ``` diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java index cf12fcc9cc..d3d55572cd 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java @@ -22,8 +22,7 @@ import org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder; /** - * This class has been kept for backward compatability. Use @see `org.apache.pinot.plugin.inputformat.json - * .StreamJSONMessageDecoder` for future use cases. + * This class has been kept for backward compatibility. Use {@link JSONMessageDecoder} for future use cases. * This class will be removed in a later release. */ @Deprecated --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org