This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new e544877e23  Support unnest on a string typed json column (#15663)
e544877e23 is described below

commit e544877e2317f63cc52614f1eb45be08b0fdcaec
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Tue Apr 29 00:54:50 2025 -0700

    Support unnest on a string typed json column (#15663)
---
 .../common/function/scalar/JsonFunctions.java      |  48 ++++++++
 .../pinot/common/function/JsonFunctionsTest.java   |  19 +++
 .../local/segment/creator/TransformPipeline.java   |  14 ++-
 .../org/apache/pinot/tools/QuickStartBase.java     |   1 +
 .../batch/testUnnest/ingestionJobSpec.yaml         | 140 +++++++++++++++++++++
 .../batch/testUnnest/rawdata/output.parquet        | Bin 0 -> 1253 bytes
 .../testUnnest_offline_table_config.json           |  61 +++++++++
 .../batch/testUnnest/testUnnest_schema.json        |  25 ++++
 8 files changed, 304 insertions(+), 4 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/JsonFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/JsonFunctions.java
index 6e93b11e73..ee446b27c8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/JsonFunctions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/JsonFunctions.java
@@ -267,6 +267,54 @@ public class JsonFunctions {
     return result;
   }
 
+  @Nullable
+  @ScalarFunction
+  public static List jsonStringToArray(String jsonString) {
+    String json = jsonString.trim();
+    try {
+      if (json.startsWith("[")) {
+        // JSON Array
+        return JsonUtils.stringToObject(json, List.class);
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    return null;
+  }
+
+  @Nullable
+  @ScalarFunction
+  public static Map jsonStringToMap(String jsonString) {
+    String json = jsonString.trim();
+    try {
+      if (json.startsWith("{")) {
+        return JsonUtils.stringToObject(json, Map.class);
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    return null;
+  }
+
+  @Nullable
+  @ScalarFunction
+  public static Object jsonStringToListOrMap(String jsonString) {
+    String json = jsonString.trim();
+    try {
+      if (json.startsWith("[")) {
+        // JSON Array
+        return JsonUtils.stringToObject(json, List.class);
+      }
+      if (json.startsWith("{")) {
+        // JSON Object
+        return JsonUtils.stringToObject(json, Map.class);
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    return null;
+  }
+
   private static void setValuesToMap(String keyColumnName, String valueColumnName, Object obj,
       Map<String, String> result) {
     if (obj instanceof Map) {
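
For readers skimming the diff: a minimal standalone sketch of how the three new
scalar functions behave (illustrative only, not part of this commit; the demo
class name is made up, but the JsonFunctions calls are the ones added above):

    import java.util.List;
    import java.util.Map;
    import org.apache.pinot.common.function.scalar.JsonFunctions;

    public class JsonStringToCollectionDemo {
      public static void main(String[] args) {
        // A JSON array string parses to a List; a JSON object string to a Map.
        List array = JsonFunctions.jsonStringToArray("[{\"name\":\"a\"},{\"name\":\"b\"}]");
        Map map = JsonFunctions.jsonStringToMap("{\"k1\":\"v1\"}");
        // jsonStringToListOrMap dispatches on the first non-whitespace character.
        Object listOrMap = JsonFunctions.jsonStringToListOrMap("[1, 2, 3]");
        // Anything else (a bare scalar, or malformed JSON) yields null rather than
        // throwing, which keeps ingestion-time transforms tolerant of bad rows.
        Object invalid = JsonFunctions.jsonStringToListOrMap("not json");
        System.out.println(array + " " + map + " " + listOrMap + " " + invalid);
      }
    }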
"{\"k1\":\"v1\", \"k2\":\"v2\", \"k3\":\"v3\", \"k4\":\"v4\",\"k5\":\"v5\"}"; + Map<String, String> expectedMap = + Map.of("k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4", "k5", "v5"); + assertEquals(JsonFunctions.jsonStringToMap(jsonMapString), expectedMap); + assertEquals(JsonFunctions.jsonStringToListOrMap(jsonMapString), expectedMap); + + String invalidJson = "[\"k1\":\"v1\"}"; + assertEquals(JsonFunctions.jsonStringToMap(invalidJson), null); + assertEquals(JsonFunctions.jsonStringToListOrMap(invalidJson), null); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java index c9d9abc316..13fc3aff55 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java @@ -80,13 +80,19 @@ public class TransformPipeline { } public Collection<String> getInputColumns() { - if (_complexTypeTransformer == null) { + if (_preComplexTypeTransformers == null && _complexTypeTransformer == null) { return _recordTransformer.getInputColumns(); - } else { - Set<String> inputColumns = new HashSet<>(_recordTransformer.getInputColumns()); + } + Set<String> inputColumns = new HashSet<>(_recordTransformer.getInputColumns()); + if (_preComplexTypeTransformers != null) { + for (RecordTransformer preComplexTypeTransformer : _preComplexTypeTransformers) { + inputColumns.addAll(preComplexTypeTransformer.getInputColumns()); + } + } + if (_complexTypeTransformer != null) { inputColumns.addAll(_complexTypeTransformer.getInputColumns()); - return inputColumns; } + return inputColumns; } /** diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java index 48c4fdac0c..669763fd8f 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java @@ -79,6 +79,7 @@ public abstract class QuickStartBase { "examples/batch/githubComplexTypeEvents", "examples/batch/billing", "examples/batch/fineFoodReviews", + "examples/batch/testUnnest", }; protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES = ImmutableMap.<String, String>builder() diff --git a/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml b/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml new file mode 100644 index 0000000000..5817f26cec --- /dev/null +++ b/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml @@ -0,0 +1,140 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index 48c4fdac0c..669763fd8f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -79,6 +79,7 @@ public abstract class QuickStartBase {
       "examples/batch/githubComplexTypeEvents",
       "examples/batch/billing",
       "examples/batch/fineFoodReviews",
+      "examples/batch/testUnnest",
   };
 
   protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES = ImmutableMap.<String, String>builder()

diff --git a/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml b/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml
new file mode 100644
index 0000000000..5817f26cec
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/batch/testUnnest/ingestionJobSpec.yaml
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# executionFrameworkSpec: Defines ingestion jobs to be running.
+executionFrameworkSpec:
+
+  # name: execution framework name
+  name: 'standalone'
+
+  # Class to use for segment generation and different push types.
+  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
+
+
+# jobType: Pinot ingestion job type.
+# Supported job types are defined in PinotIngestionJobType class.
+#   'SegmentCreation'
+#   'SegmentTarPush'
+#   'SegmentUriPush'
+#   'SegmentMetadataPush'
+#   'SegmentCreationAndTarPush'
+#   'SegmentCreationAndUriPush'
+#   'SegmentCreationAndMetadataPush'
+jobType: SegmentCreationAndTarPush
+
+# inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
+inputDirURI: 'examples/batch/testUnnest/rawdata'
+
+# includeFileNamePattern: include file name pattern, supported glob pattern.
+# Sample usage:
+#   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
+#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
+includeFileNamePattern: 'glob:**/*.parquet'
+
+# excludeFileNamePattern: exclude file name pattern, supported glob pattern.
+# Sample usage:
+#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
+#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
+# _excludeFileNamePattern: ''
+
+# outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
+outputDirURI: 'examples/batch/testUnnest/segments'
+
+# overwriteOutput: Overwrite output segments if existed.
+overwriteOutput: true
+
+# pinotFSSpecs: defines all related Pinot file systems.
+pinotFSSpecs:
+
+  - # scheme: used to identify a PinotFS.
+    # E.g. local, hdfs, dbfs, etc
+    scheme: file
+
+    # className: Class name used to create the PinotFS instance.
+    # E.g.
+    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local filesystem
+    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data Lake
+    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+
+# recordReaderSpec: defines all record reader
+recordReaderSpec:
+
+  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
+  dataFormat: 'parquet'
+
+  # className: Corresponding RecordReader class name.
+  # E.g.
+  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
+  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
+  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
+  #   org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader
+  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
+  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
+  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
+  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
+
+  # configClassName: Corresponding RecordReaderConfig class name, it's mandatory for CSV and Thrift file format.
+  # E.g.
+  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig
+  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig
+  #configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
+
+  # configs: Used to init RecordReaderConfig class name, this config is required for CSV and Thrift data format.
+  configs:
+
+
+# tableSpec: defines table name and where to fetch corresponding table config and table schema.
+tableSpec:
+
+  # tableName: Table name
+  tableName: 'testUnnest'
+
+  # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
+  # E.g.
+  #   hdfs://path/to/table_schema.json
+  #   http://localhost:9000/tables/myTable/schema
+  schemaURI: 'http://localhost:9000/tables/testUnnest/schema'
+
+  # tableConfigURI: defines where to reade the table config.
+  # Supports using PinotFS or HTTP.
+  # E.g.
+  #   hdfs://path/to/table_config.json
+  #   http://localhost:9000/tables/myTable
+  # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
+  # The real table config is the object under the field 'OFFLINE'.
+  tableConfigURI: 'http://localhost:9000/tables/testUnnest'
+
+# pinotClusterSpecs: defines the Pinot Cluster Access Point.
+pinotClusterSpecs:
+  - # controllerURI: used to fetch table/schema information and data push.
+    # E.g. http://localhost:9000
+    controllerURI: 'http://localhost:9000'
+
+# pushJobSpec: defines segment push job related configuration.
+pushJobSpec:
+
+  # pushAttempts: number of attempts for push job, default is 1, which means no retry.
+  pushAttempts: 2
+
+  # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
+  pushRetryIntervalMillis: 1000

diff --git a/pinot-tools/src/main/resources/examples/batch/testUnnest/rawdata/output.parquet b/pinot-tools/src/main/resources/examples/batch/testUnnest/rawdata/output.parquet
new file mode 100644
index 0000000000..2c7c43799c
Binary files /dev/null and b/pinot-tools/src/main/resources/examples/batch/testUnnest/rawdata/output.parquet differ
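
An aside on includeFileNamePattern: the 'glob:' syntax in the spec mirrors
java.nio's PathMatcher globs (an assumption about the standalone runner's
matching, consistent with the comments in the yaml above). A tiny sketch of why
'glob:**/*.parquet' picks up the rawdata file recursively:

    import java.nio.file.FileSystems;
    import java.nio.file.Path;
    import java.nio.file.PathMatcher;

    public class GlobDemo {
      public static void main(String[] args) {
        // '**' crosses directory boundaries, so any depth under inputDirURI matches.
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:**/*.parquet");
        System.out.println(matcher.matches(Path.of("rawdata/output.parquet"))); // true
        System.out.println(matcher.matches(Path.of("output.parquet")));         // false: no directory component
      }
    }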
diff --git a/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_offline_table_config.json
new file mode 100644
index 0000000000..ef765fbb8c
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_offline_table_config.json
@@ -0,0 +1,61 @@
+{
+  "tableName": "testUnnest_OFFLINE",
+  "tableType": "OFFLINE",
+  "segmentsConfig": {
+    "deletedSegmentsRetentionPeriod": "0d",
+    "segmentPushType": "APPEND",
+    "timeColumnName": "event_time",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "180",
+    "minimizeDataMovement": false,
+    "replication": "1"
+  },
+  "tenants": {
+    "broker": "DefaultTenant",
+    "server": "DefaultTenant"
+  },
+  "tableIndexConfig": {
+    "aggregateMetrics": false,
+    "optimizeDictionary": false,
+    "autoGeneratedInvertedIndex": false,
+    "enableDefaultStarTree": false,
+    "nullHandlingEnabled": true,
+    "skipSegmentPreprocess": false,
+    "optimizeDictionaryType": false,
+    "enableDynamicStarTreeCreation": false,
+    "columnMajorSegmentBuilderEnabled": true,
+    "createInvertedIndexDuringSegmentGeneration": true,
+    "optimizeDictionaryForMetrics": false,
+    "noDictionarySizeRatioThreshold": 0,
+    "loadMode": "MMAP",
+    "rangeIndexVersion": 2,
+    "invertedIndexColumns": [
+      "key"
+    ],
+    "varLengthDictionaryColumns": [
+      "key"
+    ]
+  },
+  "metadata": {},
+  "ingestionConfig": {
+    "transformConfigs": [],
+    "enrichmentConfigs": [
+      {
+        "enricherType": "generateColumn",
+        "properties": {"fieldToFunctionMap":{"recordArray":"jsonStringToListOrMap(data_for_unnesting)"}},
+        "preComplexTypeTransform": true
+      }
+    ],
+    "continueOnError": true,
+    "rowTimeValueCheck": true,
+    "complexTypeConfig": {
+      "fieldsToUnnest": [
+        "recordArray"
+      ],
+      "delimiter": "||"
+    },
+    "retryOnSegmentBuildPrecheckFailure": false,
+    "segmentTimeValueCheck": false
+  },
+  "isDimTable": false
+}

diff --git a/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json
new file mode 100644
index 0000000000..9e6163fe81
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json
@@ -0,0 +1,25 @@
+{
+  "schemaName": "testUnnest",
+  "enableColumnBasedNullHandling": true,
+  "dimensionFieldSpecs": [
+    {
+      "name": "key",
+      "dataType": "STRING",
+      "fieldType": "DIMENSION"
+    },
+    {
+      "name": "recordArray||name",
+      "dataType": "STRING",
+      "fieldType": "DIMENSION"
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "event_time",
+      "dataType": "LONG",
+      "fieldType": "DATE_TIME",
+      "format": "EPOCH|MILLISECONDS|1",
+      "granularity": "MILLISECONDS|1"
+    }
+  ]
+}
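
Reading the example end to end: the generateColumn enricher runs before
complex-type transformation ("preComplexTypeTransform": true), materializing
recordArray from the string-typed JSON column; complexTypeConfig then unnests
it, and the "||" delimiter produces the flattened column name "recordArray||name"
declared in the schema. A rough sketch of the intended row expansion (field
values are illustrative; the data_for_unnesting column name comes from the
table config above):

    import java.util.List;
    import java.util.Map;
    import org.apache.pinot.common.function.scalar.JsonFunctions;

    public class UnnestShapeDemo {
      public static void main(String[] args) {
        // Step 1 (enrichment): parse the string-typed JSON column into a List.
        String dataForUnnesting = "[{\"name\":\"a\"}, {\"name\":\"b\"}]";
        Object recordArray = JsonFunctions.jsonStringToListOrMap(dataForUnnesting);

        // Step 2 (unnest): one output row per array element; nested keys are
        // flattened with the configured "||" delimiter, e.g. "recordArray||name".
        for (Object element : (List<?>) recordArray) {
          String name = (String) ((Map<?, ?>) element).get("name");
          System.out.println("key=k1, recordArray||name=" + name);
        }
      }
    }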
"generateColumn", + "properties": {"fieldToFunctionMap":{"recordArray":"jsonStringToListOrMap(data_for_unnesting)"}}, + "preComplexTypeTransform": true + } + ], + "continueOnError": true, + "rowTimeValueCheck": true, + "complexTypeConfig": { + "fieldsToUnnest": [ + "recordArray" + ], + "delimiter": "||" + }, + "retryOnSegmentBuildPrecheckFailure": false, + "segmentTimeValueCheck": false + }, + "isDimTable": false +} diff --git a/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json new file mode 100644 index 0000000000..9e6163fe81 --- /dev/null +++ b/pinot-tools/src/main/resources/examples/batch/testUnnest/testUnnest_schema.json @@ -0,0 +1,25 @@ +{ + "schemaName": "testUnnest", + "enableColumnBasedNullHandling": true, + "dimensionFieldSpecs": [ + { + "name": "key", + "dataType": "STRING", + "fieldType": "DIMENSION" + }, + { + "name": "recordArray||name", + "dataType": "STRING", + "fieldType": "DIMENSION" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "event_time", + "dataType": "LONG", + "fieldType": "DATE_TIME", + "format": "EPOCH|MILLISECONDS|1", + "granularity": "MILLISECONDS|1" + } + ] +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org