This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 430d48e The last of the getTimeFieldSpec calls (#5378) 430d48e is described below commit 430d48e3fb97ad66905b9fad117fd7de3ad2952e Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Fri May 15 15:53:02 2020 -0700 The last of the getTimeFieldSpec calls (#5378) Removes the last of the getTimeFieldSpec calls. 1. SegmentGeneratorConfig now uses the new getSpecForTimeColumn() call, which returns a DateTimeFieldSpec. The time column must be set in the table config if it is to be considered when creating a segment. (This also sets the stage for allowing a DateTimeFieldSpec to be the primary time column.) 2. Removes the TimeFieldSpec special handling from RealtimeSegmentConverter.getUpdatedSchema. There is no need to remove the incoming time spec: neither the recordReader nor the dataSource requires the updated schema, and the record transformer is a pass-through. 3. Removes the special casing for TimeFieldSpec in the Schema.isBackwardCompatibleWith() method. 
The for loop for all specs includes time --- .../generator/SegmentGeneratorConfig.java | 50 ++++++---------------- .../minion/rollup/MergeRollupSegmentConverter.java | 1 - .../converter/RealtimeSegmentConverter.java | 17 +------- .../generator/SegmentGeneratorConfigTest.java | 16 ++----- .../index/loader/SegmentPreProcessorTest.java | 2 +- .../converter/RealtimeSegmentConverterTest.java | 17 -------- .../java/org/apache/pinot/spi/data/Schema.java | 13 +++--- .../org/apache/pinot/tools/HybridQuickstart.java | 14 +++--- .../pinot/tools/streams/AirlineDataStream.java | 12 +++--- 9 files changed, 43 insertions(+), 99 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java index 7126ca4..59531fe 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.commons.lang.StringUtils; import org.apache.pinot.core.io.compression.ChunkCompressorFactory; import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator; @@ -42,6 +43,8 @@ import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.FieldType; import org.apache.pinot.spi.data.Schema; @@ -118,10 +121,6 @@ public class 
SegmentGeneratorConfig { setSchema(schema); // NOTE: SegmentGeneratorConfig#setSchema doesn't set the time column anymore. timeColumnName is expected to be read from table config. - // If time column name is not set in table config, read time from schema. - // WARN: Once we move to DateTimeFieldSpec - table config has to be provided with valid time - if time needs to be set. - // We cannot deduce whether 1) one of the provided DateTimes should be used as time column 2) if yes, which one - // Even if only 1 DateTime exists, we cannot determine whether it should be primary time column (there could be no time column for table (REFRESH), but still multiple DateTimeFieldSpec) String timeColumnName = null; if (tableConfig.getValidationConfig() != null) { timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); @@ -182,42 +181,21 @@ public class SegmentGeneratorConfig { } /** - * Set time column details using the given time column. If not found, use schema + * Set time column details using the given time column */ - public void setTime(String timeColumnName, Schema schema) { + public void setTime(@Nullable String timeColumnName, Schema schema) { if (timeColumnName != null) { - FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName); - if (fieldSpec != null) { - setTime(fieldSpec); - return; + DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName); + if (dateTimeFieldSpec != null) { + setTimeColumnName(dateTimeFieldSpec.getName()); + DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); + if (formatSpec.getTimeFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH)) { + setSegmentTimeUnit(formatSpec.getColumnUnit()); + } else { + setSimpleDateFormat(formatSpec.getSDFPattern()); + } } } - setTime(schema.getTimeFieldSpec()); - } - - /** - * Set time column details using the given field spec - */ - private void setTime(FieldSpec timeSpec) { - if (timeSpec == null) { - return; - } - TimeFieldSpec 
timeFieldSpec = (TimeFieldSpec) timeSpec; - setTimeColumnName(timeFieldSpec.getName()); - - TimeGranularitySpec timeGranularitySpec = timeFieldSpec.getOutgoingGranularitySpec(); - - String timeFormat = timeGranularitySpec.getTimeFormat(); - if (timeFormat.equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())) { - // Time format: 'EPOCH' - setSegmentTimeUnit(timeGranularitySpec.getTimeType()); - } else { - // Time format: 'SIMPLE_DATE_FORMAT:<pattern>' - Preconditions.checkArgument(timeFormat.startsWith(TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString()), - "Invalid time format: %s, must be one of '%s' or '%s:<pattern>'", timeFormat, - TimeGranularitySpec.TimeFormat.EPOCH, TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT); - setSimpleDateFormat(timeFormat.substring(timeFormat.indexOf(':') + 1)); - } } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java index 3b7d089..9e79070 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java @@ -112,7 +112,6 @@ public class MergeRollupSegmentConverter { for (DateTimeFieldSpec dateTimeFieldSpec : schema.getDateTimeFieldSpecs()) { groupByColumns.add(dateTimeFieldSpec.getName()); } - // TODO: once time column starts showing up as dateTimeFieldSpec (https://github.com/apache/incubator-pinot/issues/2756) below lines becomes redundant String timeColumnName = _tableConfig.getValidationConfig().getTimeColumnName(); if (timeColumnName != null && !groupByColumns.contains(timeColumnName)) { groupByColumns.add(timeColumnName); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java index 48c63a4..f8e5098 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java @@ -150,26 +150,13 @@ public class RealtimeSegmentConverter { } /** - * Returns a new schema based on the original one. The new schema removes columns as needed (for ex, virtual cols) - * and adds the new timespec to the schema. + * Returns a new schema containing only physical columns */ @VisibleForTesting public Schema getUpdatedSchema(Schema original) { Schema newSchema = new Schema(); - TimeFieldSpec tfs = original.getTimeFieldSpec(); - if (tfs != null) { - // Use outgoing granularity for creating segment - TimeGranularitySpec outgoing = tfs.getOutgoingGranularitySpec(); - if (outgoing != null) { - TimeFieldSpec newTimeSpec = new TimeFieldSpec(outgoing); - newSchema.addField(newTimeSpec); - } - } - for (String col : original.getPhysicalColumnNames()) { - if ((tfs == null) || (!col.equals(tfs.getName()))) { - newSchema.addField(original.getFieldSpecFor(col)); - } + newSchema.addField(original.getFieldSpecFor(col)); } return newSchema; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java index 6c19432..05e8a7d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java @@ -40,22 +40,18 @@ public class SegmentGeneratorConfigTest { .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build(); TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName("daysSinceEpoch").build(); - // table config provided SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); assertEquals(segmentGeneratorConfig.getTimeColumnName(), "daysSinceEpoch"); assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.EPOCH); assertEquals(segmentGeneratorConfig.getSegmentTimeUnit(), TimeUnit.DAYS); assertNull(segmentGeneratorConfig.getSimpleDateFormat()); - // table config not provided - // NOTE: this behavior will not hold true when we move to dateTimeFieldSpec. // MUST provide valid tableConfig with time column if time details are wanted tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build(); segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); - assertEquals(segmentGeneratorConfig.getTimeColumnName(), "daysSinceEpoch"); - assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.EPOCH); - assertEquals(segmentGeneratorConfig.getSegmentTimeUnit(), TimeUnit.DAYS); + assertNull(segmentGeneratorConfig.getTimeColumnName()); + assertNull(segmentGeneratorConfig.getSegmentTimeUnit()); assertNull(segmentGeneratorConfig.getSimpleDateFormat()); } @@ -66,22 +62,18 @@ public class SegmentGeneratorConfigTest { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName("Date").build(); - // Table config provided SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); assertEquals(segmentGeneratorConfig.getTimeColumnName(), "Date"); assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE); assertNull(segmentGeneratorConfig.getSegmentTimeUnit()); assertEquals(segmentGeneratorConfig.getSimpleDateFormat(), "yyyyMMdd"); - // Table config not provided - // NOTE: this behavior will 
not hold true when we move to dateTimeFieldSpec. // MUST provide valid tableConfig with time column if time details are wanted tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build(); segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); - assertEquals(segmentGeneratorConfig.getTimeColumnName(), "Date"); - assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE); + assertNull(segmentGeneratorConfig.getTimeColumnName()); assertNull(segmentGeneratorConfig.getSegmentTimeUnit()); - assertEquals(segmentGeneratorConfig.getSimpleDateFormat(), "yyyyMMdd"); + assertNull(segmentGeneratorConfig.getSimpleDateFormat()); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java index edc9fbe..3da79bb 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java @@ -118,7 +118,7 @@ public class SegmentPreProcessorTest { Assert.assertNotNull(resourceUrl); _schema = Schema.fromFile(new File(resourceUrl.getFile())); _tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daySinceEpoch").build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build(); resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA1); Assert.assertNotNull(resourceUrl); _newColumnsSchema1 = Schema.fromFile(new File(resourceUrl.getFile())); diff --git a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java index c09a3d6..d37f32c 100644 --- 
a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java @@ -46,28 +46,11 @@ public class RealtimeSegmentConverterTest { String segmentName = "segment1"; VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, segmentName); Assert.assertEquals(schema.getColumnNames().size(), 5); - Assert.assertEquals(schema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(), TimeUnit.MILLISECONDS); RealtimeSegmentConverter converter = new RealtimeSegmentConverter(null, "", schema, "testTable", tableConfig, segmentName, "col1"); Schema newSchema = converter.getUpdatedSchema(schema); Assert.assertEquals(newSchema.getColumnNames().size(), 2); - Assert.assertEquals(newSchema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(), TimeUnit.DAYS); - } - - @Test - public void testNoTimeColumnsInSchema() { - Schema schema = new Schema(); - schema.addField(new DimensionFieldSpec("col1", FieldSpec.DataType.STRING, true)); - schema.addField(new DimensionFieldSpec("col2", FieldSpec.DataType.STRING, true)); - schema.addField(new DimensionFieldSpec("col3", FieldSpec.DataType.STRING, true)); - schema.addField(new MetricFieldSpec("met1", FieldSpec.DataType.DOUBLE, 0)); - schema.addField(new MetricFieldSpec("met2", FieldSpec.DataType.LONG, 0)); - Assert.assertEquals(schema.getColumnNames().size(), 5); - RealtimeSegmentConverter converter = - new RealtimeSegmentConverter(null, "", schema, "testTable", null, "segment1", "col1"); - Schema newSchema = converter.getUpdatedSchema(schema); - Assert.assertEquals(newSchema.getColumnNames().size(), 5); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java index a3a2b64..0659839 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java @@ -578,19 +578,18 @@ public final class Schema { * Backward compatibility requires all columns and fieldSpec in oldSchema should be retained. * * @param oldSchema old schema - * @return */ public boolean isBackwardCompatibleWith(Schema oldSchema) { - if (!EqualityUtils.isEqual(_timeFieldSpec, oldSchema.getTimeFieldSpec()) || !EqualityUtils - .isEqual(_dateTimeFieldSpecs, oldSchema.getDateTimeFieldSpecs())) { - return false; - } + Set<String> columnNames = getColumnNames(); for (Map.Entry<String, FieldSpec> entry : oldSchema.getFieldSpecMap().entrySet()) { - if (!getColumnNames().contains(entry.getKey())) { + String oldSchemaColumnName = entry.getKey(); + if (!columnNames.contains(oldSchemaColumnName)) { return false; } - if (!getFieldSpecFor(entry.getKey()).equals(entry.getValue())) { + FieldSpec oldSchemaFieldSpec = entry.getValue(); + FieldSpec fieldSpec = getFieldSpecFor(oldSchemaColumnName); + if (!fieldSpec.equals(oldSchemaFieldSpec)) { return false; } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java index 1b0adcc..ac87795 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java @@ -25,11 +25,13 @@ import java.io.IOException; import java.net.URL; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.ZkStarter; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.stream.StreamDataProvider; import org.apache.pinot.spi.stream.StreamDataServerStartable; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.Quickstart.Color; import 
org.apache.pinot.tools.admin.command.QuickstartRunner; import org.apache.pinot.tools.streams.AirlineDataStream; @@ -45,6 +47,7 @@ public class HybridQuickstart { private StreamDataServerStartable _kafkaStarter; private ZkStarter.ZookeeperInstance _zookeeperInstance; private File _schemaFile; + private File _realtimeTableConfigFile; private File _dataFile; private File _ingestionJobSpecFile; @@ -92,18 +95,18 @@ public class HybridQuickstart { } _dataFile = new File(_realtimeQuickStartDataDir, "airlineStats_data.avro"); - File tableConfigFile = new File(_realtimeQuickStartDataDir, "airlineStats_realtime_table_config.json"); + _realtimeTableConfigFile = new File(_realtimeQuickStartDataDir, "airlineStats_realtime_table_config.json"); URL resource = Quickstart.class.getClassLoader().getResource( "examples/stream/airlineStats/airlineStats_realtime_table_config.json"); Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, tableConfigFile); + FileUtils.copyURLToFile(resource, _realtimeTableConfigFile); resource = Quickstart.class.getClassLoader().getResource( "examples/stream/airlineStats/sample_data/airlineStats_data.avro"); Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, _dataFile); - return new QuickstartTableRequest("airlineStats", _schemaFile, tableConfigFile); + return new QuickstartTableRequest("airlineStats", _schemaFile, _realtimeTableConfigFile); } private void startKafka() { @@ -136,8 +139,9 @@ public class HybridQuickstart { runner.launchDataIngestionJob(); printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****"); - - final AirlineDataStream stream = new AirlineDataStream(Schema.fromFile(_schemaFile), _dataFile); + Schema schema = Schema.fromFile(_schemaFile); + TableConfig tableConfig = JsonUtils.fileToObject(_realtimeTableConfigFile, TableConfig.class); + final AirlineDataStream stream = new AirlineDataStream(schema, tableConfig, _dataFile); stream.run(); 
printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is complete *****"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java index 897ca00..97b670b 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java @@ -30,6 +30,8 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.pinot.plugin.inputformat.avro.AvroUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeFieldSpec; @@ -48,6 +50,7 @@ public class AirlineDataStream { private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class); Schema pinotSchema; + String timeColumnName; File avroFile; DataFileStream<GenericRecord> avroDataStream; Integer currentTimeValue = 16102; @@ -56,9 +59,10 @@ public class AirlineDataStream { int counter = 0; private StreamDataProducer producer; - public AirlineDataStream(Schema pinotSchema, File avroFile) + public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File avroFile) throws Exception { this.pinotSchema = pinotSchema; + this.timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); this.avroFile = avroFile; createStream(); Properties properties = new Properties(); @@ -120,13 +124,11 @@ public class AirlineDataStream { message.put(spec.getName(), record.get(spec.getName())); } - for (FieldSpec spec : pinotSchema.getDimensionFieldSpecs()) { + for (FieldSpec spec : pinotSchema.getMetricFieldSpecs()) { message.put(spec.getName(), record.get(spec.getName())); } - TimeFieldSpec spec = 
pinotSchema.getTimeFieldSpec(); - String timeColumn = spec.getIncomingGranularitySpec().getName(); - message.put(timeColumn, currentTimeValue); + message.put(timeColumnName, currentTimeValue); try { publish(message); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org