This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 430d48e  The last of the getTimeFieldSpec calls (#5378)
430d48e is described below

commit 430d48e3fb97ad66905b9fad117fd7de3ad2952e
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Fri May 15 15:53:02 2020 -0700

    The last of the getTimeFieldSpec calls (#5378)
    
    Removing the last of the getTimeFieldSpec calls.
    1. SegmentGeneratorConfig now uses the new getSpecForTimeColumn() call,
    which returns a DateTimeFieldSpec. The time column must be declared in the
    table config if it is to be considered when creating a segment. This also
    sets the stage for allowing a DateTimeFieldSpec to be the primary time
    column. (A minimal usage sketch follows this list.)
    2. Removing the TimeFieldSpec special handling from
    RealtimeSegmentConverter.getUpdatedSchema. There is no need to remove the
    incoming time spec: neither the recordReader nor the dataSource requires
    the updated schema, and the record transformer is a pass-through.
    3. Removing the special casing for TimeFieldSpec in
    Schema.isBackwardCompatibleWith(). The for loop over all field specs
    already covers the time column.
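    
    Below is a minimal sketch (editorial illustration, not part of this
    commit) of how the new time-column resolution in (1) is intended to be
    used. The schema/table-config values, the SchemaBuilder and
    TableConfigBuilder helpers, and the import paths are assumptions; the
    resolution logic mirrors the SegmentGeneratorConfig change in the diff
    below.
    
    import java.util.concurrent.TimeUnit;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.data.DateTimeFieldSpec;
    import org.apache.pinot.spi.data.DateTimeFormatSpec;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
    
    public class TimeColumnResolutionSketch {
      public static void main(String[] args) {
        // Schema declares the time column as a DateTimeFieldSpec (illustrative values)
        Schema schema = new Schema.SchemaBuilder().setSchemaName("test")
            .addDateTime("daysSinceEpoch", FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
            .build();
        // Table config names the time column; without it, no time details are set
        TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
            .setTableName("test").setTimeColumnName("daysSinceEpoch").build();
    
        String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
        DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
        if (dateTimeFieldSpec != null) {
          DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
          if (formatSpec.getTimeFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH)) {
            // EPOCH format: the segment time unit comes from the column unit (DAYS here)
            TimeUnit segmentTimeUnit = formatSpec.getColumnUnit();
            System.out.println("time column = " + dateTimeFieldSpec.getName() + ", unit = " + segmentTimeUnit);
          } else {
            // SIMPLE_DATE_FORMAT: use the SDF pattern instead of a time unit
            String pattern = formatSpec.getSDFPattern();
            System.out.println("time column = " + dateTimeFieldSpec.getName() + ", pattern = " + pattern);
          }
        }
      }
    }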
---
 .../generator/SegmentGeneratorConfig.java          | 50 ++++++----------------
 .../minion/rollup/MergeRollupSegmentConverter.java |  1 -
 .../converter/RealtimeSegmentConverter.java        | 17 +-------
 .../generator/SegmentGeneratorConfigTest.java      | 16 ++-----
 .../index/loader/SegmentPreProcessorTest.java      |  2 +-
 .../converter/RealtimeSegmentConverterTest.java    | 17 --------
 .../java/org/apache/pinot/spi/data/Schema.java     | 13 +++---
 .../org/apache/pinot/tools/HybridQuickstart.java   | 14 +++---
 .../pinot/tools/streams/AirlineDataStream.java     | 12 +++---
 9 files changed, 43 insertions(+), 99 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 7126ca4..59531fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
@@ -42,6 +43,8 @@ import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.FieldType;
 import org.apache.pinot.spi.data.Schema;
@@ -118,10 +121,6 @@ public class SegmentGeneratorConfig {
     setSchema(schema);
 
     // NOTE: SegmentGeneratorConfig#setSchema doesn't set the time column anymore. timeColumnName is expected to be read from table config.
-    //  If time column name is not set in table config, read time from schema.
-    // WARN: Once we move to DateTimeFieldSpec - table config has to be provided with valid time - if time needs to be set.
-    //  We cannot deduce whether 1) one of the provided DateTimes should be used as time column 2) if yes, which one
-    //  Even if only 1 DateTime exists, we cannot determine whether it should be primary time column (there could be no time column for table (REFRESH), but still multiple DateTimeFieldSpec)
     String timeColumnName = null;
     if (tableConfig.getValidationConfig() != null) {
       timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
@@ -182,42 +181,21 @@ public class SegmentGeneratorConfig {
   }
 
   /**
-   * Set time column details using the given time column. If not found, use schema
+   * Set time column details using the given time column
    */
-  public void setTime(String timeColumnName, Schema schema) {
+  public void setTime(@Nullable String timeColumnName, Schema schema) {
     if (timeColumnName != null) {
-      FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
-      if (fieldSpec != null) {
-        setTime(fieldSpec);
-        return;
+      DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+      if (dateTimeFieldSpec != null) {
+        setTimeColumnName(dateTimeFieldSpec.getName());
+        DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+        if (formatSpec.getTimeFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH)) {
+          setSegmentTimeUnit(formatSpec.getColumnUnit());
+        } else {
+          setSimpleDateFormat(formatSpec.getSDFPattern());
+        }
       }
     }
-    setTime(schema.getTimeFieldSpec());
-  }
-
-  /**
-   * Set time column details using the given field spec
-   */
-  private void setTime(FieldSpec timeSpec) {
-    if (timeSpec == null) {
-      return;
-    }
-    TimeFieldSpec timeFieldSpec = (TimeFieldSpec) timeSpec;
-    setTimeColumnName(timeFieldSpec.getName());
-
-    TimeGranularitySpec timeGranularitySpec = timeFieldSpec.getOutgoingGranularitySpec();
-
-    String timeFormat = timeGranularitySpec.getTimeFormat();
-    if (timeFormat.equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())) {
-      // Time format: 'EPOCH'
-      setSegmentTimeUnit(timeGranularitySpec.getTimeType());
-    } else {
-      // Time format: 'SIMPLE_DATE_FORMAT:<pattern>'
-      Preconditions.checkArgument(timeFormat.startsWith(TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString()),
-          "Invalid time format: %s, must be one of '%s' or '%s:<pattern>'", timeFormat,
-          TimeGranularitySpec.TimeFormat.EPOCH, TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT);
-      setSimpleDateFormat(timeFormat.substring(timeFormat.indexOf(':') + 1));
-    }
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
index 3b7d089..9e79070 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
@@ -112,7 +112,6 @@ public class MergeRollupSegmentConverter {
     for (DateTimeFieldSpec dateTimeFieldSpec : schema.getDateTimeFieldSpecs()) {
       groupByColumns.add(dateTimeFieldSpec.getName());
     }
-    // TODO: once time column starts showing up as dateTimeFieldSpec (https://github.com/apache/incubator-pinot/issues/2756) below lines becomes redundant
     String timeColumnName = _tableConfig.getValidationConfig().getTimeColumnName();
     if (timeColumnName != null && !groupByColumns.contains(timeColumnName)) {
       groupByColumns.add(timeColumnName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index 48c63a4..f8e5098 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -150,26 +150,13 @@ public class RealtimeSegmentConverter {
   }
 
   /**
-   * Returns a new schema based on the original one. The new schema removes columns as needed (for ex, virtual cols)
-   * and adds the new timespec to the schema.
+   * Returns a new schema containing only physical columns
    */
   @VisibleForTesting
   public Schema getUpdatedSchema(Schema original) {
     Schema newSchema = new Schema();
-    TimeFieldSpec tfs = original.getTimeFieldSpec();
-    if (tfs != null) {
-      // Use outgoing granularity for creating segment
-      TimeGranularitySpec outgoing = tfs.getOutgoingGranularitySpec();
-      if (outgoing != null) {
-        TimeFieldSpec newTimeSpec = new TimeFieldSpec(outgoing);
-        newSchema.addField(newTimeSpec);
-      }
-    }
-
     for (String col : original.getPhysicalColumnNames()) {
-      if ((tfs == null) || (!col.equals(tfs.getName()))) {
-        newSchema.addField(original.getFieldSpecFor(col));
-      }
+      newSchema.addField(original.getFieldSpecFor(col));
     }
     return newSchema;
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
index 6c19432..05e8a7d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
@@ -40,22 +40,18 @@ public class SegmentGeneratorConfigTest {
         .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName("daysSinceEpoch").build();
-    // table config provided
     SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
     assertEquals(segmentGeneratorConfig.getTimeColumnName(), "daysSinceEpoch");
     assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.EPOCH);
     assertEquals(segmentGeneratorConfig.getSegmentTimeUnit(), TimeUnit.DAYS);
     assertNull(segmentGeneratorConfig.getSimpleDateFormat());
 
-    // table config not provided
-    // NOTE: this behavior will not hold true when we move to dateTimeFieldSpec.
     // MUST provide valid tableConfig with time column if time details are wanted
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
     segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
-    assertEquals(segmentGeneratorConfig.getTimeColumnName(), "daysSinceEpoch");
-    assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.EPOCH);
-    assertEquals(segmentGeneratorConfig.getSegmentTimeUnit(), TimeUnit.DAYS);
+    assertNull(segmentGeneratorConfig.getTimeColumnName());
+    assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
     assertNull(segmentGeneratorConfig.getSimpleDateFormat());
   }
 
@@ -66,22 +62,18 @@ public class SegmentGeneratorConfigTest {
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName("Date").build();
 
-    // Table config provided
     SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
     assertEquals(segmentGeneratorConfig.getTimeColumnName(), "Date");
     assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE);
     assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
     assertEquals(segmentGeneratorConfig.getSimpleDateFormat(), "yyyyMMdd");
 
-    // Table config not provided
-    // NOTE: this behavior will not hold true when we move to dateTimeFieldSpec.
     // MUST provide valid tableConfig with time column if time details are wanted
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
     segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
-    assertEquals(segmentGeneratorConfig.getTimeColumnName(), "Date");
-    assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE);
+    assertNull(segmentGeneratorConfig.getTimeColumnName());
     assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
-    assertEquals(segmentGeneratorConfig.getSimpleDateFormat(), "yyyyMMdd");
+    assertNull(segmentGeneratorConfig.getSimpleDateFormat());
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
index edc9fbe..3da79bb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
@@ -118,7 +118,7 @@ public class SegmentPreProcessorTest {
     Assert.assertNotNull(resourceUrl);
     _schema = Schema.fromFile(new File(resourceUrl.getFile()));
     _tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daySinceEpoch").build();
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
     resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA1);
     Assert.assertNotNull(resourceUrl);
     _newColumnsSchema1 = Schema.fromFile(new File(resourceUrl.getFile()));
diff --git a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
index c09a3d6..d37f32c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
@@ -46,28 +46,11 @@ public class RealtimeSegmentConverterTest {
     String segmentName = "segment1";
     VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, segmentName);
     Assert.assertEquals(schema.getColumnNames().size(), 5);
-    Assert.assertEquals(schema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(), TimeUnit.MILLISECONDS);
 
     RealtimeSegmentConverter converter =
         new RealtimeSegmentConverter(null, "", schema, "testTable", tableConfig, segmentName, "col1");
 
     Schema newSchema = converter.getUpdatedSchema(schema);
     Assert.assertEquals(newSchema.getColumnNames().size(), 2);
-    Assert.assertEquals(newSchema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(), TimeUnit.DAYS);
-  }
-
-  @Test
-  public void testNoTimeColumnsInSchema() {
-    Schema schema = new Schema();
-    schema.addField(new DimensionFieldSpec("col1", FieldSpec.DataType.STRING, 
true));
-    schema.addField(new DimensionFieldSpec("col2", FieldSpec.DataType.STRING, 
true));
-    schema.addField(new DimensionFieldSpec("col3", FieldSpec.DataType.STRING, 
true));
-    schema.addField(new MetricFieldSpec("met1", FieldSpec.DataType.DOUBLE, 0));
-    schema.addField(new MetricFieldSpec("met2", FieldSpec.DataType.LONG, 0));
-    Assert.assertEquals(schema.getColumnNames().size(), 5);
-    RealtimeSegmentConverter converter =
-        new RealtimeSegmentConverter(null, "", schema, "testTable", null, "segment1", "col1");
-    Schema newSchema = converter.getUpdatedSchema(schema);
-    Assert.assertEquals(newSchema.getColumnNames().size(), 5);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index a3a2b64..0659839 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -578,19 +578,18 @@ public final class Schema {
   * Backward compatibility requires all columns and fieldSpec in oldSchema should be retained.
    *
    * @param oldSchema old schema
-   * @return
    */
 
   public boolean isBackwardCompatibleWith(Schema oldSchema) {
-    if (!EqualityUtils.isEqual(_timeFieldSpec, oldSchema.getTimeFieldSpec()) || !EqualityUtils
-        .isEqual(_dateTimeFieldSpecs, oldSchema.getDateTimeFieldSpecs())) {
-      return false;
-    }
+    Set<String> columnNames = getColumnNames();
     for (Map.Entry<String, FieldSpec> entry : 
oldSchema.getFieldSpecMap().entrySet()) {
-      if (!getColumnNames().contains(entry.getKey())) {
+      String oldSchemaColumnName = entry.getKey();
+      if (!columnNames.contains(oldSchemaColumnName)) {
         return false;
       }
-      if (!getFieldSpecFor(entry.getKey()).equals(entry.getValue())) {
+      FieldSpec oldSchemaFieldSpec = entry.getValue();
+      FieldSpec fieldSpec = getFieldSpecFor(oldSchemaColumnName);
+      if (!fieldSpec.equals(oldSchemaFieldSpec)) {
         return false;
       }
     }
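
For illustration (not part of this commit), a hedged sketch of what the simplified isBackwardCompatibleWith() check above implies. The column names, SchemaBuilder helpers, and DateTime format strings are assumptions, and the same imports as the sketch under the commit message are assumed:

    Schema oldSchema = new Schema.SchemaBuilder().setSchemaName("test")
        .addSingleValueDimension("col1", FieldSpec.DataType.STRING)
        .addDateTime("daysSinceEpoch", FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
        .build();

    // Adding a column keeps backward compatibility: every old column is still
    // present with an identical field spec.
    Schema newSchema = new Schema.SchemaBuilder().setSchemaName("test")
        .addSingleValueDimension("col1", FieldSpec.DataType.STRING)
        .addSingleValueDimension("col2", FieldSpec.DataType.STRING)
        .addDateTime("daysSinceEpoch", FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
        .build();
    // newSchema.isBackwardCompatibleWith(oldSchema) -> true

    // Changing (or dropping) an existing field spec breaks compatibility,
    // time columns included, via the same single loop.
    Schema changedSchema = new Schema.SchemaBuilder().setSchemaName("test")
        .addSingleValueDimension("col1", FieldSpec.DataType.LONG)
        .addDateTime("daysSinceEpoch", FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
        .build();
    // changedSchema.isBackwardCompatibleWith(oldSchema) -> false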
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index 1b0adcc..ac87795 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -25,11 +25,13 @@ import java.io.IOException;
 import java.net.URL;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.AirlineDataStream;
@@ -45,6 +47,7 @@ public class HybridQuickstart {
   private StreamDataServerStartable _kafkaStarter;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private File _schemaFile;
+  private File _realtimeTableConfigFile;
   private File _dataFile;
   private File _ingestionJobSpecFile;
 
@@ -92,18 +95,18 @@ public class HybridQuickstart {
     }
 
     _dataFile = new File(_realtimeQuickStartDataDir, "airlineStats_data.avro");
-    File tableConfigFile = new File(_realtimeQuickStartDataDir, "airlineStats_realtime_table_config.json");
+    _realtimeTableConfigFile = new File(_realtimeQuickStartDataDir, "airlineStats_realtime_table_config.json");
 
     URL resource = Quickstart.class.getClassLoader().getResource(
         "examples/stream/airlineStats/airlineStats_realtime_table_config.json");
     Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, tableConfigFile);
+    FileUtils.copyURLToFile(resource, _realtimeTableConfigFile);
     resource = Quickstart.class.getClassLoader().getResource(
         "examples/stream/airlineStats/sample_data/airlineStats_data.avro");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, _dataFile);
 
-    return new QuickstartTableRequest("airlineStats", _schemaFile, tableConfigFile);
+    return new QuickstartTableRequest("airlineStats", _schemaFile, _realtimeTableConfigFile);
   }
 
   private void startKafka() {
@@ -136,8 +139,9 @@ public class HybridQuickstart {
     runner.launchDataIngestionJob();
 
     printStatus(Color.YELLOW, "***** Starting airline data stream and 
publishing to Kafka *****");
-
-    final AirlineDataStream stream = new AirlineDataStream(Schema.fromFile(_schemaFile), _dataFile);
+    Schema schema = Schema.fromFile(_schemaFile);
+    TableConfig tableConfig = JsonUtils.fileToObject(_realtimeTableConfigFile, TableConfig.class);
+    final AirlineDataStream stream = new AirlineDataStream(schema, tableConfig, _dataFile);
     stream.run();
 
     printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is 
complete *****");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 897ca00..97b670b 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -30,6 +30,8 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -48,6 +50,7 @@ public class AirlineDataStream {
   private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class);
 
   Schema pinotSchema;
+  String timeColumnName;
   File avroFile;
   DataFileStream<GenericRecord> avroDataStream;
   Integer currentTimeValue = 16102;
@@ -56,9 +59,10 @@ public class AirlineDataStream {
   int counter = 0;
   private StreamDataProducer producer;
 
-  public AirlineDataStream(Schema pinotSchema, File avroFile)
+  public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File avroFile)
       throws Exception {
     this.pinotSchema = pinotSchema;
+    this.timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     this.avroFile = avroFile;
     createStream();
     Properties properties = new Properties();
@@ -120,13 +124,11 @@ public class AirlineDataStream {
               message.put(spec.getName(), record.get(spec.getName()));
             }
 
-            for (FieldSpec spec : pinotSchema.getDimensionFieldSpecs()) {
+            for (FieldSpec spec : pinotSchema.getMetricFieldSpecs()) {
               message.put(spec.getName(), record.get(spec.getName()));
             }
 
-            TimeFieldSpec spec = pinotSchema.getTimeFieldSpec();
-            String timeColumn = spec.getIncomingGranularitySpec().getName();
-            message.put(timeColumn, currentTimeValue);
+            message.put(timeColumnName, currentTimeValue);
 
             try {
               publish(message);

