This is an automated email from the ASF dual-hosted git repository. yupeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new e400566 Add the complex-type support to decoder/reader (#6945) e400566 is described below commit e40056621a6b6ffd2201717d3da5dcccdecd7257 Author: Yupeng Fu <yupe...@users.noreply.github.com> AuthorDate: Thu May 20 10:26:27 2021 -0700 Add the complex-type support to decoder/reader (#6945) * update decoder to support complex-type * comments * fix improts --- .../pinot/segment/local/utils/IngestionUtils.java | 22 ++++++++++ ...bEvents_offline_complexTypeHandling_schema.json | 50 +++++++++++++++------- ...s_offline_complexTypeHandling_table_config.json | 4 -- ...eHandling_meetupRsvp_realtime_table_config.json | 12 +----- .../complexTypeHandling_meetupRsvp_schema.json | 44 ++++++++++++------- 5 files changed, 87 insertions(+), 45 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index d1a5b14..06bc61e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -32,6 +32,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; @@ -41,6 +42,7 @@ import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator; import org.apache.pinot.spi.auth.AuthContext; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; @@ -298,10 +300,30 @@ public final class IngestionUtils { Set<String> fieldsForRecordExtractor = new HashSet<>(); extractFieldsFromIngestionConfig(ingestionConfig, fieldsForRecordExtractor); extractFieldsFromSchema(schema, fieldsForRecordExtractor); + fieldsForRecordExtractor = getFieldsToReadWithComplexType(fieldsForRecordExtractor, ingestionConfig); return fieldsForRecordExtractor; } /** + * Extracts the root-level names from the fields, to support the complex-type handling. For example, + * a field a.b.c will return the top-level name a. + */ + private static Set<String> getFieldsToReadWithComplexType(Set<String> fieldsToRead, IngestionConfig ingestionConfig) { + if (ingestionConfig == null || ingestionConfig.getComplexTypeConfig() == null) { + // do nothing + return fieldsToRead; + } + ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig(); + Set<String> result = new HashSet<>(); + String delimiter = complexTypeConfig.getDelimiter() == null ? ComplexTypeTransformer.DEFAULT_DELIMITER + : complexTypeConfig.getDelimiter(); + for (String field : fieldsToRead) { + result.add(StringUtils.split(field, delimiter)[0]); + } + return result; + } + + /** * Extracts all the fields needed by the {@link org.apache.pinot.spi.data.readers.RecordExtractor} from the given Schema * TODO: for now, we assume that arguments to transform function are in the source i.e. no columns are derived from transformed columns */ diff --git a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json index 30de6b5..dffd623 100644 --- a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json +++ b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json @@ -9,21 +9,41 @@ "dataType": "STRING" }, { - "name": "payload.commits.sha", - "dataType": "STRING" - }, - { - "name": "payload.commits.author.name", - "dataType": "STRING" - }, - { - "name": "payload.commits.author.email", - "dataType": "STRING" - }, - { - "name": "payload_json", - "dataType": "STRING", - "maxLength": 2147483647 + "name" : "payload.push_id", + "dataType" : "LONG" + }, { + "name" : "payload.size", + "dataType" : "INT" + }, { + "name" : "payload.distinct_size", + "dataType" : "INT" + }, { + "name" : "payload.ref", + "dataType" : "STRING" + }, { + "name" : "payload.head", + "dataType" : "STRING" + }, { + "name" : "payload.before", + "dataType" : "STRING" + }, { + "name" : "payload.commits.sha", + "dataType" : "STRING" + }, { + "name" : "payload.commits.author.name", + "dataType" : "STRING" + }, { + "name" : "payload.commits.author.email", + "dataType" : "STRING" + }, { + "name" : "payload.commits.message", + "dataType" : "STRING" + }, { + "name" : "payload.commits.distinct", + "dataType" : "BOOLEAN" + }, { + "name" : "payload.commits.url", + "dataType" : "STRING" } ], "dateTimeFieldSpecs": [ diff --git a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json index de496de..c72cf3c 100644 --- a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json +++ b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json @@ -16,10 +16,6 @@ { "columnName": "created_at_timestamp", "transformFunction": "fromDateTime(created_at, 'yyyy-MM-dd''T''HH:mm:ss''Z''')" - }, - { - "columnName": "payload_json", - "transformFunction": "jsonFormat(\"payload\")" } ], "complexTypeConfig": { diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json index ce85aea..6b787a0 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_realtime_table_config.json @@ -26,23 +26,13 @@ ] }, "transformConfigs": [ - { - "columnName": "group_json", - "transformFunction": "jsonFormat(\"group\")" - } ], "complexTypeConfig": { "unnestFields": ["group.group_topics"] } }, "tableIndexConfig": { - "loadMode": "MMAP", - "noDictionaryColumns": [ - "group_json" - ], - "jsonIndexColumns": [ - "group_json" - ] + "loadMode": "MMAP" }, "metadata": { "customConfigs": {} diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json index 825e963..8974ccf 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/complexTypeHandling_meetupRsvp_schema.json @@ -2,21 +2,35 @@ "schemaName": "meetupRsvp", "dimensionFieldSpecs": [ { - "name": "group_json", - "dataType": "STRING", - "maxLength": 2147483647 - }, - { - "name": "group.group_topics.urlkey", - "dataType": "STRING" - }, - { - "name": "group.group_topics.topic_name", - "dataType": "STRING" - }, - { - "name": "group.group_id", - "dataType": "LONG" + "name" : "group.group_topics.urlkey", + "dataType" : "STRING" + }, { + "name" : "group.group_topics.topic_name", + "dataType" : "STRING" + }, { + "name" : "group.group_city", + "dataType" : "STRING" + }, { + "name" : "group.group_country", + "dataType" : "STRING" + }, { + "name" : "group.group_id", + "dataType" : "INT" + }, { + "name" : "group.group_name", + "dataType" : "STRING" + }, { + "name" : "group.group_lon", + "dataType" : "DOUBLE" + }, { + "name" : "group.group_urlname", + "dataType" : "STRING" + }, { + "name" : "group.group_state", + "dataType" : "STRING" + }, { + "name" : "group.group_lat", + "dataType" : "DOUBLE" }, { "name": "rsvp_id", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org