This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e544877e23 Support unnest on a string typed json column (#15663)
e544877e23 is described below

commit e544877e2317f63cc52614f1eb45be08b0fdcaec
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Tue Apr 29 00:54:50 2025 -0700

    Support unnest on a string typed json column (#15663)
---
 .../common/function/scalar/JsonFunctions.java      |  48 +++++++
 .../pinot/common/function/JsonFunctionsTest.java   |  19 +++
 .../local/segment/creator/TransformPipeline.java   |  14 ++-
 .../org/apache/pinot/tools/QuickStartBase.java     |   1 +
 .../batch/testUnnest/ingestionJobSpec.yaml         | 140 +++++++++++++++++++++
 .../batch/testUnnest/rawdata/output.parquet        | Bin 0 -> 1253 bytes
 .../testUnnest_offline_table_config.json           |  61 +++++++++
 .../batch/testUnnest/testUnnest_schema.json        |  25 ++++
 8 files changed, 304 insertions(+), 4 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/JsonFunctions.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/JsonFunctions.java
index 6e93b11e73..ee446b27c8 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/JsonFunctions.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/JsonFunctions.java
@@ -267,6 +267,54 @@ public class JsonFunctions {
     return result;
   }
 
+  @Nullable
+  @ScalarFunction
+  public static List jsonStringToArray(String jsonString) {
+    String json = jsonString.trim();
+    try {
+      if (json.startsWith("[")) {
+        // JSON Array
+        return JsonUtils.stringToObject(json, List.class);
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    return null;
+  }
+
+  @Nullable
+  @ScalarFunction
+  public static Map jsonStringToMap(String jsonString) {
+    String json = jsonString.trim();
+    try {
+      if (json.startsWith("{")) {
+        return JsonUtils.stringToObject(json, Map.class);
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    return null;
+  }
+
+  @Nullable
+  @ScalarFunction
+  public static Object jsonStringToListOrMap(String jsonString) {
+    String json = jsonString.trim();
+    try {
+      if (json.startsWith("[")) {
+        // JSON Array
+        return JsonUtils.stringToObject(json, List.class);
+      }
+      if (json.startsWith("{")) {
+        // JSON Object
+        return JsonUtils.stringToObject(json, Map.class);
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    return null;
+  }
+
   private static void setValuesToMap(String keyColumnName, String 
valueColumnName, Object obj,
       Map<String, String> result) {
     if (obj instanceof Map) {
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/function/JsonFunctionsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/function/JsonFunctionsTest.java
index c374cfbf1f..9e77293d2c 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/function/JsonFunctionsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/function/JsonFunctionsTest.java
@@ -432,4 +432,23 @@ public class JsonFunctionsTest {
     );
     assertEquals(JsonFunctions.jsonKeyValueArrayToMap(jsonList), expected);
   }
+
+  @Test
+  public void testJsonStringToCollection() {
+    String jsonArrayString = "[{\"k1\":\"v1\"}, {\"k2\":\"v2\"}, 
{\"k3\":\"v3\"}, {\"k4\":\"v4\"}, {\"k5\":\"v5\"}]";
+    List<Map<String, String>> expectedArray =
+        List.of(Map.of("k1", "v1"), Map.of("k2", "v2"), Map.of("k3", "v3"), 
Map.of("k4", "v4"), Map.of("k5", "v5"));
+    assertEquals(JsonFunctions.jsonStringToArray(jsonArrayString), 
expectedArray);
+    assertEquals(JsonFunctions.jsonStringToListOrMap(jsonArrayString), 
expectedArray);
+
+    String jsonMapString = "{\"k1\":\"v1\", \"k2\":\"v2\", \"k3\":\"v3\", 
\"k4\":\"v4\",\"k5\":\"v5\"}";
+    Map<String, String> expectedMap =
+        Map.of("k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4", "k5", "v5");
+    assertEquals(JsonFunctions.jsonStringToMap(jsonMapString), expectedMap);
+    assertEquals(JsonFunctions.jsonStringToListOrMap(jsonMapString), 
expectedMap);
+
+    String invalidJson = "[\"k1\":\"v1\"}";
+    assertEquals(JsonFunctions.jsonStringToMap(invalidJson), null);
+    assertEquals(JsonFunctions.jsonStringToListOrMap(invalidJson), null);
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
index c9d9abc316..13fc3aff55 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
@@ -80,13 +80,19 @@ public class TransformPipeline {
   }
 
   public Collection<String> getInputColumns() {
-    if (_complexTypeTransformer == null) {
+    if (_preComplexTypeTransformers == null && _complexTypeTransformer == 
null) {
       return _recordTransformer.getInputColumns();
-    } else {
-      Set<String> inputColumns = new 
HashSet<>(_recordTransformer.getInputColumns());
+    }
+    Set<String> inputColumns = new 
HashSet<>(_recordTransformer.getInputColumns());
+    if (_preComplexTypeTransformers != null) {
+      for (RecordTransformer preComplexTypeTransformer : 
_preComplexTypeTransformers) {
+        inputColumns.addAll(preComplexTypeTransformer.getInputColumns());
+      }
+    }
+    if (_complexTypeTransformer != null) {
       inputColumns.addAll(_complexTypeTransformer.getInputColumns());
-      return inputColumns;
     }
+    return inputColumns;
   }
 
   /**
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index 48c4fdac0c..669763fd8f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -79,6 +79,7 @@ public abstract class QuickStartBase {
       "examples/batch/githubComplexTypeEvents",
       "examples/batch/billing",
       "examples/batch/fineFoodReviews",
+      "examples/batch/testUnnest",
   };
 
   protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES 
= ImmutableMap.<String, String>builder()
diff --git 
a/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml
 
b/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml
new file mode 100644
index 0000000000..5817f26cec
--- /dev/null
+++ 
b/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# executionFrameworkSpec: Defines the execution framework used to run 
ingestion jobs.
+executionFrameworkSpec:
+
+  # name: execution framework name
+  name: 'standalone'
+
+  # Class to use for segment generation and different push types.
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+  segmentMetadataPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
+
+
+# jobType: Pinot ingestion job type.
+# Supported job types are defined in PinotIngestionJobType class.
+#   'SegmentCreation'
+#   'SegmentTarPush'
+#   'SegmentUriPush'
+#   'SegmentMetadataPush'
+#   'SegmentCreationAndTarPush'
+#   'SegmentCreationAndUriPush'
+#   'SegmentCreationAndMetadataPush'
+jobType: SegmentCreationAndTarPush
+
+# inputDirURI: Root directory of input data, expected to have scheme 
configured in PinotFS.
+inputDirURI: 'examples/batch/testUnnest/rawdata'
+
+# includeFileNamePattern: include file name pattern, supported glob pattern.
+# Sample usage:
+#   'glob:*.avro' will include all avro files just under the inputDirURI, not 
sub directories;
+#   'glob:**/*.avro' will include all the avro files under inputDirURI 
recursively.
+includeFileNamePattern: 'glob:**/*.parquet'
+
+# excludeFileNamePattern: exclude file name pattern, supported glob pattern.
+# Sample usage:
+#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not 
sub directories;
+#   'glob:**/*.avro' will exclude all the avro files under inputDirURI 
recursively.
+# _excludeFileNamePattern: ''
+
+# outputDirURI: Root directory of output segments, expected to have scheme 
configured in PinotFS.
+outputDirURI: 'examples/batch/testUnnest/segments'
+
+# overwriteOutput: Overwrite output segments if existed.
+overwriteOutput: true
+
+# pinotFSSpecs: defines all related Pinot file systems.
+pinotFSSpecs:
+
+  - # scheme: used to identify a PinotFS.
+    # E.g. local, hdfs, dbfs, etc
+    scheme: file
+
+    # className: Class name used to create the PinotFS instance.
+    # E.g.
+    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local 
filesystem
+    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data 
Lake
+    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+
+# recordReaderSpec: defines all record reader
+recordReaderSpec:
+
+  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 
'json', 'thrift' etc.
+  dataFormat: 'parquet'
+
+  # className: Corresponding RecordReader class name.
+  # E.g.
+  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
+  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
+  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
+  #   org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader
+  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
+  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
+  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
+  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
+
+  # configClassName: Corresponding RecordReaderConfig class name, it's 
mandatory for CSV and Thrift file format.
+  # E.g.
+  #    org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig
+  #    org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig
+  #configClassName: 
'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
+
+  # configs: Used to init RecordReaderConfig class name, this config is 
required for CSV and Thrift data format.
+  configs:
+
+
+# tableSpec: defines table name and where to fetch corresponding table config 
and table schema.
+tableSpec:
+
+  # tableName: Table name
+  tableName: 'testUnnest'
+
+  # schemaURI: defines where to read the table schema, supports PinotFS or 
HTTP.
+  # E.g.
+  #   hdfs://path/to/table_schema.json
+  #   http://localhost:9000/tables/myTable/schema
+  schemaURI: 'http://localhost:9000/tables/testUnnest/schema'
+
+  # tableConfigURI: defines where to read the table config.
+  # Supports using PinotFS or HTTP.
+  # E.g.
+  #   hdfs://path/to/table_config.json
+  #   http://localhost:9000/tables/myTable
+  # Note that the API to read Pinot table config directly from pinot 
controller contains a JSON wrapper.
+  # The real table config is the object under the field 'OFFLINE'.
+  tableConfigURI: 'http://localhost:9000/tables/testUnnest'
+
+# pinotClusterSpecs: defines the Pinot Cluster Access Point.
+pinotClusterSpecs:
+  - # controllerURI: used to fetch table/schema information and data push.
+    # E.g. http://localhost:9000
+    controllerURI: 'http://localhost:9000'
+
+# pushJobSpec: defines segment push job related configuration.
+pushJobSpec:
+
+  # pushAttempts: number of attempts for push job, default is 1, which means 
no retry.
+  pushAttempts: 2
+
+  # pushRetryIntervalMillis: wait time between retries in milliseconds, 
defaults to 1 second.
+  pushRetryIntervalMillis: 1000
diff --git 
a/pinot-tools/src/main/resources/examples/batch/testUnnest/rawdata/output.parquet
 
b/pinot-tools/src/main/resources/examples/batch/testUnnest/rawdata/output.parquet
new file mode 100644
index 0000000000..2c7c43799c
Binary files /dev/null and 
b/pinot-tools/src/main/resources/examples/batch/testUnnest/rawdata/output.parquet
 differ
diff --git 
a/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_offline_table_config.json
 
b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_offline_table_config.json
new file mode 100644
index 0000000000..ef765fbb8c
--- /dev/null
+++ 
b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_offline_table_config.json
@@ -0,0 +1,61 @@
+{
+  "tableName": "testUnnest_OFFLINE",
+  "tableType": "OFFLINE",
+  "segmentsConfig": {
+    "deletedSegmentsRetentionPeriod": "0d",
+    "segmentPushType": "APPEND",
+    "timeColumnName": "event_time",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "180",
+    "minimizeDataMovement": false,
+    "replication": "1"
+  },
+  "tenants": {
+    "broker": "DefaultTenant",
+    "server": "DefaultTenant"
+  },
+  "tableIndexConfig": {
+    "aggregateMetrics": false,
+    "optimizeDictionary": false,
+    "autoGeneratedInvertedIndex": false,
+    "enableDefaultStarTree": false,
+    "nullHandlingEnabled": true,
+    "skipSegmentPreprocess": false,
+    "optimizeDictionaryType": false,
+    "enableDynamicStarTreeCreation": false,
+    "columnMajorSegmentBuilderEnabled": true,
+    "createInvertedIndexDuringSegmentGeneration": true,
+    "optimizeDictionaryForMetrics": false,
+    "noDictionarySizeRatioThreshold": 0,
+    "loadMode": "MMAP",
+    "rangeIndexVersion": 2,
+    "invertedIndexColumns": [
+      "key"
+    ],
+    "varLengthDictionaryColumns": [
+      "key"
+    ]
+  },
+  "metadata": {},
+  "ingestionConfig": {
+    "transformConfigs": [],
+    "enrichmentConfigs": [
+      {
+        "enricherType": "generateColumn",
+        "properties": 
{"fieldToFunctionMap":{"recordArray":"jsonStringToListOrMap(data_for_unnesting)"}},
+        "preComplexTypeTransform": true
+      }
+    ],
+    "continueOnError": true,
+    "rowTimeValueCheck": true,
+    "complexTypeConfig": {
+      "fieldsToUnnest": [
+        "recordArray"
+      ],
+      "delimiter": "||"
+    },
+    "retryOnSegmentBuildPrecheckFailure": false,
+    "segmentTimeValueCheck": false
+  },
+  "isDimTable": false
+}
diff --git 
a/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json
 
b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json
new file mode 100644
index 0000000000..9e6163fe81
--- /dev/null
+++ 
b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json
@@ -0,0 +1,25 @@
+{
+  "schemaName": "testUnnest",
+  "enableColumnBasedNullHandling": true,
+  "dimensionFieldSpecs": [
+    {
+      "name": "key",
+      "dataType": "STRING",
+      "fieldType": "DIMENSION"
+    },
+    {
+      "name": "recordArray||name",
+      "dataType": "STRING",
+      "fieldType": "DIMENSION"
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "event_time",
+      "dataType": "LONG",
+      "fieldType": "DATE_TIME",
+      "format": "EPOCH|MILLISECONDS|1",
+      "granularity": "MILLISECONDS|1"
+    }
+  ]
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to